turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27 *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{
35 ClientCapabilities,
36 handlers::{HandlerError, HandlerRegistry},
37 sampling::SamplingHandler,
38};
39
/// Inner client state with interior mutability
///
/// One `ClientInner` is allocated per client and placed behind an `Arc`
/// (see `Client::inner`), which is what makes cloning a client handle cheap
/// (same pattern as reqwest and AWS SDK). Mutable pieces are wrapped in an
/// atomic or a mutex so they can be updated through a shared reference.
pub(super) struct ClientInner<T: Transport + 'static> {
    /// Protocol client for low-level communication.
    ///
    /// Also the access point for the message dispatcher (`dispatcher()`) and
    /// the underlying transport (`transport()`).
    pub(super) protocol: ProtocolClient<T>,

    /// Client capabilities (immutable after construction)
    pub(super) capabilities: ClientCapabilities,

    /// Initialization state (lock-free atomic boolean); starts out `false`
    pub(super) initialized: AtomicBool,

    /// Optional sampling handler (mutex for dynamic updates).
    ///
    /// `None` until a handler is installed; `sampling/createMessage` requests
    /// are rejected while it is `None`.
    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,

    /// Handler registry for bidirectional communication (mutex for registration).
    ///
    /// This is a `std::sync::Mutex`, so guards must be released before any
    /// `.await`; callers in this file clone the handler out first.
    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,

    /// Plugin registry for middleware (tokio mutex - holds across await)
    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
}
63
64/// The core MCP client implementation
65///
66/// Client provides a comprehensive interface for communicating with MCP servers,
67/// supporting all protocol features including tools, prompts, resources, sampling,
68/// elicitation, and bidirectional communication patterns.
69///
70/// # Clone Pattern
71///
72/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
73/// All clones share the same underlying connection and state:
74///
75/// ```rust,no_run
76/// use turbomcp_client::Client;
77/// use turbomcp_transport::stdio::StdioTransport;
78///
79/// # async fn example() -> turbomcp_protocol::Result<()> {
80/// let client = Client::new(StdioTransport::new());
81/// client.initialize().await?;
82///
83/// // Cheap clone - shares same connection
84/// let client2 = client.clone();
85/// tokio::spawn(async move {
86/// client2.list_tools().await.ok();
87/// });
88/// # Ok(())
89/// # }
90/// ```
91///
92/// The client must be initialized before use by calling `initialize()` to perform
93/// the MCP handshake and capability negotiation.
94///
95/// # Features
96///
97/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
98/// - **Bidirectional Communication**: Server-initiated requests and client responses
99/// - **Plugin Middleware**: Extensible request/response processing
100/// - **Handler Registry**: Callbacks for server-initiated operations
101/// - **Connection Management**: Robust error handling and recovery
102/// - **Type Safety**: Compile-time guarantees for MCP message types
103/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
104///
105/// # Examples
106///
107/// ```rust,no_run
108/// use turbomcp_client::Client;
109/// use turbomcp_transport::stdio::StdioTransport;
110/// use std::collections::HashMap;
111///
112/// # async fn example() -> turbomcp_protocol::Result<()> {
113/// // Create and initialize client (no mut needed!)
114/// let client = Client::new(StdioTransport::new());
115/// let init_result = client.initialize().await?;
116/// println!("Connected to: {}", init_result.server_info.name);
117///
118/// // Use MCP operations
119/// let tools = client.list_tools().await?;
120/// let mut args = HashMap::new();
121/// args.insert("input".to_string(), serde_json::json!("test"));
122/// let result = client.call_tool("my_tool", Some(args)).await?;
123/// # Ok(())
124/// # }
125/// ```
pub struct Client<T: Transport + 'static> {
    /// Reference-counted shared state; every clone of a `Client` points at
    /// the same `ClientInner`.
    pub(super) inner: Arc<ClientInner<T>>,
}
129
130/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
131///
132/// Cloning a Client is cheap (just an Arc clone) and all clones share
133/// the same underlying connection and state.
134impl<T: Transport + 'static> Clone for Client<T> {
135 fn clone(&self) -> Self {
136 Self {
137 inner: Arc::clone(&self.inner),
138 }
139 }
140}
141
/// Last-resort cleanup that runs when the `ClientInner` itself is dropped.
///
/// Only the synchronous half of a shutdown can happen here: the message
/// dispatcher is stopped, but the transport is left connected because
/// disconnecting is an async operation.
///
/// NOTE(review): the warning below is emitted unconditionally. Nothing in this
/// impl checks whether `Client::shutdown()` already ran, so it presumably also
/// fires after a clean shutdown - confirm whether that is intended.
impl<T: Transport + 'static> Drop for ClientInner<T> {
    fn drop(&mut self) {
        // Best-effort cleanup if shutdown() wasn't called
        // This is a safety net, but applications SHOULD call shutdown() explicitly

        // Single warning to catch attention
        tracing::warn!(
            "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
        );

        // Details at debug level to avoid noise in production
        tracing::debug!(" 📘 RECOMMENDATION: Call client.shutdown().await before dropping");
        tracing::debug!(
            " 🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
        );
        tracing::debug!(" 💡 See client shutdown documentation for best practices");

        // We can shutdown the dispatcher (it's synchronous)
        self.protocol.dispatcher().shutdown();
        tracing::debug!(" ✅ Message dispatcher stopped");

        // ⚠️ LIMITATION: Cannot call transport.disconnect() from Drop because it's async
        // This means:
        // - WebSocket: Close frame not sent, reconnection tasks keep running
        // - HTTP: Connections may not close cleanly
        // - All transports: Background tasks may be orphaned
        tracing::debug!(" ❌ Transport NOT disconnected (Drop cannot call async methods)");
        tracing::debug!(" ⚠️ WebSocket reconnection and other background tasks may continue");
    }
}
172
173impl<T: Transport + 'static> Client<T> {
174 /// Create a new client with the specified transport
175 ///
176 /// Creates a new MCP client instance with default capabilities.
177 /// The client must be initialized before use by calling `initialize()`.
178 ///
179 /// # Arguments
180 ///
181 /// * `transport` - The transport implementation to use for communication
182 ///
183 /// # Examples
184 ///
185 /// ```rust,no_run
186 /// use turbomcp_client::Client;
187 /// use turbomcp_transport::stdio::StdioTransport;
188 ///
189 /// let transport = StdioTransport::new();
190 /// let client = Client::new(transport);
191 /// ```
192 pub fn new(transport: T) -> Self {
193 let client = Self {
194 inner: Arc::new(ClientInner {
195 protocol: ProtocolClient::new(transport),
196 capabilities: ClientCapabilities::default(),
197 initialized: AtomicBool::new(false),
198 sampling_handler: Arc::new(StdMutex::new(None)),
199 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
200 plugin_registry: Arc::new(tokio::sync::Mutex::new(
201 crate::plugins::PluginRegistry::new(),
202 )),
203 }),
204 };
205
206 // Register dispatcher handlers for bidirectional communication
207 client.register_dispatcher_handlers();
208
209 client
210 }
211
212 /// Create a new client with custom capabilities
213 ///
214 /// # Arguments
215 ///
216 /// * `transport` - The transport implementation to use
217 /// * `capabilities` - The client capabilities to negotiate
218 ///
219 /// # Examples
220 ///
221 /// ```rust,no_run
222 /// use turbomcp_client::{Client, ClientCapabilities};
223 /// use turbomcp_transport::stdio::StdioTransport;
224 ///
225 /// let capabilities = ClientCapabilities {
226 /// tools: true,
227 /// prompts: true,
228 /// resources: false,
229 /// sampling: false,
230 /// };
231 ///
232 /// let transport = StdioTransport::new();
233 /// let client = Client::with_capabilities(transport, capabilities);
234 /// ```
235 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
236 let client = Self {
237 inner: Arc::new(ClientInner {
238 protocol: ProtocolClient::new(transport),
239 capabilities,
240 initialized: AtomicBool::new(false),
241 sampling_handler: Arc::new(StdMutex::new(None)),
242 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
243 plugin_registry: Arc::new(tokio::sync::Mutex::new(
244 crate::plugins::PluginRegistry::new(),
245 )),
246 }),
247 };
248
249 // Register dispatcher handlers for bidirectional communication
250 client.register_dispatcher_handlers();
251
252 client
253 }
254
255 /// Shutdown the client and clean up all resources
256 ///
257 /// This method performs a **graceful shutdown** of the MCP client by:
258 /// 1. Stopping the message dispatcher background task
259 /// 2. Disconnecting the transport (closes connection, stops background tasks)
260 ///
261 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
262 ///
263 /// # Why This Method Exists
264 ///
265 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
266 /// which is required for proper cleanup of WebSocket connections and background tasks.
267 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
268 ///
269 /// # Best Practices
270 ///
271 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
272 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
273 /// - For tests: call in cleanup/teardown
274 /// - If forgotten, Drop will log warnings and do best-effort cleanup
275 ///
276 /// # Examples
277 ///
278 /// ```rust,no_run
279 /// # use turbomcp_client::Client;
280 /// # use turbomcp_transport::stdio::StdioTransport;
281 /// # async fn example() -> turbomcp_protocol::Result<()> {
282 /// let client = Client::new(StdioTransport::new());
283 /// client.initialize().await?;
284 ///
285 /// // Use client...
286 /// let _tools = client.list_tools().await?;
287 ///
288 /// // ✅ Clean shutdown
289 /// client.shutdown().await?;
290 /// # Ok(())
291 /// # }
292 /// ```
293 pub async fn shutdown(&self) -> Result<()> {
294 tracing::info!("🛑 Shutting down MCP client");
295
296 // 1. Shutdown message dispatcher
297 self.inner.protocol.dispatcher().shutdown();
298 tracing::debug!("✅ Message dispatcher stopped");
299
300 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
301 match self.inner.protocol.transport().disconnect().await {
302 Ok(()) => {
303 tracing::info!("✅ Transport disconnected successfully");
304 }
305 Err(e) => {
306 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
307 }
308 }
309
310 tracing::info!("✅ MCP client shutdown complete");
311 Ok(())
312 }
313}
314
315// ============================================================================
316// HTTP-Specific Convenience Constructors (Feature-Gated)
317// ============================================================================
318
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server with default settings (convenience method)
    ///
    /// Builds a streamable-HTTP transport for `url`, wraps it in a client and
    /// runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns whatever error `initialize()` reports, for example when:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // Beautiful one-liner - balanced with server DX
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Default configuration == custom configuration with nothing customised.
        Self::connect_http_with(url, |_| {}).await
    }

    /// Connect to an HTTP MCP server, tweaking the configuration first
    ///
    /// Starts from the default configuration with `base_url` set to `url`,
    /// lets `config_fn` adjust it, then connects and initializes like
    /// `connect_http()`.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut settings = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };
        // Caller overrides are applied on top of the defaults.
        config_fn(&mut settings);

        let client = Self::new(StreamableHttpClientTransport::new(settings));

        // Run the handshake right away so the caller gets a ready client.
        client.initialize().await?;

        Ok(client)
    }
}
439
440// ============================================================================
441// TCP-Specific Convenience Constructors (Feature-Gated)
442// ============================================================================
443
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Beautiful one-liner for TCP connections - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address: either a literal socket address such as
    ///   "127.0.0.1:8765" / "[::1]:8765", or a `host:port` pair such as
    ///   "localhost:8765". Host names are resolved with the system resolver
    ///   and the first resolved address is used.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// - A bad-request error if `addr` is neither a valid socket address nor a
    ///   resolvable `host:port` pair
    /// - Any error reported by `initialize()` (connection or handshake failure)
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
        use turbomcp_transport::tcp::TcpTransport;

        // Own the text so no borrow of `addr` has to live across an await.
        let target: String = addr.as_ref().to_owned();

        let server_addr: SocketAddr = match target.parse() {
            // Fast path: literal IP:port needs no lookup.
            Ok(literal) => literal,
            // `SocketAddr::from_str` rejects host names, so "localhost:8765"
            // has to go through the resolver. `to_socket_addrs` blocks, hence
            // the hop onto the blocking thread pool.
            Err(_) => {
                let display = target.clone();
                tokio::task::spawn_blocking(move || {
                    target.to_socket_addrs().map(|mut resolved| resolved.next())
                })
                .await
                .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?
                .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?
                .ok_or_else(|| {
                    Error::bad_request(format!(
                        "Invalid address: '{}' did not resolve to any socket address",
                        display
                    ))
                })?
            }
        };

        // Bind the client side to the unspecified address of the matching IP
        // family with port 0, i.e. let the OS pick any available local port.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
