Skip to main content

turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32    *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportConfig, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40    ClientCapabilities,
41    handlers::{HandlerError, HandlerRegistry},
42    sampling::SamplingHandler,
43};
44
/// Inner client state with interior mutability
///
/// This structure contains the actual client state. `Client` wraps it in an
/// `Arc<...>` so that cloning a client is cheap and every clone shares this
/// state (same pattern as reqwest and AWS SDK).
pub(super) struct ClientInner<T: Transport + 'static> {
    /// Protocol client for low-level communication; also the access point
    /// for the message dispatcher and the transport
    pub(super) protocol: ProtocolClient<T>,

    /// Client capabilities (immutable after construction)
    pub(super) capabilities: ClientCapabilities,

    /// Initialization state (lock-free atomic boolean)
    pub(super) initialized: AtomicBool,

    /// Tracks whether graceful shutdown has already been requested.
    /// Set by `Client::shutdown()` and consulted in `Drop` to decide whether
    /// to warn about a missing explicit shutdown.
    pub(super) shutdown_requested: AtomicBool,

    /// Optional sampling handler (mutex for dynamic updates)
    pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,

    /// Handler registry for bidirectional communication (mutex for registration)
    pub(super) handlers: Arc<Mutex<HandlerRegistry>>,

    /// Semaphore for bounded concurrency of request/notification handlers.
    /// Limits concurrent server-initiated request handlers to prevent resource
    /// exhaustion; sized from `ClientCapabilities::max_concurrent_handlers`
    /// when the client is constructed.
    pub(super) handler_semaphore: Arc<Semaphore>,
}
72
73/// The core MCP client implementation
74///
75/// Client provides a comprehensive interface for communicating with MCP servers,
76/// supporting all protocol features including tools, prompts, resources, sampling,
77/// elicitation, and bidirectional communication patterns.
78///
79/// # Clone Pattern
80///
81/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
82/// All clones share the same underlying connection and state:
83///
84/// ```rust,no_run
85/// use turbomcp_client::Client;
86/// use turbomcp_transport::stdio::StdioTransport;
87///
88/// # async fn example() -> turbomcp_protocol::Result<()> {
89/// let client = Client::new(StdioTransport::new());
90/// client.initialize().await?;
91///
92/// // Cheap clone - shares same connection
93/// let client2 = client.clone();
94/// tokio::spawn(async move {
95///     client2.list_tools().await.ok();
96/// });
97/// # Ok(())
98/// # }
99/// ```
100///
101/// The client must be initialized before use by calling `initialize()` to perform
102/// the MCP handshake and capability negotiation.
103///
104/// # Features
105///
106/// - **Protocol Compliance**: Full current MCP specification support
107/// - **Bidirectional Communication**: Server-initiated requests and client responses
108/// - **Plugin Middleware**: Extensible request/response processing
109/// - **Handler Registry**: Callbacks for server-initiated operations
110/// - **Connection Management**: Robust error handling and recovery
111/// - **Type Safety**: Compile-time guarantees for MCP message types
112/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// use turbomcp_client::Client;
118/// use turbomcp_transport::stdio::StdioTransport;
119/// use std::collections::HashMap;
120///
121/// # async fn example() -> turbomcp_protocol::Result<()> {
122/// // Create and initialize client (no mut needed!)
123/// let client = Client::new(StdioTransport::new());
124/// let init_result = client.initialize().await?;
125/// println!("Connected to: {}", init_result.server_info.name);
126///
127/// // Use MCP operations
128/// let tools = client.list_tools().await?;
129/// let mut args = HashMap::new();
130/// args.insert("input".to_string(), serde_json::json!("test"));
131/// let result = client.call_tool("my_tool", Some(args), None).await?;
132/// # Ok(())
133/// # }
134/// ```
pub struct Client<T: Transport + 'static> {
    /// Shared state; cloning a `Client` clones this `Arc`, so every clone
    /// refers to the same `ClientInner`.
    pub(super) inner: Arc<ClientInner<T>>,
}
138
139/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
140///
141/// Cloning a Client is cheap (just an Arc clone) and all clones share
142/// the same underlying connection and state.
143impl<T: Transport + 'static> Clone for Client<T> {
144    fn clone(&self) -> Self {
145        Self {
146            inner: Arc::clone(&self.inner),
147        }
148    }
149}
150
151impl<T: Transport + 'static> Drop for ClientInner<T> {
152    fn drop(&mut self) {
153        // Best-effort cleanup if shutdown() wasn't called
154        // This is a safety net, but applications SHOULD call shutdown() explicitly
155
156        // Single clear warning
157        if !self.shutdown_requested.load(Ordering::Relaxed) {
158            tracing::warn!(
159                "MCP Client dropped without explicit shutdown(). \
160                 Call client.shutdown().await before dropping for clean resource cleanup. \
161                 Background tasks (WebSocket reconnection) may continue running."
162            );
163        }
164
165        // We can shutdown the dispatcher (it's synchronous)
166        self.protocol.dispatcher().shutdown();
167    }
168}
169
170impl<T: Transport + 'static> Client<T> {
171    /// Create a new client with the specified transport
172    ///
173    /// Creates a new MCP client instance with default capabilities.
174    /// The client must be initialized before use by calling `initialize()`.
175    ///
176    /// # Arguments
177    ///
178    /// * `transport` - The transport implementation to use for communication
179    ///
180    /// # Examples
181    ///
182    /// ```rust,no_run
183    /// use turbomcp_client::Client;
184    /// use turbomcp_transport::stdio::StdioTransport;
185    ///
186    /// let transport = StdioTransport::new();
187    /// let client = Client::new(transport);
188    /// ```
189    pub fn new(transport: T) -> Self {
190        Self::new_with_config(transport, TransportConfig::default())
191    }
192
193    /// Create a new client with an explicit transport configuration.
194    pub fn new_with_config(transport: T, config: TransportConfig) -> Self {
195        let capabilities = ClientCapabilities::default();
196        let client = Self {
197            inner: Arc::new(ClientInner {
198                protocol: ProtocolClient::with_config(transport, config),
199                capabilities: capabilities.clone(),
200                initialized: AtomicBool::new(false),
201                shutdown_requested: AtomicBool::new(false),
202                sampling_handler: Arc::new(Mutex::new(None)),
203                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
204                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
205            }),
206        };
207
208        // Register dispatcher handlers for bidirectional communication
209        client.register_dispatcher_handlers();
210
211        client
212    }
213
214    /// Create a new client with custom capabilities
215    ///
216    /// # Arguments
217    ///
218    /// * `transport` - The transport implementation to use
219    /// * `capabilities` - The client capabilities to negotiate
220    ///
221    /// # Examples
222    ///
223    /// ```rust,no_run
224    /// use turbomcp_client::{Client, ClientCapabilities};
225    /// use turbomcp_transport::stdio::StdioTransport;
226    ///
227    /// let capabilities = ClientCapabilities {
228    ///     tools: true,
229    ///     prompts: true,
230    ///     resources: false,
231    ///     sampling: false,
232    ///     max_concurrent_handlers: 100,
233    /// };
234    ///
235    /// let transport = StdioTransport::new();
236    /// let client = Client::with_capabilities(transport, capabilities);
237    /// ```
238    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
239        Self::with_capabilities_and_config(transport, capabilities, TransportConfig::default())
240    }
241
242    /// Create a new client with custom capabilities and transport configuration.
243    pub fn with_capabilities_and_config(
244        transport: T,
245        capabilities: ClientCapabilities,
246        config: TransportConfig,
247    ) -> Self {
248        let client = Self {
249            inner: Arc::new(ClientInner {
250                protocol: ProtocolClient::with_config(transport, config),
251                capabilities: capabilities.clone(),
252                initialized: AtomicBool::new(false),
253                shutdown_requested: AtomicBool::new(false),
254                sampling_handler: Arc::new(Mutex::new(None)),
255                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
256                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
257            }),
258        };
259
260        // Register dispatcher handlers for bidirectional communication
261        client.register_dispatcher_handlers();
262
263        client
264    }
265
266    /// Shutdown the client and clean up all resources
267    ///
268    /// This method performs a **graceful shutdown** of the MCP client by:
269    /// 1. Stopping the message dispatcher background task
270    /// 2. Disconnecting the transport (closes connection, stops background tasks)
271    ///
272    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
273    ///
274    /// # Why This Method Exists
275    ///
276    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
277    /// which is required for proper cleanup of WebSocket connections and background tasks.
278    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
279    ///
280    /// # Best Practices
281    ///
282    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
283    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
284    /// - For tests: call in cleanup/teardown
285    /// - If forgotten, Drop will log warnings and do best-effort cleanup
286    ///
287    /// # Examples
288    ///
289    /// ```rust,no_run
290    /// # use turbomcp_client::Client;
291    /// # use turbomcp_transport::stdio::StdioTransport;
292    /// # async fn example() -> turbomcp_protocol::Result<()> {
293    /// let client = Client::new(StdioTransport::new());
294    /// client.initialize().await?;
295    ///
296    /// // Use client...
297    /// let _tools = client.list_tools().await?;
298    ///
299    /// // ✅ Clean shutdown
300    /// client.shutdown().await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub async fn shutdown(&self) -> Result<()> {
305        self.inner.shutdown_requested.store(true, Ordering::Relaxed);
306        tracing::info!("🛑 Shutting down MCP client");
307
308        // 1. Shutdown message dispatcher
309        self.inner.protocol.dispatcher().shutdown();
310        tracing::debug!("✅ Message dispatcher stopped");
311
312        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313        match self.inner.protocol.transport().disconnect().await {
314            Ok(()) => {
315                tracing::info!("✅ Transport disconnected successfully");
316            }
317            Err(e) => {
318                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319            }
320        }
321
322        tracing::info!("✅ MCP client shutdown complete");
323        Ok(())
324    }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
331#[cfg(feature = "http")]
332impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
333    /// Connect to an HTTP MCP server (convenience method)
334    ///
335    /// This is a convenient one-liner alternative to manual configuration.
336    /// Creates an HTTP client, connects, and initializes in one call.
337    ///
338    /// # Arguments
339    ///
340    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
341    ///
342    /// # Returns
343    ///
344    /// Returns an initialized `Client` ready to use.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if:
349    /// - The URL is invalid
350    /// - Connection to the server fails
351    /// - Initialization handshake fails
352    ///
353    /// # Examples
354    ///
355    /// ```rust,no_run
356    /// use turbomcp_client::Client;
357    ///
358    /// # async fn example() -> turbomcp_protocol::Result<()> {
359    /// // Convenient one-liner - balanced with server DX
360    /// let client = Client::connect_http("http://localhost:8080").await?;
361    ///
362    /// // Now use it directly
363    /// let tools = client.list_tools().await?;
364    /// # Ok(())
365    /// # }
366    /// ```
367    ///
368    /// Compare to verbose approach (10+ lines):
369    /// ```rust,no_run
370    /// use turbomcp_client::Client;
371    /// use turbomcp_transport::streamable_http_client::{
372    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
373    /// };
374    ///
375    /// # async fn example() -> turbomcp_protocol::Result<()> {
376    /// let config = StreamableHttpClientConfig {
377    ///     base_url: "http://localhost:8080".to_string(),
378    ///     ..Default::default()
379    /// };
380    /// let transport = StreamableHttpClientTransport::new(config);
381    /// let client = Client::new(transport);
382    /// client.initialize().await?;
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
387        use turbomcp_transport::streamable_http_client::{
388            StreamableHttpClientConfig, StreamableHttpClientTransport,
389        };
390
391        let config = StreamableHttpClientConfig {
392            base_url: url.into(),
393            ..Default::default()
394        };
395
396        let transport = StreamableHttpClientTransport::new(config);
397        let client = Self::new(transport);
398
399        // Initialize connection immediately
400        client.initialize().await?;
401
402        Ok(client)
403    }
404
405    /// Connect to an HTTP MCP server with custom configuration
406    ///
407    /// Provides more control than `connect_http()` while still being ergonomic.
408    ///
409    /// # Arguments
410    ///
411    /// * `url` - The base URL of the MCP server
412    /// * `config_fn` - Function to customize the configuration
413    ///
414    /// # Examples
415    ///
416    /// ```rust,no_run
417    /// use turbomcp_client::Client;
418    /// use std::time::Duration;
419    ///
420    /// # async fn example() -> turbomcp_protocol::Result<()> {
421    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
422    ///     config.timeout = Duration::from_secs(60);
423    ///     config.endpoint_path = "/api/mcp".to_string();
424    /// }).await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
429    where
430        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
431    {
432        use turbomcp_transport::streamable_http_client::{
433            StreamableHttpClientConfig, StreamableHttpClientTransport,
434        };
435
436        let mut config = StreamableHttpClientConfig {
437            base_url: url.into(),
438            ..Default::default()
439        };
440
441        config_fn(&mut config);
442
443        let transport = StreamableHttpClientTransport::new(config);
444        let client = Self::new(transport);
445
446        client.initialize().await?;
447
448        Ok(client)
449    }
450}
451
452// ============================================================================
453// TCP-Specific Convenience Constructors (Feature-Gated)
454// ============================================================================
455
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Convenient one-liner for TCP connections - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address, either a literal socket address
    ///   (e.g., "127.0.0.1:8765") or a `host:port` pair (e.g., "localhost:8765").
    ///   Host names are resolved and the first resolved address is used.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `addr` is neither a valid socket address nor a resolvable `host:port`
    /// - The initialization handshake fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{SocketAddr, ToSocketAddrs};
        use turbomcp_transport::tcp::TcpTransport;

        // Fast path: a literal IP:port needs no name resolution.
        let server_addr: SocketAddr = match addr.as_ref().parse::<SocketAddr>() {
            Ok(parsed) => parsed,
            Err(_) => {
                // `SocketAddr::from_str` rejects host names such as
                // "localhost:8765". Resolve them on the blocking pool because
                // `to_socket_addrs` may perform a blocking DNS lookup.
                let host_port = addr.as_ref().to_owned();
                let first_resolved = tokio::task::spawn_blocking(move || {
                    host_port
                        .to_socket_addrs()
                        .map(|mut candidates| candidates.next())
                })
                .await
                .map_err(|e| Error::internal(format!("Address resolution task failed: {}", e)))?
                .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;

                first_resolved.ok_or_else(|| {
                    Error::invalid_request(format!(
                        "Invalid address: '{}' did not resolve to any socket address",
                        addr.as_ref()
                    ))
                })?
            }
        };

        // Bind the client side to the wildcard address of the matching
        // family, port 0 (any available port).
        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
            "[::]:0".parse().expect("valid IPv6 any-port address")
        } else {
            "0.0.0.0:0".parse().expect("valid IPv4 any-port address")
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
506
507// ============================================================================
508// Unix Socket-Specific Convenience Constructors (Feature-Gated)
509// ============================================================================
510
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// One-call helper for Unix socket IPC: builds a client-side Unix
    /// transport for `path`, creates the client, and runs the initialization
    /// handshake.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use; an error from the
    /// initialization handshake is propagated to the caller.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let socket_path: std::path::PathBuf = path.into();
        let connected = Self::new(UnixTransport::new_client(socket_path));

        // Perform the handshake right away so the caller gets a ready client.
        connected.initialize().await?;
        Ok(connected)
    }
}
548
549impl<T: Transport + 'static> Client<T> {
550    /// Register message handlers with the dispatcher
551    ///
552    /// This method sets up the callbacks that handle server-initiated requests
553    /// and notifications. The dispatcher's background task routes incoming
554    /// messages to these handlers.
555    ///
556    /// This is called automatically during Client construction (in `new()` and
557    /// `with_capabilities()`), so you don't need to call it manually.
558    ///
559    /// ## How It Works
560    ///
561    /// The handlers are synchronous closures that spawn async tasks to do the
562    /// actual work. This allows the dispatcher to continue routing messages
563    /// without blocking on handler execution.
564    fn register_dispatcher_handlers(&self) {
565        let dispatcher = self.inner.protocol.dispatcher();
566        let client_for_requests = self.clone();
567        let client_for_notifications = self.clone();
568
569        // Request handler (elicitation, sampling, etc.)
570        let semaphore = Arc::clone(&self.inner.handler_semaphore);
571        let request_handler = Arc::new(move |request: JsonRpcRequest| {
572            let client = client_for_requests.clone();
573            let method = request.method.clone();
574            let req_id = request.id.clone();
575            let semaphore = Arc::clone(&semaphore);
576
577            // ✅ Spawn async task with bounded concurrency
578            tokio::spawn(async move {
579                // Acquire permit (blocks if 100 requests already in flight)
580                let _permit = match semaphore.acquire().await {
581                    Ok(permit) => permit,
582                    Err(_) => {
583                        tracing::warn!(
584                            "Handler semaphore closed, dropping request: method={}",
585                            method
586                        );
587                        return;
588                    }
589                };
590
591                tracing::debug!(
592                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
593                    method,
594                    req_id
595                );
596                if let Err(e) = client.handle_request(request).await {
597                    tracing::error!(
598                        "❌ [request_handler] Error handling server request '{}': {}",
599                        method,
600                        e
601                    );
602                    tracing::error!("   Request ID: {:?}", req_id);
603                    tracing::error!("   Error kind: {:?}", e.kind);
604                } else {
605                    tracing::debug!(
606                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
607                        method,
608                        req_id
609                    );
610                }
611                // Permit automatically released on drop ✅
612            });
613            Ok(())
614        });
615
616        // Notification handler
617        let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
618        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
619            let client = client_for_notifications.clone();
620            let semaphore = Arc::clone(&semaphore_notif);
621
622            // ✅ Spawn async task with bounded concurrency
623            tokio::spawn(async move {
624                // Acquire permit (blocks if 100 handlers already in flight)
625                let _permit = match semaphore.acquire().await {
626                    Ok(permit) => permit,
627                    Err(_) => {
628                        tracing::warn!("Handler semaphore closed, dropping notification");
629                        return;
630                    }
631                };
632
633                if let Err(e) = client.handle_notification(notification).await {
634                    tracing::error!("Error handling server notification: {}", e);
635                }
636                // Permit automatically released on drop ✅
637            });
638            Ok(())
639        });
640
641        // Register handlers synchronously - no race condition!
642        // The set_* methods are now synchronous with std::sync::Mutex
643        dispatcher.set_request_handler(request_handler);
644        dispatcher.set_notification_handler(notification_handler);
645        tracing::debug!("Dispatcher handlers registered successfully");
646    }
647
648    /// Handle server-initiated requests (elicitation, sampling, roots)
649    ///
650    /// This method is called by the MessageDispatcher when it receives a request
651    /// from the server. It routes the request to the appropriate handler based on
652    /// the method name.
653    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
654        match request.method.as_str() {
655            "sampling/createMessage" => {
656                let handler_opt = self.inner.sampling_handler.lock().clone();
657                if let Some(handler) = handler_opt {
658                    // Extract request ID for proper correlation
659                    let request_id = match &request.id {
660                        turbomcp_protocol::MessageId::String(s) => s.clone(),
661                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
662                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
663                    };
664
665                    let params: CreateMessageRequest =
666                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
667                            .map_err(|e| {
668                            Error::internal(format!("Invalid createMessage params: {}", e))
669                        })?;
670
671                    match handler.handle_create_message(request_id, params).await {
672                        Ok(result) => {
673                            let result_value = serde_json::to_value(result).map_err(|e| {
674                                Error::internal(format!("Failed to serialize response: {}", e))
675                            })?;
676                            let response = JsonRpcResponse::success(result_value, request.id);
677                            self.send_response(response).await?;
678                        }
679                        Err(e) => {
680                            tracing::warn!(
681                                "⚠️  [handle_request] Sampling handler returned error: {}",
682                                e
683                            );
684
685                            // Preserve error semantics by checking actual error type
686                            // This allows proper error code propagation for retry logic
687                            let (code, message) = if let Some(handler_err) =
688                                e.downcast_ref::<HandlerError>()
689                            {
690                                // HandlerError has explicit JSON-RPC code mapping
691                                let json_err = handler_err.into_jsonrpc_error();
692                                tracing::info!(
693                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
694                                    json_err.code
695                                );
696                                (json_err.code, json_err.message)
697                            } else if let Some(proto_err) =
698                                e.downcast_ref::<turbomcp_protocol::Error>()
699                            {
700                                // Protocol errors have ErrorKind-based mapping
701                                tracing::info!(
702                                    "📋 [handle_request] Protocol error mapped to code: {}",
703                                    proto_err.jsonrpc_error_code()
704                                );
705                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
706                            } else {
707                                // Generic errors default to Internal (-32603)
708                                // Log the error type for debugging (should rarely hit this path)
709                                tracing::warn!(
710                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
711                                    std::any::type_name_of_val(&*e)
712                                );
713                                (-32603, format!("Sampling handler error: {}", e))
714                            };
715
716                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
717                                code,
718                                message,
719                                data: None,
720                            };
721                            let response =
722                                JsonRpcResponse::error_response(error, request.id.clone());
723
724                            tracing::info!(
725                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
726                                request.id
727                            );
728                            self.send_response(response).await?;
729                            tracing::info!(
730                                "✅ [handle_request] Error response sent successfully for request: {:?}",
731                                request.id
732                            );
733                        }
734                    }
735                } else {
736                    // No handler configured
737                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
738                        code: -32601,
739                        message: "Sampling not supported".to_string(),
740                        data: None,
741                    };
742                    let response = JsonRpcResponse::error_response(error, request.id);
743                    self.send_response(response).await?;
744                }
745            }
746            "roots/list" => {
747                // Handle roots/list request from server
748                // Clone the handler Arc to avoid holding mutex across await
749                let handler_opt = self.inner.handlers.lock().roots.clone();
750
751                let roots_result = if let Some(handler) = handler_opt {
752                    handler.handle_roots_request().await
753                } else {
754                    // No handler - return empty list per MCP spec
755                    Ok(Vec::new())
756                };
757
758                match roots_result {
759                    Ok(roots) => {
760                        let result_value =
761                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
762                                roots,
763                                _meta: None,
764                            })
765                            .map_err(|e| {
766                                Error::internal(format!(
767                                    "Failed to serialize roots response: {}",
768                                    e
769                                ))
770                            })?;
771                        let response = JsonRpcResponse::success(result_value, request.id);
772                        self.send_response(response).await?;
773                    }
774                    Err(e) => {
775                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
776                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
777                        let json_err = e.into_jsonrpc_error();
778                        let response = JsonRpcResponse::error_response(json_err, request.id);
779                        self.send_response(response).await?;
780                    }
781                }
782            }
783            "elicitation/create" => {
784                // Clone handler Arc before await to avoid holding mutex across await
785                let handler_opt = self.inner.handlers.lock().elicitation.clone();
786                if let Some(handler) = handler_opt {
787                    // Parse elicitation request params as MCP protocol type
788                    let proto_request: turbomcp_protocol::types::ElicitRequest =
789                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
790                            .map_err(|e| {
791                            Error::internal(format!("Invalid elicitation params: {}", e))
792                        })?;
793
794                    // Wrap protocol request with ID for handler (preserves type safety!)
795                    let handler_request =
796                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
797
798                    // Call the registered elicitation handler
799                    match handler.handle_elicitation(handler_request).await {
800                        Ok(elicit_response) => {
801                            // Convert handler response back to protocol type
802                            let proto_result = elicit_response.into_protocol();
803                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
804                                Error::internal(format!(
805                                    "Failed to serialize elicitation response: {}",
806                                    e
807                                ))
808                            })?;
809                            let response = JsonRpcResponse::success(result_value, request.id);
810                            self.send_response(response).await?;
811                        }
812                        Err(e) => {
813                            // Convert handler error to JSON-RPC error using centralized mapping
814                            let response =
815                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
816                            self.send_response(response).await?;
817                        }
818                    }
819                } else {
820                    // No handler configured - elicitation not supported
821                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
822                        code: -32601,
823                        message: "Elicitation not supported - no handler registered".to_string(),
824                        data: None,
825                    };
826                    let response = JsonRpcResponse::error_response(error, request.id);
827                    self.send_response(response).await?;
828                }
829            }
830            _ => {
831                // Unknown method
832                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
833                    code: -32601,
834                    message: format!("Method not found: {}", request.method),
835                    data: None,
836                };
837                let response = JsonRpcResponse::error_response(error, request.id);
838                self.send_response(response).await?;
839            }
840        }
841        Ok(())
842    }
843
844    /// Handle server-initiated notifications
845    ///
846    /// Routes notifications to appropriate handlers based on method name.
847    /// MCP defines several notification types that servers can send to clients:
848    ///
849    /// - `notifications/progress` - Progress updates for long-running operations
850    /// - `notifications/message` - Log messages from server
851    /// - `notifications/resources/updated` - Resource content changed
852    /// - `notifications/resources/list_changed` - Resource list changed
853    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
854        match notification.method.as_str() {
855            "notifications/progress" => {
856                // Route to progress handler
857                let handler_opt = self.inner.handlers.lock().get_progress_handler();
858
859                if let Some(handler) = handler_opt {
860                    let progress: crate::handlers::ProgressNotification = serde_json::from_value(
861                        notification.params.unwrap_or(serde_json::Value::Null),
862                    )
863                    .map_err(|e| {
864                        Error::internal(format!("Invalid progress notification: {}", e))
865                    })?;
866
867                    if let Err(e) = handler.handle_progress(progress).await {
868                        tracing::error!("Progress handler error: {}", e);
869                    }
870                } else {
871                    tracing::debug!("Progress notification received (no handler registered)");
872                }
873            }
874
875            "notifications/message" => {
876                // Route to log handler
877                let handler_opt = self.inner.handlers.lock().get_log_handler();
878
879                if let Some(handler) = handler_opt {
880                    // Parse log message
881                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
882                        notification.params.unwrap_or(serde_json::Value::Null),
883                    )
884                    .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
885
886                    // Call handler
887                    if let Err(e) = handler.handle_log(log).await {
888                        tracing::error!("Log handler error: {}", e);
889                    }
890                } else {
891                    tracing::debug!("Received log notification but no handler registered");
892                }
893            }
894
895            "notifications/resources/updated" => {
896                // Route to resource update handler
897                let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
898
899                if let Some(handler) = handler_opt {
900                    // Parse resource update notification
901                    let update: crate::handlers::ResourceUpdatedNotification =
902                        serde_json::from_value(
903                            notification.params.unwrap_or(serde_json::Value::Null),
904                        )
905                        .map_err(|e| {
906                            Error::internal(format!("Invalid resource update notification: {}", e))
907                        })?;
908
909                    // Call handler
910                    if let Err(e) = handler.handle_resource_update(update).await {
911                        tracing::error!("Resource update handler error: {}", e);
912                    }
913                } else {
914                    tracing::debug!(
915                        "Received resource update notification but no handler registered"
916                    );
917                }
918            }
919
920            "notifications/resources/list_changed" => {
921                // Route to resource list changed handler
922                let handler_opt = self
923                    .inner
924                    .handlers
925                    .lock()
926                    .get_resource_list_changed_handler();
927
928                if let Some(handler) = handler_opt {
929                    if let Err(e) = handler.handle_resource_list_changed().await {
930                        tracing::error!("Resource list changed handler error: {}", e);
931                    }
932                } else {
933                    tracing::debug!(
934                        "Resource list changed notification received (no handler registered)"
935                    );
936                }
937            }
938
939            "notifications/prompts/list_changed" => {
940                // Route to prompt list changed handler
941                let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
942
943                if let Some(handler) = handler_opt {
944                    if let Err(e) = handler.handle_prompt_list_changed().await {
945                        tracing::error!("Prompt list changed handler error: {}", e);
946                    }
947                } else {
948                    tracing::debug!(
949                        "Prompt list changed notification received (no handler registered)"
950                    );
951                }
952            }
953
954            "notifications/tools/list_changed" => {
955                // Route to tool list changed handler
956                let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
957
958                if let Some(handler) = handler_opt {
959                    if let Err(e) = handler.handle_tool_list_changed().await {
960                        tracing::error!("Tool list changed handler error: {}", e);
961                    }
962                } else {
963                    tracing::debug!(
964                        "Tool list changed notification received (no handler registered)"
965                    );
966                }
967            }
968
969            "notifications/cancelled" => {
970                // Route to cancellation handler
971                let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
972
973                if let Some(handler) = handler_opt {
974                    // Parse cancellation notification
975                    let cancellation: crate::handlers::CancelledNotification =
976                        serde_json::from_value(
977                            notification.params.unwrap_or(serde_json::Value::Null),
978                        )
979                        .map_err(|e| {
980                            Error::internal(format!("Invalid cancellation notification: {}", e))
981                        })?;
982
983                    // Call handler
984                    if let Err(e) = handler.handle_cancellation(cancellation).await {
985                        tracing::error!("Cancellation handler error: {}", e);
986                    }
987                } else {
988                    tracing::debug!("Cancellation notification received (no handler registered)");
989                }
990            }
991
992            _ => {
993                // Unknown notification type
994                tracing::debug!("Received unknown notification: {}", notification.method);
995            }
996        }
997
998        Ok(())
999    }
1000
1001    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1002        tracing::info!(
1003            "📤 [send_response] Sending JSON-RPC response: id={:?}",
1004            response.id
1005        );
1006
1007        let payload = serde_json::to_vec(&response).map_err(|e| {
1008            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1009            Error::internal(format!("Failed to serialize response: {}", e))
1010        })?;
1011
1012        tracing::debug!(
1013            "📤 [send_response] Response payload: {} bytes",
1014            payload.len()
1015        );
1016        tracing::debug!(
1017            "📤 [send_response] Response JSON: {}",
1018            String::from_utf8_lossy(&payload)
1019        );
1020
1021        let message = TransportMessage::new(
1022            turbomcp_protocol::MessageId::from("response".to_string()),
1023            payload.into(),
1024        );
1025
1026        self.inner
1027            .protocol
1028            .transport()
1029            .send(message)
1030            .await
1031            .map_err(|e| {
1032                tracing::error!("❌ [send_response] Transport send failed: {}", e);
1033                Error::transport(format!("Failed to send response: {}", e))
1034            })?;
1035
1036        tracing::info!(
1037            "✅ [send_response] Response sent successfully: id={:?}",
1038            response.id
1039        );
1040        Ok(())
1041    }
1042
1043    /// Initialize the connection with the MCP server
1044    ///
1045    /// Performs the initialization handshake with the server, negotiating capabilities
1046    /// and establishing the protocol version. This method must be called before
1047    /// any other operations can be performed.
1048    ///
1049    /// # Returns
1050    ///
1051    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns an error if:
1056    /// - The transport connection fails
1057    /// - The server rejects the initialization request
1058    /// - Protocol negotiation fails
1059    ///
1060    /// # Examples
1061    ///
1062    /// ```rust,no_run
1063    /// # use turbomcp_client::Client;
1064    /// # use turbomcp_transport::stdio::StdioTransport;
1065    /// # async fn example() -> turbomcp_protocol::Result<()> {
1066    /// let mut client = Client::new(StdioTransport::new());
1067    ///
1068    /// let result = client.initialize().await?;
1069    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1070    /// # Ok(())
1071    /// # }
1072    /// ```
1073    pub async fn initialize(&self) -> Result<InitializeResult> {
1074        // Auto-connect transport if not already connected
1075        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1076        let transport = self.inner.protocol.transport();
1077        let transport_state = transport.state().await;
1078        if !matches!(
1079            transport_state,
1080            turbomcp_transport::TransportState::Connected
1081        ) {
1082            tracing::debug!(
1083                "Auto-connecting transport (current state: {:?})",
1084                transport_state
1085            );
1086            transport
1087                .connect()
1088                .await
1089                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1090            tracing::info!("Transport connected successfully");
1091        }
1092
1093        // Build client capabilities based on registered handlers (automatic detection)
1094        let mut client_caps = ProtocolClientCapabilities::default();
1095
1096        // Detect sampling capability from handler
1097        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1098            client_caps.sampling = Some(sampling_caps);
1099        }
1100
1101        // Detect elicitation capability from handler
1102        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1103            client_caps.elicitation = Some(elicitation_caps);
1104        }
1105
1106        // Detect roots capability from handler
1107        if let Some(roots_caps) = self.get_roots_capabilities() {
1108            client_caps.roots = Some(roots_caps);
1109        }
1110
1111        // Send MCP initialization request
1112        let request = InitializeRequest {
1113            protocol_version: PROTOCOL_VERSION.to_string(),
1114            capabilities: client_caps,
1115            client_info: turbomcp_protocol::types::Implementation {
1116                name: "turbomcp-client".to_string(),
1117                version: env!("CARGO_PKG_VERSION").to_string(),
1118                title: Some("TurboMCP Client".to_string()),
1119                ..Default::default()
1120            },
1121            _meta: None,
1122        };
1123
1124        let protocol_response: ProtocolInitializeResult = self
1125            .inner
1126            .protocol
1127            .request("initialize", Some(serde_json::to_value(request)?))
1128            .await?;
1129
1130        // AtomicBool: lock-free store with Ordering::Relaxed
1131        self.inner.initialized.store(true, Ordering::Relaxed);
1132
1133        // Send initialized notification
1134        self.inner
1135            .protocol
1136            .notify("notifications/initialized", None)
1137            .await?;
1138
1139        // Convert protocol response to client response type
1140        Ok(InitializeResult {
1141            server_info: protocol_response.server_info,
1142            server_capabilities: protocol_response.capabilities,
1143        })
1144    }
1145
1146    /// Subscribe to resource change notifications
1147    ///
1148    /// Registers interest in receiving notifications when the specified
1149    /// resource changes. The server will send notifications when the
1150    /// resource is modified, created, or deleted.
1151    ///
1152    /// # Arguments
1153    ///
1154    /// * `uri` - The URI of the resource to monitor
1155    ///
1156    /// # Returns
1157    ///
1158    /// Returns `EmptyResult` on successful subscription.
1159    ///
1160    /// # Errors
1161    ///
1162    /// Returns an error if:
1163    /// - The client is not initialized
1164    /// - The URI is invalid or empty
1165    /// - The server doesn't support subscriptions
1166    /// - The request fails
1167    ///
1168    /// # Examples
1169    ///
1170    /// ```rust,no_run
1171    /// # use turbomcp_client::Client;
1172    /// # use turbomcp_transport::stdio::StdioTransport;
1173    /// # async fn example() -> turbomcp_protocol::Result<()> {
1174    /// let mut client = Client::new(StdioTransport::new());
1175    /// client.initialize().await?;
1176    ///
1177    /// // Subscribe to file changes
1178    /// client.subscribe("file:///watch/directory").await?;
1179    /// println!("Subscribed to resource changes");
1180    /// # Ok(())
1181    /// # }
1182    /// ```
1183    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1184        if !self.inner.initialized.load(Ordering::Relaxed) {
1185            return Err(Error::invalid_request("Client not initialized"));
1186        }
1187
1188        if uri.is_empty() {
1189            return Err(Error::invalid_request("Subscription URI cannot be empty"));
1190        }
1191
1192        // Send resources/subscribe request
1193        let request = SubscribeRequest { uri: uri.into() };
1194
1195        self.inner
1196            .protocol
1197            .request(
1198                "resources/subscribe",
1199                Some(serde_json::to_value(request).map_err(|e| {
1200                    Error::internal(format!("Failed to serialize subscribe request: {}", e))
1201                })?),
1202            )
1203            .await
1204    }
1205
1206    /// Unsubscribe from resource change notifications
1207    ///
1208    /// Cancels a previous subscription to resource changes. After unsubscribing,
1209    /// the client will no longer receive notifications for the specified resource.
1210    ///
1211    /// # Arguments
1212    ///
1213    /// * `uri` - The URI of the resource to stop monitoring
1214    ///
1215    /// # Returns
1216    ///
1217    /// Returns `EmptyResult` on successful unsubscription.
1218    ///
1219    /// # Errors
1220    ///
1221    /// Returns an error if:
1222    /// - The client is not initialized
1223    /// - The URI is invalid or empty
1224    /// - No active subscription exists for the URI
1225    /// - The request fails
1226    ///
1227    /// # Examples
1228    ///
1229    /// ```rust,no_run
1230    /// # use turbomcp_client::Client;
1231    /// # use turbomcp_transport::stdio::StdioTransport;
1232    /// # async fn example() -> turbomcp_protocol::Result<()> {
1233    /// let mut client = Client::new(StdioTransport::new());
1234    /// client.initialize().await?;
1235    ///
1236    /// // Unsubscribe from file changes
1237    /// client.unsubscribe("file:///watch/directory").await?;
1238    /// println!("Unsubscribed from resource changes");
1239    /// # Ok(())
1240    /// # }
1241    /// ```
1242    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1243        if !self.inner.initialized.load(Ordering::Relaxed) {
1244            return Err(Error::invalid_request("Client not initialized"));
1245        }
1246
1247        if uri.is_empty() {
1248            return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1249        }
1250
1251        // Send resources/unsubscribe request
1252        let request = UnsubscribeRequest { uri: uri.into() };
1253
1254        self.inner
1255            .protocol
1256            .request(
1257                "resources/unsubscribe",
1258                Some(serde_json::to_value(request).map_err(|e| {
1259                    Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1260                })?),
1261            )
1262            .await
1263    }
1264
1265    /// Get the client's capabilities configuration
1266    #[must_use]
1267    pub fn capabilities(&self) -> &ClientCapabilities {
1268        &self.inner.capabilities
1269    }
1270
1271    // ============================================================================
1272    // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1273    // ============================================================================
1274
1275    /// Retrieve the status of a task (tasks/get)
1276    ///
1277    /// Polls the server for the current status of a specific task.
1278    ///
1279    /// # Arguments
1280    ///
1281    /// * `task_id` - The ID of the task to query
1282    ///
1283    /// # Returns
1284    ///
1285    /// Returns the current `Task` state including status, timestamps, and messages.
1286    #[cfg(feature = "experimental-tasks")]
1287    pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1288        let request = GetTaskRequest {
1289            task_id: task_id.to_string(),
1290        };
1291
1292        self.inner
1293            .protocol
1294            .request(
1295                "tasks/get",
1296                Some(serde_json::to_value(request).map_err(|e| {
1297                    Error::internal(format!("Failed to serialize get_task request: {}", e))
1298                })?),
1299            )
1300            .await
1301    }
1302
1303    /// Cancel a running task (tasks/cancel)
1304    ///
1305    /// Attempts to cancel a task execution. This is a best-effort operation.
1306    ///
1307    /// # Arguments
1308    ///
1309    /// * `task_id` - The ID of the task to cancel
1310    ///
1311    /// # Returns
1312    ///
1313    /// Returns the updated `Task` state (typically with status "cancelled").
1314    #[cfg(feature = "experimental-tasks")]
1315    pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1316        let request = CancelTaskRequest {
1317            task_id: task_id.to_string(),
1318        };
1319
1320        self.inner
1321            .protocol
1322            .request(
1323                "tasks/cancel",
1324                Some(serde_json::to_value(request).map_err(|e| {
1325                    Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1326                })?),
1327            )
1328            .await
1329    }
1330
1331    /// List all tasks (tasks/list)
1332    ///
1333    /// Retrieves a paginated list of tasks known to the server.
1334    ///
1335    /// # Arguments
1336    ///
1337    /// * `cursor` - Optional pagination cursor from a previous response
1338    /// * `limit` - Optional maximum number of tasks to return
1339    ///
1340    /// # Returns
1341    ///
1342    /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1343    #[cfg(feature = "experimental-tasks")]
1344    pub async fn list_tasks(
1345        &self,
1346        cursor: Option<String>,
1347        limit: Option<usize>,
1348    ) -> Result<ListTasksResult> {
1349        let request = ListTasksRequest { cursor, limit };
1350
1351        self.inner
1352            .protocol
1353            .request(
1354                "tasks/list",
1355                Some(serde_json::to_value(request).map_err(|e| {
1356                    Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1357                })?),
1358            )
1359            .await
1360    }
1361
1362    /// Retrieve the result of a completed task (tasks/result)
1363    ///
1364    /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1365    /// then returns the operation result.
1366    ///
1367    /// # Arguments
1368    ///
1369    /// * `task_id` - The ID of the task to retrieve results for
1370    ///
1371    /// # Returns
1372    ///
1373    /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1374    #[cfg(feature = "experimental-tasks")]
1375    pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1376        let request = GetTaskPayloadRequest {
1377            task_id: task_id.to_string(),
1378        };
1379
1380        self.inner
1381            .protocol
1382            .request(
1383                "tasks/result",
1384                Some(serde_json::to_value(request).map_err(|e| {
1385                    Error::internal(format!(
1386                        "Failed to serialize get_task_result request: {}",
1387                        e
1388                    ))
1389                })?),
1390            )
1391            .await
1392    }
1393
    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
    // are defined in their respective operation modules:
    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
    // - handlers.rs: has_elicitation_handler, has_roots_handler
    //
    // The capability getters for elicitation and roots are defined below
    // because they are used during initialization.