494
495// ============================================================================
496// Unix Socket-Specific Convenience Constructors (Feature-Gated)
497// ============================================================================
498
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to an MCP server listening on a Unix domain socket (convenience method)
    ///
    /// Creates a client-side Unix transport for `path`, wraps it in a client
    /// and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        let socket_path: std::path::PathBuf = path.into();
        let client = Self::new(turbomcp_transport::unix::UnixTransport::new_client(
            socket_path,
        ));

        // Handshake before handing the client back.
        client.initialize().await?;

        Ok(client)
    }
}
536
537impl<T: Transport + 'static> Client<T> {
538 /// Register message handlers with the dispatcher
539 ///
540 /// This method sets up the callbacks that handle server-initiated requests
541 /// and notifications. The dispatcher's background task routes incoming
542 /// messages to these handlers.
543 ///
544 /// This is called automatically during Client construction (in `new()` and
545 /// `with_capabilities()`), so you don't need to call it manually.
546 ///
547 /// ## How It Works
548 ///
549 /// The handlers are synchronous closures that spawn async tasks to do the
550 /// actual work. This allows the dispatcher to continue routing messages
551 /// without blocking on handler execution.
552 fn register_dispatcher_handlers(&self) {
553 let dispatcher = self.inner.protocol.dispatcher();
554 let client_for_requests = self.clone();
555 let client_for_notifications = self.clone();
556
557 // Request handler (elicitation, sampling, etc.)
558 let request_handler = Arc::new(move |request: JsonRpcRequest| {
559 let client = client_for_requests.clone();
560 let method = request.method.clone();
561 let req_id = request.id.clone();
562 // Spawn async task to handle the request
563 tokio::spawn(async move {
564 tracing::debug!(
565 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
566 method,
567 req_id
568 );
569 if let Err(e) = client.handle_request(request).await {
570 tracing::error!(
571 "❌ [request_handler] Error handling server request '{}': {}",
572 method,
573 e
574 );
575 tracing::error!(" Request ID: {:?}", req_id);
576 tracing::error!(" Error kind: {:?}", e.kind);
577 } else {
578 tracing::debug!(
579 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
580 method,
581 req_id
582 );
583 }
584 });
585 Ok(())
586 });
587
588 // Notification handler
589 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
590 let client = client_for_notifications.clone();
591 // Spawn async task to handle the notification
592 tokio::spawn(async move {
593 if let Err(e) = client.handle_notification(notification).await {
594 tracing::error!("Error handling server notification: {}", e);
595 }
596 });
597 Ok(())
598 });
599
600 // Register handlers synchronously - no race condition!
601 // The set_* methods are now synchronous with std::sync::Mutex
602 dispatcher.set_request_handler(request_handler);
603 dispatcher.set_notification_handler(notification_handler);
604 tracing::debug!("Dispatcher handlers registered successfully");
605 }
606
607 /// Handle server-initiated requests (elicitation, sampling, roots)
608 ///
609 /// This method is called by the MessageDispatcher when it receives a request
610 /// from the server. It routes the request to the appropriate handler based on
611 /// the method name.
612 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
613 match request.method.as_str() {
614 "sampling/createMessage" => {
615 let handler_opt = self
616 .inner
617 .sampling_handler
618 .lock()
619 .expect("sampling_handler mutex poisoned")
620 .clone();
621 if let Some(handler) = handler_opt {
622 // Extract request ID for proper correlation
623 let request_id = match &request.id {
624 turbomcp_protocol::MessageId::String(s) => s.clone(),
625 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
626 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
627 };
628
629 let params: CreateMessageRequest =
630 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
631 .map_err(|e| {
632 Error::protocol(format!("Invalid createMessage params: {}", e))
633 })?;
634
635 match handler.handle_create_message(request_id, params).await {
636 Ok(result) => {
637 let result_value = serde_json::to_value(result).map_err(|e| {
638 Error::protocol(format!("Failed to serialize response: {}", e))
639 })?;
640 let response = JsonRpcResponse::success(result_value, request.id);
641 self.send_response(response).await?;
642 }
643 Err(e) => {
644 tracing::warn!(
645 "⚠️ [handle_request] Sampling handler returned error: {}",
646 e
647 );
648
649 // Preserve error semantics by checking actual error type
650 // This allows proper error code propagation for retry logic
651 let (code, message) = if let Some(handler_err) =
652 e.downcast_ref::<HandlerError>()
653 {
654 // HandlerError has explicit JSON-RPC code mapping
655 let json_err = handler_err.into_jsonrpc_error();
656 tracing::info!(
657 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
658 json_err.code
659 );
660 (json_err.code, json_err.message)
661 } else if let Some(proto_err) =
662 e.downcast_ref::<turbomcp_protocol::Error>()
663 {
664 // Protocol errors have ErrorKind-based mapping
665 tracing::info!(
666 "📋 [handle_request] Protocol error mapped to code: {}",
667 proto_err.jsonrpc_error_code()
668 );
669 (proto_err.jsonrpc_error_code(), proto_err.to_string())
670 } else {
671 // Generic errors default to Internal (-32603)
672 // Log the error type for debugging (should rarely hit this path)
673 tracing::warn!(
674 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
675 std::any::type_name_of_val(&*e)
676 );
677 (-32603, format!("Sampling handler error: {}", e))
678 };
679
680 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
681 code,
682 message,
683 data: None,
684 };
685 let response =
686 JsonRpcResponse::error_response(error, request.id.clone());
687
688 tracing::info!(
689 "🔄 [handle_request] Attempting to send error response for request: {:?}",
690 request.id
691 );
692 self.send_response(response).await?;
693 tracing::info!(
694 "✅ [handle_request] Error response sent successfully for request: {:?}",
695 request.id
696 );
697 }
698 }
699 } else {
700 // No handler configured
701 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
702 code: -32601,
703 message: "Sampling not supported".to_string(),
704 data: None,
705 };
706 let response = JsonRpcResponse::error_response(error, request.id);
707 self.send_response(response).await?;
708 }
709 }
710 "roots/list" => {
711 // Handle roots/list request from server
712 // Clone the handler Arc to avoid holding mutex across await
713 let handler_opt = self
714 .inner
715 .handlers
716 .lock()
717 .expect("handlers mutex poisoned")
718 .roots
719 .clone();
720
721 let roots_result = if let Some(handler) = handler_opt {
722 handler.handle_roots_request().await
723 } else {
724 // No handler - return empty list per MCP spec
725 Ok(Vec::new())
726 };
727
728 match roots_result {
729 Ok(roots) => {
730 let result_value =
731 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
732 roots,
733 _meta: None,
734 })
735 .map_err(|e| {
736 Error::protocol(format!(
737 "Failed to serialize roots response: {}",
738 e
739 ))
740 })?;
741 let response = JsonRpcResponse::success(result_value, request.id);
742 self.send_response(response).await?;
743 }
744 Err(e) => {
745 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
746 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
747 let json_err = e.into_jsonrpc_error();
748 let response = JsonRpcResponse::error_response(json_err, request.id);
749 self.send_response(response).await?;
750 }
751 }
752 }
753 "elicitation/create" => {
754 // Clone handler Arc before await to avoid holding mutex across await
755 let handler_opt = self
756 .inner
757 .handlers
758 .lock()
759 .expect("handlers mutex poisoned")
760 .elicitation
761 .clone();
762 if let Some(handler) = handler_opt {
763 // Parse elicitation request params as MCP protocol type
764 let proto_request: turbomcp_protocol::types::ElicitRequest =
765 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
766 .map_err(|e| {
767 Error::protocol(format!("Invalid elicitation params: {}", e))
768 })?;
769
770 // Wrap protocol request with ID for handler (preserves type safety!)
771 let handler_request =
772 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
773
774 // Call the registered elicitation handler
775 match handler.handle_elicitation(handler_request).await {
776 Ok(elicit_response) => {
777 // Convert handler response back to protocol type
778 let proto_result = elicit_response.into_protocol();
779 let result_value = serde_json::to_value(proto_result).map_err(|e| {
780 Error::protocol(format!(
781 "Failed to serialize elicitation response: {}",
782 e
783 ))
784 })?;
785 let response = JsonRpcResponse::success(result_value, request.id);
786 self.send_response(response).await?;
787 }
788 Err(e) => {
789 // Convert handler error to JSON-RPC error using centralized mapping
790 let response =
791 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
792 self.send_response(response).await?;
793 }
794 }
795 } else {
796 // No handler configured - elicitation not supported
797 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
798 code: -32601,
799 message: "Elicitation not supported - no handler registered".to_string(),
800 data: None,
801 };
802 let response = JsonRpcResponse::error_response(error, request.id);
803 self.send_response(response).await?;
804 }
805 }
806 _ => {
807 // Unknown method
808 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
809 code: -32601,
810 message: format!("Method not found: {}", request.method),
811 data: None,
812 };
813 let response = JsonRpcResponse::error_response(error, request.id);
814 self.send_response(response).await?;
815 }
816 }
817 Ok(())
818 }
819
820 /// Handle server-initiated notifications
821 ///
822 /// Routes notifications to appropriate handlers based on method name.
823 /// MCP defines several notification types that servers can send to clients:
824 ///
825 /// - `notifications/progress` - Progress updates for long-running operations
826 /// - `notifications/message` - Log messages from server
827 /// - `notifications/resources/updated` - Resource content changed
828 /// - `notifications/resources/list_changed` - Resource list changed
829 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
830 match notification.method.as_str() {
831 "notifications/progress" => {
832 // Progress tracking has been removed
833 tracing::debug!(
834 "Received progress notification - progress tracking has been removed"
835 );
836 }
837
838 "notifications/message" => {
839 // Route to log handler
840 let handler_opt = self
841 .inner
842 .handlers
843 .lock()
844 .expect("handlers mutex poisoned")
845 .get_log_handler();
846
847 if let Some(handler) = handler_opt {
848 // Parse log message
849 let log: crate::handlers::LoggingNotification = serde_json::from_value(
850 notification.params.unwrap_or(serde_json::Value::Null),
851 )
852 .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
853
854 // Call handler
855 if let Err(e) = handler.handle_log(log).await {
856 tracing::error!("Log handler error: {}", e);
857 }
858 } else {
859 tracing::debug!("Received log notification but no handler registered");
860 }
861 }
862
863 "notifications/resources/updated" => {
864 // Route to resource update handler
865 let handler_opt = self
866 .inner
867 .handlers
868 .lock()
869 .expect("handlers mutex poisoned")
870 .get_resource_update_handler();
871
872 if let Some(handler) = handler_opt {
873 // Parse resource update notification
874 let update: crate::handlers::ResourceUpdatedNotification =
875 serde_json::from_value(
876 notification.params.unwrap_or(serde_json::Value::Null),
877 )
878 .map_err(|e| {
879 Error::protocol(format!("Invalid resource update notification: {}", e))
880 })?;
881
882 // Call handler
883 if let Err(e) = handler.handle_resource_update(update).await {
884 tracing::error!("Resource update handler error: {}", e);
885 }
886 } else {
887 tracing::debug!(
888 "Received resource update notification but no handler registered"
889 );
890 }
891 }
892
893 "notifications/resources/list_changed" => {
894 // Route to resource list changed handler
895 let handler_opt = self
896 .inner
897 .handlers
898 .lock()
899 .expect("handlers mutex poisoned")
900 .get_resource_list_changed_handler();
901
902 if let Some(handler) = handler_opt {
903 if let Err(e) = handler.handle_resource_list_changed().await {
904 tracing::error!("Resource list changed handler error: {}", e);
905 }
906 } else {
907 tracing::debug!(
908 "Resource list changed notification received (no handler registered)"
909 );
910 }
911 }
912
913 "notifications/prompts/list_changed" => {
914 // Route to prompt list changed handler
915 let handler_opt = self
916 .inner
917 .handlers
918 .lock()
919 .expect("handlers mutex poisoned")
920 .get_prompt_list_changed_handler();
921
922 if let Some(handler) = handler_opt {
923 if let Err(e) = handler.handle_prompt_list_changed().await {
924 tracing::error!("Prompt list changed handler error: {}", e);
925 }
926 } else {
927 tracing::debug!(
928 "Prompt list changed notification received (no handler registered)"
929 );
930 }
931 }
932
933 "notifications/tools/list_changed" => {
934 // Route to tool list changed handler
935 let handler_opt = self
936 .inner
937 .handlers
938 .lock()
939 .expect("handlers mutex poisoned")
940 .get_tool_list_changed_handler();
941
942 if let Some(handler) = handler_opt {
943 if let Err(e) = handler.handle_tool_list_changed().await {
944 tracing::error!("Tool list changed handler error: {}", e);
945 }
946 } else {
947 tracing::debug!(
948 "Tool list changed notification received (no handler registered)"
949 );
950 }
951 }
952
953 "notifications/cancelled" => {
954 // Route to cancellation handler
955 let handler_opt = self
956 .inner
957 .handlers
958 .lock()
959 .expect("handlers mutex poisoned")
960 .get_cancellation_handler();
961
962 if let Some(handler) = handler_opt {
963 // Parse cancellation notification
964 let cancellation: crate::handlers::CancelledNotification =
965 serde_json::from_value(
966 notification.params.unwrap_or(serde_json::Value::Null),
967 )
968 .map_err(|e| {
969 Error::protocol(format!("Invalid cancellation notification: {}", e))
970 })?;
971
972 // Call handler
973 if let Err(e) = handler.handle_cancellation(cancellation).await {
974 tracing::error!("Cancellation handler error: {}", e);
975 }
976 } else {
977 tracing::debug!("Cancellation notification received (no handler registered)");
978 }
979 }
980
981 _ => {
982 // Unknown notification type
983 tracing::debug!("Received unknown notification: {}", notification.method);
984 }
985 }
986
987 Ok(())
988 }
989
990 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
991 tracing::info!(
992 "📤 [send_response] Sending JSON-RPC response: id={:?}",
993 response.id
994 );
995
996 let payload = serde_json::to_vec(&response).map_err(|e| {
997 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
998 Error::protocol(format!("Failed to serialize response: {}", e))
999 })?;
1000
1001 tracing::debug!(
1002 "📤 [send_response] Response payload: {} bytes",
1003 payload.len()
1004 );
1005 tracing::debug!(
1006 "📤 [send_response] Response JSON: {}",
1007 String::from_utf8_lossy(&payload)
1008 );
1009
1010 let message = TransportMessage::new(
1011 turbomcp_protocol::MessageId::from("response".to_string()),
1012 payload.into(),
1013 );
1014
1015 self.inner
1016 .protocol
1017 .transport()
1018 .send(message)
1019 .await
1020 .map_err(|e| {
1021 tracing::error!("❌ [send_response] Transport send failed: {}", e);
1022 Error::transport(format!("Failed to send response: {}", e))
1023 })?;
1024
1025 tracing::info!(
1026 "✅ [send_response] Response sent successfully: id={:?}",
1027 response.id
1028 );
1029 Ok(())
1030 }
1031
1032 /// Initialize the connection with the MCP server
1033 ///
1034 /// Performs the initialization handshake with the server, negotiating capabilities
1035 /// and establishing the protocol version. This method must be called before
1036 /// any other operations can be performed.
1037 ///
1038 /// # Returns
1039 ///
1040 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1041 ///
1042 /// # Errors
1043 ///
1044 /// Returns an error if:
1045 /// - The transport connection fails
1046 /// - The server rejects the initialization request
1047 /// - Protocol negotiation fails
1048 ///
1049 /// # Examples
1050 ///
1051 /// ```rust,no_run
1052 /// # use turbomcp_client::Client;
1053 /// # use turbomcp_transport::stdio::StdioTransport;
1054 /// # async fn example() -> turbomcp_protocol::Result<()> {
1055 /// let mut client = Client::new(StdioTransport::new());
1056 ///
1057 /// let result = client.initialize().await?;
1058 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1059 /// # Ok(())
1060 /// # }
1061 /// ```
1062 pub async fn initialize(&self) -> Result<InitializeResult> {
1063 // Auto-connect transport if not already connected
1064 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1065 let transport = self.inner.protocol.transport();
1066 let transport_state = transport.state().await;
1067 if !matches!(
1068 transport_state,
1069 turbomcp_transport::TransportState::Connected
1070 ) {
1071 tracing::debug!(
1072 "Auto-connecting transport (current state: {:?})",
1073 transport_state
1074 );
1075 transport
1076 .connect()
1077 .await
1078 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1079 tracing::info!("Transport connected successfully");
1080 }
1081
1082 // Build client capabilities based on registered handlers (automatic detection)
1083 let mut client_caps = ProtocolClientCapabilities::default();
1084
1085 // Detect sampling capability from handler
1086 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1087 client_caps.sampling = Some(sampling_caps);
1088 }
1089
1090 // Detect elicitation capability from handler
1091 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1092 client_caps.elicitation = Some(elicitation_caps);
1093 }
1094
1095 // Detect roots capability from handler
1096 if let Some(roots_caps) = self.get_roots_capabilities() {
1097 client_caps.roots = Some(roots_caps);
1098 }
1099
1100 // Send MCP initialization request
1101 let request = InitializeRequest {
1102 protocol_version: PROTOCOL_VERSION.to_string(),
1103 capabilities: client_caps,
1104 client_info: turbomcp_protocol::types::Implementation {
1105 name: "turbomcp-client".to_string(),
1106 version: env!("CARGO_PKG_VERSION").to_string(),
1107 title: Some("TurboMCP Client".to_string()),
1108 },
1109 _meta: None,
1110 };
1111
1112 let protocol_response: ProtocolInitializeResult = self
1113 .inner
1114 .protocol
1115 .request("initialize", Some(serde_json::to_value(request)?))
1116 .await?;
1117
1118 // AtomicBool: lock-free store with Ordering::Relaxed
1119 self.inner.initialized.store(true, Ordering::Relaxed);
1120
1121 // Send initialized notification
1122 self.inner
1123 .protocol
1124 .notify("notifications/initialized", None)
1125 .await?;
1126
1127 // Convert protocol response to client response type
1128 Ok(InitializeResult {
1129 server_info: protocol_response.server_info,
1130 server_capabilities: protocol_response.capabilities,
1131 })
1132 }
1133
1134 /// Execute a protocol method with plugin middleware
1135 ///
1136 /// This is a generic helper for wrapping protocol calls with plugin middleware.
1137 pub(crate) async fn execute_with_plugins<R>(
1138 &self,
1139 method_name: &str,
1140 params: Option<serde_json::Value>,
1141 ) -> Result<R>
1142 where
1143 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1144 {
1145 // Create JSON-RPC request for plugin context
1146 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1147 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1148 id: turbomcp_protocol::MessageId::Number(1),
1149 method: method_name.to_string(),
1150 params: params.clone(),
1151 };
1152
1153 // 1. Create request context for plugins
1154 let mut req_ctx =
1155 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1156
1157 // 2. Execute before_request plugin middleware
1158 if let Err(e) = self
1159 .inner
1160 .plugin_registry
1161 .lock()
1162 .await
1163 .execute_before_request(&mut req_ctx)
1164 .await
1165 {
1166 return Err(Error::bad_request(format!(
1167 "Plugin before_request failed: {}",
1168 e
1169 )));
1170 }
1171
1172 // 3. Execute the actual protocol call
1173 let start_time = std::time::Instant::now();
1174 let protocol_result: Result<R> = self
1175 .inner
1176 .protocol
1177 .request(method_name, req_ctx.params().cloned())
1178 .await;
1179 let duration = start_time.elapsed();
1180
1181 // 4. Prepare response context
1182 let mut resp_ctx = match protocol_result {
1183 Ok(ref response) => {
1184 let response_value = serde_json::to_value(response.clone())?;
1185 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1186 }
1187 Err(ref e) => {
1188 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1189 }
1190 };
1191
1192 // 5. Execute after_response plugin middleware
1193 if let Err(e) = self
1194 .inner
1195 .plugin_registry
1196 .lock()
1197 .await
1198 .execute_after_response(&mut resp_ctx)
1199 .await
1200 {
1201 return Err(Error::bad_request(format!(
1202 "Plugin after_response failed: {}",
1203 e
1204 )));
1205 }
1206
1207 // 6. Return the final result, checking for plugin modifications
1208 match protocol_result {
1209 Ok(ref response) => {
1210 // Check if plugins modified the response
1211 if let Some(modified_response) = resp_ctx.response {
1212 // Try to deserialize the modified response
1213 if let Ok(modified_result) =
1214 serde_json::from_value::<R>(modified_response.clone())
1215 {
1216 return Ok(modified_result);
1217 }
1218 }
1219
1220 // No plugin modifications, use original response
1221 Ok(response.clone())
1222 }
1223 Err(e) => {
1224 // Check if plugins provided an error recovery response
1225 if let Some(recovery_response) = resp_ctx.response {
1226 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1227 Ok(recovery_result)
1228 } else {
1229 Err(e)
1230 }
1231 } else {
1232 Err(e)
1233 }
1234 }
1235 }
1236 }
1237
1238 /// Subscribe to resource change notifications
1239 ///
1240 /// Registers interest in receiving notifications when the specified
1241 /// resource changes. The server will send notifications when the
1242 /// resource is modified, created, or deleted.
1243 ///
1244 /// # Arguments
1245 ///
1246 /// * `uri` - The URI of the resource to monitor
1247 ///
1248 /// # Returns
1249 ///
1250 /// Returns `EmptyResult` on successful subscription.
1251 ///
1252 /// # Errors
1253 ///
1254 /// Returns an error if:
1255 /// - The client is not initialized
1256 /// - The URI is invalid or empty
1257 /// - The server doesn't support subscriptions
1258 /// - The request fails
1259 ///
1260 /// # Examples
1261 ///
1262 /// ```rust,no_run
1263 /// # use turbomcp_client::Client;
1264 /// # use turbomcp_transport::stdio::StdioTransport;
1265 /// # async fn example() -> turbomcp_protocol::Result<()> {
1266 /// let mut client = Client::new(StdioTransport::new());
1267 /// client.initialize().await?;
1268 ///
1269 /// // Subscribe to file changes
1270 /// client.subscribe("file:///watch/directory").await?;
1271 /// println!("Subscribed to resource changes");
1272 /// # Ok(())
1273 /// # }
1274 /// ```
1275 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1276 if !self.inner.initialized.load(Ordering::Relaxed) {
1277 return Err(Error::bad_request("Client not initialized"));
1278 }
1279
1280 if uri.is_empty() {
1281 return Err(Error::bad_request("Subscription URI cannot be empty"));
1282 }
1283
1284 // Send resources/subscribe request with plugin middleware
1285 let request = SubscribeRequest {
1286 uri: uri.to_string(),
1287 };
1288
1289 self.execute_with_plugins(
1290 "resources/subscribe",
1291 Some(serde_json::to_value(request).map_err(|e| {
1292 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1293 })?),
1294 )
1295 .await
1296 }
1297
1298 /// Unsubscribe from resource change notifications
1299 ///
1300 /// Cancels a previous subscription to resource changes. After unsubscribing,
1301 /// the client will no longer receive notifications for the specified resource.
1302 ///
1303 /// # Arguments
1304 ///
1305 /// * `uri` - The URI of the resource to stop monitoring
1306 ///
1307 /// # Returns
1308 ///
1309 /// Returns `EmptyResult` on successful unsubscription.
1310 ///
1311 /// # Errors
1312 ///
1313 /// Returns an error if:
1314 /// - The client is not initialized
1315 /// - The URI is invalid or empty
1316 /// - No active subscription exists for the URI
1317 /// - The request fails
1318 ///
1319 /// # Examples
1320 ///
1321 /// ```rust,no_run
1322 /// # use turbomcp_client::Client;
1323 /// # use turbomcp_transport::stdio::StdioTransport;
1324 /// # async fn example() -> turbomcp_protocol::Result<()> {
1325 /// let mut client = Client::new(StdioTransport::new());
1326 /// client.initialize().await?;
1327 ///
1328 /// // Unsubscribe from file changes
1329 /// client.unsubscribe("file:///watch/directory").await?;
1330 /// println!("Unsubscribed from resource changes");
1331 /// # Ok(())
1332 /// # }
1333 /// ```
1334 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1335 if !self.inner.initialized.load(Ordering::Relaxed) {
1336 return Err(Error::bad_request("Client not initialized"));
1337 }
1338
1339 if uri.is_empty() {
1340 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1341 }
1342
1343 // Send resources/unsubscribe request with plugin middleware
1344 let request = UnsubscribeRequest {
1345 uri: uri.to_string(),
1346 };
1347
1348 self.execute_with_plugins(
1349 "resources/unsubscribe",
1350 Some(serde_json::to_value(request).map_err(|e| {
1351 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1352 })?),
1353 )
1354 .await
1355 }
1356
1357 /// Get the client's capabilities configuration
1358 #[must_use]
1359 pub fn capabilities(&self) -> &ClientCapabilities {
1360 &self.inner.capabilities
1361 }
1362
1363 /// Initialize all registered plugins
1364 ///
1365 /// This should be called after registration but before using the client.
1366 pub async fn initialize_plugins(&self) -> Result<()> {
1367 // Set up client context for plugins with actual client capabilities
1368 let mut capabilities = std::collections::HashMap::new();
1369 capabilities.insert(
1370 "protocol_version".to_string(),
1371 serde_json::json!("2024-11-05"),
1372 );
1373 capabilities.insert(
1374 "mcp_version".to_string(),
1375 serde_json::json!(env!("CARGO_PKG_VERSION")),
1376 );
1377 capabilities.insert(
1378 "supports_notifications".to_string(),
1379 serde_json::json!(true),
1380 );
1381 capabilities.insert(
1382 "supports_sampling".to_string(),
1383 serde_json::json!(self.has_sampling_handler()),
1384 );
1385 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1386 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1387
1388 // Extract client configuration
1389 let mut config = std::collections::HashMap::new();
1390 config.insert(
1391 "client_name".to_string(),
1392 serde_json::json!("turbomcp-client"),
1393 );
1394 config.insert(
1395 "initialized".to_string(),
1396 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1397 );
1398 config.insert(
1399 "plugin_count".to_string(),
1400 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1401 );
1402
1403 let context = crate::plugins::PluginContext::new(
1404 "turbomcp-client".to_string(),
1405 env!("CARGO_PKG_VERSION").to_string(),
1406 capabilities,
1407 config,
1408 vec![], // Will be populated by the registry
1409 );
1410
1411 self.inner
1412 .plugin_registry
1413 .lock()
1414 .await
1415 .set_client_context(context);
1416
1417 // Note: Individual plugins are initialized automatically during registration
1418 // via PluginRegistry::register_plugin(). This method ensures the registry
1419 // has proper client context for any future plugin registrations.
1420 Ok(())
1421 }
1422
1423 /// Cleanup all registered plugins
1424 ///
1425 /// This should be called when the client is being shut down.
1426 pub async fn cleanup_plugins(&self) -> Result<()> {
1427 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1428 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1429 // references are dropped.
1430
1431 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1432 // pattern where plugins clean up their resources in their Drop implementation.
1433 // Replace the registry with a fresh one (mutex ensures safe access)
1434 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1435 Ok(())
1436 }
1437
1438 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1439 // are defined in their respective operation modules:
1440 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1441 // - handlers.rs: has_elicitation_handler, has_roots_handler
1442 //
1443 // Additional capability getters for elicitation and roots added below
1444 // since they're used during initialization
1445
1446 /// Get elicitation capabilities if handler is registered
1447 /// Automatically detects capability based on registered handler
1448 fn get_elicitation_capabilities(
1449 &self,
1450 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1451 if self.has_elicitation_handler() {
1452 // Currently returns default capabilities. In the future, schema_validation support
1453 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1454 // that handlers could implement. For now, handlers validate schemas themselves.
1455 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1456 } else {
1457 None
1458 }
1459 }
1460
1461 /// Get roots capabilities if handler is registered
1462 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1463 if self.has_roots_handler() {
1464 // Roots capabilities indicate whether list can change
1465 Some(turbomcp_protocol::types::RootsCapabilities {
1466 list_changed: Some(true), // Support dynamic roots by default
1467 })
1468 } else {
1469 None
1470 }
1471 }
1472}