1401
1402    /// Get elicitation capabilities if handler is registered
1403    /// Automatically detects capability based on registered handler
1404    fn get_elicitation_capabilities(
1405        &self,
1406    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1407        if self.has_elicitation_handler() {
1408            // Currently returns default capabilities. In the future, schema_validation support
1409            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1410            // that handlers could implement. For now, handlers validate schemas themselves.
1411            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1412        } else {
1413            None
1414        }
1415    }
1416
1417    /// Get roots capabilities if handler is registered
1418    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1419        if self.has_roots_handler() {
1420            // Roots capabilities indicate whether list can change
1421            Some(turbomcp_protocol::types::RootsCapabilities {
1422                list_changed: Some(true), // Support dynamic roots by default
1423            })
1424        } else {
1425            None
1426        }
1427    }
1428}
1429
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;
    use turbomcp_transport::{
        TransportCapabilities, TransportConfig, TransportMessage, TransportMetrics,
        TransportResult, TransportState, TransportType,
    };

    /// Boxed, sendable future in the shape returned by the `Transport` methods.
    type BoxedFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;

    /// Transport stub: always reports `Disconnected`, succeeds on every
    /// operation and never yields an incoming message.
    #[derive(Debug, Default)]
    struct NoopTransport {
        capabilities: TransportCapabilities,
    }

    impl Transport for NoopTransport {
        fn transport_type(&self) -> TransportType {
            TransportType::Stdio
        }

        fn capabilities(&self) -> &TransportCapabilities {
            &self.capabilities
        }

        fn state(&self) -> BoxedFuture<'_, TransportState> {
            Box::pin(async { TransportState::Disconnected })
        }

        fn connect(&self) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn disconnect(&self) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn send(&self, _message: TransportMessage) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn receive(&self) -> BoxedFuture<'_, TransportResult<Option<TransportMessage>>> {
            Box::pin(async { Ok(None) })
        }

        fn metrics(&self) -> BoxedFuture<'_, TransportMetrics> {
            Box::pin(async { TransportMetrics::default() })
        }

        fn configure(&self, _config: TransportConfig) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// The handler semaphore is sized from `max_concurrent_handlers`.
    #[tokio::test]
    async fn test_with_capabilities_and_config_uses_handler_limit() {
        let caps = ClientCapabilities {
            max_concurrent_handlers: 7,
            ..Default::default()
        };
        let transport = NoopTransport::default();

        let client =
            Client::with_capabilities_and_config(transport, caps, TransportConfig::default());

        let permits = client.inner.handler_semaphore.available_permits();
        assert_eq!(permits, 7);
    }

    /// `shutdown` flips the `shutdown_requested` flag from false to true.
    #[tokio::test]
    async fn test_shutdown_sets_shutdown_flag() {
        let client = Client::new(NoopTransport::default());
        let flag = &client.inner.shutdown_requested;
        assert!(!flag.load(Ordering::Relaxed));

        client.shutdown().await.expect("shutdown should succeed");

        assert!(flag.load(Ordering::Relaxed));
    }
}