Skip to main content

turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32    *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportConfig, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40    ClientCapabilities,
41    handlers::{HandlerError, HandlerRegistry},
42    sampling::SamplingHandler,
43};
44
45/// Inner client state with interior mutability
46///
47/// This structure contains the actual client state and is wrapped in Arc<...>
48/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
49pub(super) struct ClientInner<T: Transport + 'static> {
50    /// Protocol client for low-level communication
51    pub(super) protocol: ProtocolClient<T>,
52
53    /// Client capabilities (immutable after construction)
54    pub(super) capabilities: ClientCapabilities,
55
56    /// Initialization state (lock-free atomic boolean)
57    pub(super) initialized: AtomicBool,
58
59    /// Tracks whether graceful shutdown has already been requested.
60    pub(super) shutdown_requested: AtomicBool,
61
62    /// Optional sampling handler (mutex for dynamic updates)
63    pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
64
65    /// Handler registry for bidirectional communication (mutex for registration)
66    pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
67
68    /// ✅ Semaphore for bounded concurrency of request/notification handlers
69    /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
70    pub(super) handler_semaphore: Arc<Semaphore>,
71}
72
73/// The core MCP client implementation
74///
75/// Client provides a comprehensive interface for communicating with MCP servers,
76/// supporting all protocol features including tools, prompts, resources, sampling,
77/// elicitation, and bidirectional communication patterns.
78///
79/// # Clone Pattern
80///
81/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
82/// All clones share the same underlying connection and state:
83///
84/// ```rust,no_run
85/// use turbomcp_client::Client;
86/// use turbomcp_transport::stdio::StdioTransport;
87///
88/// # async fn example() -> turbomcp_protocol::Result<()> {
89/// let client = Client::new(StdioTransport::new());
90/// client.initialize().await?;
91///
92/// // Cheap clone - shares same connection
93/// let client2 = client.clone();
94/// tokio::spawn(async move {
95///     client2.list_tools().await.ok();
96/// });
97/// # Ok(())
98/// # }
99/// ```
100///
101/// The client must be initialized before use by calling `initialize()` to perform
102/// the MCP handshake and capability negotiation.
103///
104/// # Features
105///
106/// - **Protocol Compliance**: Full current MCP specification support
107/// - **Bidirectional Communication**: Server-initiated requests and client responses
108/// - **Plugin Middleware**: Extensible request/response processing
109/// - **Handler Registry**: Callbacks for server-initiated operations
110/// - **Connection Management**: Robust error handling and recovery
111/// - **Type Safety**: Compile-time guarantees for MCP message types
112/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// use turbomcp_client::Client;
118/// use turbomcp_transport::stdio::StdioTransport;
119/// use std::collections::HashMap;
120///
121/// # async fn example() -> turbomcp_protocol::Result<()> {
122/// // Create and initialize client (no mut needed!)
123/// let client = Client::new(StdioTransport::new());
124/// let init_result = client.initialize().await?;
125/// println!("Connected to: {}", init_result.server_info.name);
126///
127/// // Use MCP operations
128/// let tools = client.list_tools().await?;
129/// let mut args = HashMap::new();
130/// args.insert("input".to_string(), serde_json::json!("test"));
131/// let result = client.call_tool("my_tool", Some(args), None).await?;
132/// # Ok(())
133/// # }
134/// ```
135pub struct Client<T: Transport + 'static> {
136    pub(super) inner: Arc<ClientInner<T>>,
137}
138
139/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
140///
141/// Cloning a Client is cheap (just an Arc clone) and all clones share
142/// the same underlying connection and state.
143impl<T: Transport + 'static> Clone for Client<T> {
144    fn clone(&self) -> Self {
145        Self {
146            inner: Arc::clone(&self.inner),
147        }
148    }
149}
150
151impl<T: Transport + 'static> Drop for ClientInner<T> {
152    fn drop(&mut self) {
153        // Best-effort cleanup if shutdown() wasn't called
154        // This is a safety net, but applications SHOULD call shutdown() explicitly
155
156        // Single clear warning
157        if !self.shutdown_requested.load(Ordering::Relaxed) {
158            tracing::warn!(
159                "MCP Client dropped without explicit shutdown(). \
160                 Call client.shutdown().await before dropping for clean resource cleanup. \
161                 Background tasks (WebSocket reconnection) may continue running."
162            );
163        }
164
165        // We can shutdown the dispatcher (it's synchronous)
166        self.protocol.dispatcher().shutdown();
167    }
168}
169
170impl<T: Transport + 'static> Client<T> {
171    /// Create a new client with the specified transport
172    ///
173    /// Creates a new MCP client instance with default capabilities.
174    /// The client must be initialized before use by calling `initialize()`.
175    ///
176    /// # Arguments
177    ///
178    /// * `transport` - The transport implementation to use for communication
179    ///
180    /// # Examples
181    ///
182    /// ```rust,no_run
183    /// use turbomcp_client::Client;
184    /// use turbomcp_transport::stdio::StdioTransport;
185    ///
186    /// let transport = StdioTransport::new();
187    /// let client = Client::new(transport);
188    /// ```
189    pub fn new(transport: T) -> Self {
190        Self::new_with_config(transport, TransportConfig::default())
191    }
192
193    /// Create a new client with an explicit transport configuration.
194    pub fn new_with_config(transport: T, config: TransportConfig) -> Self {
195        let capabilities = ClientCapabilities::default();
196        let client = Self {
197            inner: Arc::new(ClientInner {
198                protocol: ProtocolClient::with_config(transport, config),
199                capabilities: capabilities.clone(),
200                initialized: AtomicBool::new(false),
201                shutdown_requested: AtomicBool::new(false),
202                sampling_handler: Arc::new(Mutex::new(None)),
203                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
204                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
205            }),
206        };
207
208        // Register dispatcher handlers for bidirectional communication
209        client.register_dispatcher_handlers();
210
211        client
212    }
213
214    /// Create a new client with custom capabilities
215    ///
216    /// # Arguments
217    ///
218    /// * `transport` - The transport implementation to use
219    /// * `capabilities` - The client capabilities to negotiate
220    ///
221    /// # Examples
222    ///
223    /// ```rust,no_run
224    /// use turbomcp_client::{Client, ClientCapabilities};
225    /// use turbomcp_transport::stdio::StdioTransport;
226    ///
227    /// let capabilities = ClientCapabilities {
228    ///     tools: true,
229    ///     prompts: true,
230    ///     resources: false,
231    ///     sampling: false,
232    ///     max_concurrent_handlers: 100,
233    /// };
234    ///
235    /// let transport = StdioTransport::new();
236    /// let client = Client::with_capabilities(transport, capabilities);
237    /// ```
238    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
239        Self::with_capabilities_and_config(transport, capabilities, TransportConfig::default())
240    }
241
242    /// Create a new client with custom capabilities and transport configuration.
243    pub fn with_capabilities_and_config(
244        transport: T,
245        capabilities: ClientCapabilities,
246        config: TransportConfig,
247    ) -> Self {
248        let client = Self {
249            inner: Arc::new(ClientInner {
250                protocol: ProtocolClient::with_config(transport, config),
251                capabilities: capabilities.clone(),
252                initialized: AtomicBool::new(false),
253                shutdown_requested: AtomicBool::new(false),
254                sampling_handler: Arc::new(Mutex::new(None)),
255                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
256                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
257            }),
258        };
259
260        // Register dispatcher handlers for bidirectional communication
261        client.register_dispatcher_handlers();
262
263        client
264    }
265
266    /// Shutdown the client and clean up all resources
267    ///
268    /// This method performs a **graceful shutdown** of the MCP client by:
269    /// 1. Stopping the message dispatcher background task
270    /// 2. Disconnecting the transport (closes connection, stops background tasks)
271    ///
272    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
273    ///
274    /// # Why This Method Exists
275    ///
276    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
277    /// which is required for proper cleanup of WebSocket connections and background tasks.
278    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
279    ///
280    /// # Best Practices
281    ///
282    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
283    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
284    /// - For tests: call in cleanup/teardown
285    /// - If forgotten, Drop will log warnings and do best-effort cleanup
286    ///
287    /// # Examples
288    ///
289    /// ```rust,no_run
290    /// # use turbomcp_client::Client;
291    /// # use turbomcp_transport::stdio::StdioTransport;
292    /// # async fn example() -> turbomcp_protocol::Result<()> {
293    /// let client = Client::new(StdioTransport::new());
294    /// client.initialize().await?;
295    ///
296    /// // Use client...
297    /// let _tools = client.list_tools().await?;
298    ///
299    /// // ✅ Clean shutdown
300    /// client.shutdown().await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub async fn shutdown(&self) -> Result<()> {
305        self.inner.shutdown_requested.store(true, Ordering::Relaxed);
306        tracing::info!("Shutting down MCP client");
307
308        // 1. Shutdown message dispatcher
309        self.inner.protocol.dispatcher().shutdown();
310        tracing::debug!("Message dispatcher stopped");
311
312        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313        match self.inner.protocol.transport().disconnect().await {
314            Ok(()) => {
315                tracing::info!("Transport disconnected successfully");
316            }
317            Err(e) => {
318                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319            }
320        }
321
322        tracing::info!("MCP client shutdown complete");
323        Ok(())
324    }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
331#[cfg(feature = "http")]
332impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
333    /// Connect to an HTTP MCP server (convenience method)
334    ///
335    /// This is a convenient one-liner alternative to manual configuration.
336    /// Creates an HTTP client, connects, and initializes in one call.
337    ///
338    /// # Arguments
339    ///
340    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
341    ///
342    /// # Returns
343    ///
344    /// Returns an initialized `Client` ready to use.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if:
349    /// - The URL is invalid
350    /// - Connection to the server fails
351    /// - Initialization handshake fails
352    ///
353    /// # Examples
354    ///
355    /// ```rust,no_run
356    /// use turbomcp_client::Client;
357    ///
358    /// # async fn example() -> turbomcp_protocol::Result<()> {
359    /// // Convenient one-liner - balanced with server DX
360    /// let client = Client::connect_http("http://localhost:8080").await?;
361    ///
362    /// // Now use it directly
363    /// let tools = client.list_tools().await?;
364    /// # Ok(())
365    /// # }
366    /// ```
367    ///
368    /// Compare to verbose approach (10+ lines):
369    /// ```rust,no_run
370    /// use turbomcp_client::Client;
371    /// use turbomcp_transport::streamable_http_client::{
372    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
373    /// };
374    ///
375    /// # async fn example() -> turbomcp_protocol::Result<()> {
376    /// let config = StreamableHttpClientConfig {
377    ///     base_url: "http://localhost:8080".to_string(),
378    ///     ..Default::default()
379    /// };
380    /// let transport = StreamableHttpClientTransport::new(config)
381    ///     .map_err(|e| turbomcp_protocol::Error::transport(e.to_string()))?;
382    /// let client = Client::new(transport);
383    /// client.initialize().await?;
384    /// # Ok(())
385    /// # }
386    /// ```
387    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
388        use turbomcp_transport::streamable_http_client::{
389            StreamableHttpClientConfig, StreamableHttpClientTransport,
390        };
391
392        let config = StreamableHttpClientConfig {
393            base_url: url.into(),
394            ..Default::default()
395        };
396
397        let transport = StreamableHttpClientTransport::new(config).map_err(|e| {
398            turbomcp_protocol::Error::transport(format!("Failed to build HTTP transport: {e}"))
399        })?;
400        let client = Self::new(transport);
401
402        // Initialize connection immediately
403        client.initialize().await?;
404
405        Ok(client)
406    }
407
408    /// Connect to an HTTP MCP server with custom configuration
409    ///
410    /// Provides more control than `connect_http()` while still being ergonomic.
411    ///
412    /// # Arguments
413    ///
414    /// * `url` - The base URL of the MCP server
415    /// * `config_fn` - Function to customize the configuration
416    ///
417    /// # Examples
418    ///
419    /// ```rust,no_run
420    /// use turbomcp_client::Client;
421    /// use std::time::Duration;
422    ///
423    /// # async fn example() -> turbomcp_protocol::Result<()> {
424    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
425    ///     config.timeout = Duration::from_secs(60);
426    ///     config.endpoint_path = "/api/mcp".to_string();
427    /// }).await?;
428    /// # Ok(())
429    /// # }
430    /// ```
431    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
432    where
433        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
434    {
435        use turbomcp_transport::streamable_http_client::{
436            StreamableHttpClientConfig, StreamableHttpClientTransport,
437        };
438
439        let mut config = StreamableHttpClientConfig {
440            base_url: url.into(),
441            ..Default::default()
442        };
443
444        config_fn(&mut config);
445
446        let transport = StreamableHttpClientTransport::new(config).map_err(|e| {
447            turbomcp_protocol::Error::transport(format!("Failed to build HTTP transport: {e}"))
448        })?;
449        let client = Self::new(transport);
450
451        client.initialize().await?;
452
453        Ok(client)
454    }
455}
456
457// ============================================================================
458// TCP-Specific Convenience Constructors (Feature-Gated)
459// ============================================================================
460
461#[cfg(feature = "tcp")]
462impl Client<turbomcp_transport::tcp::TcpTransport> {
463    /// Connect to a TCP MCP server (convenience method)
464    ///
465    /// Convenient one-liner for TCP connections - balanced DX.
466    ///
467    /// # Arguments
468    ///
469    /// * `addr` - Server address as a numeric `SocketAddr` (e.g. `"127.0.0.1:8765"`,
470    ///   `"[::1]:8765"`). DNS hostnames like `"localhost:8765"` are NOT resolved here —
471    ///   pre-resolve via `tokio::net::lookup_host` if needed.
472    ///
473    /// # Returns
474    ///
475    /// Returns an initialized `Client` ready to use.
476    ///
477    /// # Examples
478    ///
479    /// ```rust,no_run
480    /// # #[cfg(feature = "tcp")]
481    /// use turbomcp_client::Client;
482    ///
483    /// # async fn example() -> turbomcp_protocol::Result<()> {
484    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
485    /// let tools = client.list_tools().await?;
486    /// # Ok(())
487    /// # }
488    /// ```
489    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
490        use std::net::SocketAddr;
491        use turbomcp_transport::tcp::TcpTransport;
492
493        let server_addr: SocketAddr = addr
494            .as_ref()
495            .parse()
496            .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;
497
498        // Client binds to 0.0.0.0:0 (any available port)
499        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
500            "[::]:0".parse().expect("valid IPv6 any-port address")
501        } else {
502            "0.0.0.0:0".parse().expect("valid IPv4 any-port address")
503        };
504
505        let transport = TcpTransport::new_client(bind_addr, server_addr);
506        let client = Self::new(transport);
507
508        client.initialize().await?;
509
510        Ok(client)
511    }
512}
513
514// ============================================================================
515// Unix Socket-Specific Convenience Constructors (Feature-Gated)
516// ============================================================================
517
518#[cfg(all(unix, feature = "unix"))]
519impl Client<turbomcp_transport::unix::UnixTransport> {
520    /// Connect to a Unix socket MCP server (convenience method)
521    ///
522    /// Convenient one-liner for Unix socket IPC - balanced DX.
523    ///
524    /// # Arguments
525    ///
526    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
527    ///
528    /// # Returns
529    ///
530    /// Returns an initialized `Client` ready to use.
531    ///
532    /// # Examples
533    ///
534    /// ```rust,no_run
535    /// # #[cfg(all(unix, feature = "unix"))]
536    /// use turbomcp_client::Client;
537    ///
538    /// # async fn example() -> turbomcp_protocol::Result<()> {
539    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
540    /// let tools = client.list_tools().await?;
541    /// # Ok(())
542    /// # }
543    /// ```
544    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
545        use turbomcp_transport::unix::UnixTransport;
546
547        let transport = UnixTransport::new_client(path.into());
548        let client = Self::new(transport);
549
550        client.initialize().await?;
551
552        Ok(client)
553    }
554}
555
556impl<T: Transport + 'static> Client<T> {
557    /// Register message handlers with the dispatcher
558    ///
559    /// This method sets up the callbacks that handle server-initiated requests
560    /// and notifications. The dispatcher's background task routes incoming
561    /// messages to these handlers.
562    ///
563    /// This is called automatically during Client construction (in `new()` and
564    /// `with_capabilities()`), so you don't need to call it manually.
565    ///
566    /// ## How It Works
567    ///
568    /// The handlers are synchronous closures that spawn async tasks to do the
569    /// actual work. This allows the dispatcher to continue routing messages
570    /// without blocking on handler execution.
571    fn register_dispatcher_handlers(&self) {
572        let dispatcher = self.inner.protocol.dispatcher();
573        let client_for_requests = self.clone();
574        let client_for_notifications = self.clone();
575
576        // Request handler (elicitation, sampling, etc.)
577        let semaphore = Arc::clone(&self.inner.handler_semaphore);
578        let request_handler = Arc::new(move |request: JsonRpcRequest| {
579            let client = client_for_requests.clone();
580            let method = request.method.clone();
581            let req_id = request.id.clone();
582            let semaphore = Arc::clone(&semaphore);
583
584            // ✅ Spawn async task with bounded concurrency
585            tokio::spawn(async move {
586                // Acquire permit (blocks if 100 requests already in flight)
587                let _permit = match semaphore.acquire().await {
588                    Ok(permit) => permit,
589                    Err(_) => {
590                        tracing::warn!(
591                            "Handler semaphore closed, dropping request: method={}",
592                            method
593                        );
594                        return;
595                    }
596                };
597
598                tracing::debug!(
599                    "[request_handler] Handling server-initiated request: method={}, id={:?}",
600                    method,
601                    req_id
602                );
603                if let Err(e) = client.handle_request(request).await {
604                    tracing::error!(
605                        "[request_handler] Error handling server request '{}': {}",
606                        method,
607                        e
608                    );
609                    tracing::error!("   Request ID: {:?}", req_id);
610                    tracing::error!("   Error kind: {:?}", e.kind);
611                } else {
612                    tracing::debug!(
613                        "[request_handler] Successfully handled server request: method={}, id={:?}",
614                        method,
615                        req_id
616                    );
617                }
618            });
619            Ok(())
620        });
621
622        // Notification handler
623        let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
624        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
625            let client = client_for_notifications.clone();
626            let semaphore = Arc::clone(&semaphore_notif);
627
628            // ✅ Spawn async task with bounded concurrency
629            tokio::spawn(async move {
630                // Acquire permit (blocks if 100 handlers already in flight)
631                let _permit = match semaphore.acquire().await {
632                    Ok(permit) => permit,
633                    Err(_) => {
634                        tracing::warn!("Handler semaphore closed, dropping notification");
635                        return;
636                    }
637                };
638
639                if let Err(e) = client.handle_notification(notification).await {
640                    tracing::error!("Error handling server notification: {}", e);
641                }
642                // Permit automatically released on drop ✅
643            });
644            Ok(())
645        });
646
647        // Register handlers synchronously - no race condition!
648        // The set_* methods are now synchronous with std::sync::Mutex
649        dispatcher.set_request_handler(request_handler);
650        dispatcher.set_notification_handler(notification_handler);
651        tracing::debug!("Dispatcher handlers registered successfully");
652    }
653
654    /// Handle server-initiated requests (elicitation, sampling, roots)
655    ///
656    /// This method is called by the MessageDispatcher when it receives a request
657    /// from the server. It routes the request to the appropriate handler based on
658    /// the method name.
659    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
660        match request.method.as_str() {
661            "sampling/createMessage" => {
662                let handler_opt = self.inner.sampling_handler.lock().clone();
663                if let Some(handler) = handler_opt {
664                    // Extract request ID for proper correlation
665                    let request_id = match &request.id {
666                        turbomcp_protocol::MessageId::String(s) => s.clone(),
667                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
668                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
669                    };
670
671                    let params: CreateMessageRequest =
672                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
673                            .map_err(|e| {
674                            Error::internal(format!("Invalid createMessage params: {}", e))
675                        })?;
676
677                    match handler.handle_create_message(request_id, params).await {
678                        Ok(result) => {
679                            let result_value = serde_json::to_value(result).map_err(|e| {
680                                Error::internal(format!("Failed to serialize response: {}", e))
681                            })?;
682                            let response = JsonRpcResponse::success(result_value, request.id);
683                            self.send_response(response).await?;
684                        }
685                        Err(e) => {
686                            tracing::warn!(
687                                "[handle_request] Sampling handler returned error: {}",
688                                e
689                            );
690
691                            // Preserve error semantics by checking actual error type
692                            // This allows proper error code propagation for retry logic
693                            let (code, message) = if let Some(handler_err) =
694                                e.downcast_ref::<HandlerError>()
695                            {
696                                // HandlerError has explicit JSON-RPC code mapping
697                                let json_err = handler_err.into_jsonrpc_error();
698                                tracing::debug!(
699                                    "[handle_request] HandlerError mapped to JSON-RPC code: {}",
700                                    json_err.code
701                                );
702                                (json_err.code, json_err.message)
703                            } else if let Some(proto_err) =
704                                e.downcast_ref::<turbomcp_protocol::Error>()
705                            {
706                                // Protocol errors have ErrorKind-based mapping
707                                tracing::debug!(
708                                    "[handle_request] Protocol error mapped to code: {}",
709                                    proto_err.jsonrpc_error_code()
710                                );
711                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
712                            } else {
713                                // Generic errors default to Internal (-32603)
714                                // Log the error type for debugging (should rarely hit this path)
715                                tracing::warn!(
716                                    "[handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
717                                    std::any::type_name_of_val(&*e)
718                                );
719                                (-32603, format!("Sampling handler error: {}", e))
720                            };
721
722                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
723                                code,
724                                message,
725                                data: None,
726                            };
727                            let response =
728                                JsonRpcResponse::error_response(error, request.id.clone());
729
730                            tracing::debug!(
731                                "[handle_request] Sending error response for request: {:?}",
732                                request.id
733                            );
734                            self.send_response(response).await?;
735                        }
736                    }
737                } else {
738                    // No handler configured
739                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
740                        code: -32601,
741                        message: "Sampling not supported".to_string(),
742                        data: None,
743                    };
744                    let response = JsonRpcResponse::error_response(error, request.id);
745                    self.send_response(response).await?;
746                }
747            }
748            "roots/list" => {
749                // Handle roots/list request from server
750                // Clone the handler Arc to avoid holding mutex across await
751                let handler_opt = self.inner.handlers.lock().roots.clone();
752
753                let roots_result = if let Some(handler) = handler_opt {
754                    handler.handle_roots_request().await
755                } else {
756                    // No handler - return empty list per MCP spec
757                    Ok(Vec::new())
758                };
759
760                match roots_result {
761                    Ok(roots) => {
762                        let result_value =
763                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
764                                roots,
765                                _meta: None,
766                            })
767                            .map_err(|e| {
768                                Error::internal(format!(
769                                    "Failed to serialize roots response: {}",
770                                    e
771                                ))
772                            })?;
773                        let response = JsonRpcResponse::success(result_value, request.id);
774                        self.send_response(response).await?;
775                    }
776                    Err(e) => {
777                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
778                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
779                        let json_err = e.into_jsonrpc_error();
780                        let response = JsonRpcResponse::error_response(json_err, request.id);
781                        self.send_response(response).await?;
782                    }
783                }
784            }
785            "elicitation/create" => {
786                // Clone handler Arc before await to avoid holding mutex across await
787                let handler_opt = self.inner.handlers.lock().elicitation.clone();
788                if let Some(handler) = handler_opt {
789                    // Parse elicitation request params as MCP protocol type
790                    let proto_params: turbomcp_protocol::types::ElicitRequestParams =
791                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
792                            .map_err(|e| {
793                            Error::internal(format!("Invalid elicitation params: {}", e))
794                        })?;
795
796                    // Wrap protocol params with ID for handler (preserves type safety!)
797                    let handler_request =
798                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_params);
799
800                    // Call the registered elicitation handler
801                    match handler.handle_elicitation(handler_request).await {
802                        Ok(elicit_response) => {
803                            // Convert handler response back to protocol type
804                            let proto_result = elicit_response.into_protocol();
805                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
806                                Error::internal(format!(
807                                    "Failed to serialize elicitation response: {}",
808                                    e
809                                ))
810                            })?;
811                            let response = JsonRpcResponse::success(result_value, request.id);
812                            self.send_response(response).await?;
813                        }
814                        Err(e) => {
815                            // Convert handler error to JSON-RPC error using centralized mapping
816                            let response =
817                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
818                            self.send_response(response).await?;
819                        }
820                    }
821                } else {
822                    // No handler configured - elicitation not supported
823                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
824                        code: -32601,
825                        message: "Elicitation not supported - no handler registered".to_string(),
826                        data: None,
827                    };
828                    let response = JsonRpcResponse::error_response(error, request.id);
829                    self.send_response(response).await?;
830                }
831            }
832            _ => {
833                // Unknown method
834                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
835                    code: -32601,
836                    message: format!("Method not found: {}", request.method),
837                    data: None,
838                };
839                let response = JsonRpcResponse::error_response(error, request.id);
840                self.send_response(response).await?;
841            }
842        }
843        Ok(())
844    }
845
846    /// Handle server-initiated notifications
847    ///
848    /// Routes notifications to appropriate handlers based on method name.
849    /// MCP defines several notification types that servers can send to clients:
850    ///
851    /// - `notifications/progress` - Progress updates for long-running operations
852    /// - `notifications/message` - Log messages from server
853    /// - `notifications/resources/updated` - Resource content changed
854    /// - `notifications/resources/list_changed` - Resource list changed
855    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
856        match notification.method.as_str() {
857            "notifications/progress" => {
858                // Route to progress handler
859                let handler_opt = self.inner.handlers.lock().get_progress_handler();
860
861                if let Some(handler) = handler_opt {
862                    let progress: crate::handlers::ProgressNotification = serde_json::from_value(
863                        notification.params.unwrap_or(serde_json::Value::Null),
864                    )
865                    .map_err(|e| {
866                        Error::internal(format!("Invalid progress notification: {}", e))
867                    })?;
868
869                    if let Err(e) = handler.handle_progress(progress).await {
870                        tracing::error!("Progress handler error: {}", e);
871                    }
872                } else {
873                    tracing::debug!("Progress notification received (no handler registered)");
874                }
875            }
876
877            "notifications/message" => {
878                // Route to log handler
879                let handler_opt = self.inner.handlers.lock().get_log_handler();
880
881                if let Some(handler) = handler_opt {
882                    // Parse log message
883                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
884                        notification.params.unwrap_or(serde_json::Value::Null),
885                    )
886                    .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
887
888                    // Call handler
889                    if let Err(e) = handler.handle_log(log).await {
890                        tracing::error!("Log handler error: {}", e);
891                    }
892                } else {
893                    tracing::debug!("Received log notification but no handler registered");
894                }
895            }
896
897            "notifications/resources/updated" => {
898                // Route to resource update handler
899                let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
900
901                if let Some(handler) = handler_opt {
902                    // Parse resource update notification
903                    let update: crate::handlers::ResourceUpdatedNotification =
904                        serde_json::from_value(
905                            notification.params.unwrap_or(serde_json::Value::Null),
906                        )
907                        .map_err(|e| {
908                            Error::internal(format!("Invalid resource update notification: {}", e))
909                        })?;
910
911                    // Call handler
912                    if let Err(e) = handler.handle_resource_update(update).await {
913                        tracing::error!("Resource update handler error: {}", e);
914                    }
915                } else {
916                    tracing::debug!(
917                        "Received resource update notification but no handler registered"
918                    );
919                }
920            }
921
922            "notifications/resources/list_changed" => {
923                // Route to resource list changed handler
924                let handler_opt = self
925                    .inner
926                    .handlers
927                    .lock()
928                    .get_resource_list_changed_handler();
929
930                if let Some(handler) = handler_opt {
931                    if let Err(e) = handler.handle_resource_list_changed().await {
932                        tracing::error!("Resource list changed handler error: {}", e);
933                    }
934                } else {
935                    tracing::debug!(
936                        "Resource list changed notification received (no handler registered)"
937                    );
938                }
939            }
940
941            "notifications/prompts/list_changed" => {
942                // Route to prompt list changed handler
943                let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
944
945                if let Some(handler) = handler_opt {
946                    if let Err(e) = handler.handle_prompt_list_changed().await {
947                        tracing::error!("Prompt list changed handler error: {}", e);
948                    }
949                } else {
950                    tracing::debug!(
951                        "Prompt list changed notification received (no handler registered)"
952                    );
953                }
954            }
955
956            "notifications/tools/list_changed" => {
957                // Route to tool list changed handler
958                let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
959
960                if let Some(handler) = handler_opt {
961                    if let Err(e) = handler.handle_tool_list_changed().await {
962                        tracing::error!("Tool list changed handler error: {}", e);
963                    }
964                } else {
965                    tracing::debug!(
966                        "Tool list changed notification received (no handler registered)"
967                    );
968                }
969            }
970
971            "notifications/cancelled" => {
972                // Route to cancellation handler
973                let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
974
975                if let Some(handler) = handler_opt {
976                    // Parse cancellation notification
977                    let cancellation: crate::handlers::CancelledNotification =
978                        serde_json::from_value(
979                            notification.params.unwrap_or(serde_json::Value::Null),
980                        )
981                        .map_err(|e| {
982                            Error::internal(format!("Invalid cancellation notification: {}", e))
983                        })?;
984
985                    // Call handler
986                    if let Err(e) = handler.handle_cancellation(cancellation).await {
987                        tracing::error!("Cancellation handler error: {}", e);
988                    }
989                } else {
990                    tracing::debug!("Cancellation notification received (no handler registered)");
991                }
992            }
993
994            _ => {
995                // Unknown notification type
996                tracing::debug!("Received unknown notification: {}", notification.method);
997            }
998        }
999
1000        Ok(())
1001    }
1002
1003    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1004        tracing::debug!(
1005            response_id = ?response.id,
1006            "Sending JSON-RPC response"
1007        );
1008
1009        let payload = serde_json::to_vec(&response).map_err(|e| {
1010            tracing::error!("send_response failed to serialize: {e}");
1011            Error::internal(format!("Failed to serialize response: {}", e))
1012        })?;
1013
1014        // Server-initiated responses (sampling/createMessage, elicitation/create)
1015        // can carry sensitive LLM output, user-elicited credentials, or PII.
1016        // Log only size+id at debug; the body is `trace!`-only so operators
1017        // must opt in explicitly via RUST_LOG.
1018        tracing::debug!(
1019            response_id = ?response.id,
1020            payload_bytes = payload.len(),
1021            "Response serialized"
1022        );
1023        tracing::trace!(
1024            response_id = ?response.id,
1025            response_json = %String::from_utf8_lossy(&payload),
1026            "Response payload (trace-only; may contain sensitive data)"
1027        );
1028
1029        let message = TransportMessage::new(
1030            turbomcp_protocol::MessageId::from("response".to_string()),
1031            payload.into(),
1032        );
1033
1034        self.inner
1035            .protocol
1036            .transport()
1037            .send(message)
1038            .await
1039            .map_err(|e| {
1040                tracing::error!("send_response transport send failed: {e}");
1041                Error::transport(format!("Failed to send response: {}", e))
1042            })?;
1043
1044        tracing::debug!(response_id = ?response.id, "Response sent");
1045        Ok(())
1046    }
1047
1048    /// Initialize the connection with the MCP server
1049    ///
1050    /// Performs the initialization handshake with the server, negotiating capabilities
1051    /// and establishing the protocol version. This method must be called before
1052    /// any other operations can be performed.
1053    ///
1054    /// # Returns
1055    ///
1056    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if:
1061    /// - The transport connection fails
1062    /// - The server rejects the initialization request
1063    /// - Protocol negotiation fails
1064    ///
1065    /// # Examples
1066    ///
1067    /// ```rust,no_run
1068    /// # use turbomcp_client::Client;
1069    /// # use turbomcp_transport::stdio::StdioTransport;
1070    /// # async fn example() -> turbomcp_protocol::Result<()> {
1071    /// let mut client = Client::new(StdioTransport::new());
1072    ///
1073    /// let result = client.initialize().await?;
1074    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1075    /// # Ok(())
1076    /// # }
1077    /// ```
1078    pub async fn initialize(&self) -> Result<InitializeResult> {
1079        // Build client capabilities based on registered handlers (automatic detection)
1080        let mut client_caps = ProtocolClientCapabilities::default();
1081
1082        // Detect sampling capability from handler
1083        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1084            client_caps.sampling = Some(sampling_caps);
1085        }
1086
1087        // Detect elicitation capability from handler
1088        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1089            client_caps.elicitation = Some(elicitation_caps);
1090        }
1091
1092        // Detect roots capability from handler
1093        if let Some(roots_caps) = self.get_roots_capabilities() {
1094            client_caps.roots = Some(roots_caps);
1095        }
1096
1097        let request = InitializeRequest {
1098            protocol_version: PROTOCOL_VERSION.into(),
1099            capabilities: client_caps,
1100            client_info: turbomcp_protocol::types::Implementation {
1101                name: "turbomcp-client".to_string(),
1102                version: env!("CARGO_PKG_VERSION").to_string(),
1103                title: Some("TurboMCP Client".to_string()),
1104                ..Default::default()
1105            },
1106            meta: None,
1107        };
1108
1109        self.initialize_with_request(request).await
1110    }
1111
1112    /// Whether [`Client::initialize`] has completed for this client.
1113    ///
1114    /// Lock-free; safe to call from any task. Useful for callers that
1115    /// previously had to pattern-match on
1116    /// `Error::invalid_request("Client not initialized")` to detect the
1117    /// initialized state.
1118    #[must_use]
1119    pub fn is_initialized(&self) -> bool {
1120        self.inner.initialized.load(Ordering::Relaxed)
1121    }
1122
1123    /// Initialize the MCP session with an explicit initialize request.
1124    ///
1125    /// This is the opt-in path for draft protocol versions and capability
1126    /// fields such as official MCP `extensions`.
1127    pub async fn initialize_with_request(
1128        &self,
1129        request: InitializeRequest,
1130    ) -> Result<InitializeResult> {
1131        // Auto-connect transport if not already connected
1132        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1133        let transport = self.inner.protocol.transport();
1134        let transport_state = transport.state().await;
1135        if !matches!(
1136            transport_state,
1137            turbomcp_transport::TransportState::Connected
1138        ) {
1139            tracing::debug!(
1140                "Auto-connecting transport (current state: {:?})",
1141                transport_state
1142            );
1143            transport
1144                .connect()
1145                .await
1146                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1147            tracing::info!("Transport connected successfully");
1148        }
1149
1150        let protocol_response: ProtocolInitializeResult = self
1151            .inner
1152            .protocol
1153            .request("initialize", Some(serde_json::to_value(request)?))
1154            .await?;
1155
1156        // AtomicBool: lock-free store with Ordering::Relaxed
1157        self.inner.initialized.store(true, Ordering::Relaxed);
1158
1159        // Send initialized notification
1160        self.inner
1161            .protocol
1162            .notify("notifications/initialized", None)
1163            .await?;
1164
1165        // Convert protocol response to client response type
1166        Ok(InitializeResult {
1167            server_info: protocol_response.server_info,
1168            server_capabilities: protocol_response.capabilities,
1169        })
1170    }
1171
1172    /// Subscribe to resource change notifications
1173    ///
1174    /// Registers interest in receiving notifications when the specified
1175    /// resource changes. The server will send notifications when the
1176    /// resource is modified, created, or deleted.
1177    ///
1178    /// # Arguments
1179    ///
1180    /// * `uri` - The URI of the resource to monitor
1181    ///
1182    /// # Returns
1183    ///
1184    /// Returns `EmptyResult` on successful subscription.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns an error if:
1189    /// - The client is not initialized
1190    /// - The URI is invalid or empty
1191    /// - The server doesn't support subscriptions
1192    /// - The request fails
1193    ///
1194    /// # Examples
1195    ///
1196    /// ```rust,no_run
1197    /// # use turbomcp_client::Client;
1198    /// # use turbomcp_transport::stdio::StdioTransport;
1199    /// # async fn example() -> turbomcp_protocol::Result<()> {
1200    /// let mut client = Client::new(StdioTransport::new());
1201    /// client.initialize().await?;
1202    ///
1203    /// // Subscribe to file changes
1204    /// client.subscribe("file:///watch/directory").await?;
1205    /// println!("Subscribed to resource changes");
1206    /// # Ok(())
1207    /// # }
1208    /// ```
1209    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1210        if !self.inner.initialized.load(Ordering::Relaxed) {
1211            return Err(Error::invalid_request("Client not initialized"));
1212        }
1213
1214        if uri.is_empty() {
1215            return Err(Error::invalid_request("Subscription URI cannot be empty"));
1216        }
1217
1218        // Send resources/subscribe request
1219        let request = SubscribeRequest {
1220            uri: uri.into(),
1221            _meta: None,
1222        };
1223
1224        self.inner
1225            .protocol
1226            .request(
1227                "resources/subscribe",
1228                Some(serde_json::to_value(request).map_err(|e| {
1229                    Error::internal(format!("Failed to serialize subscribe request: {}", e))
1230                })?),
1231            )
1232            .await
1233    }
1234
1235    /// Unsubscribe from resource change notifications
1236    ///
1237    /// Cancels a previous subscription to resource changes. After unsubscribing,
1238    /// the client will no longer receive notifications for the specified resource.
1239    ///
1240    /// # Arguments
1241    ///
1242    /// * `uri` - The URI of the resource to stop monitoring
1243    ///
1244    /// # Returns
1245    ///
1246    /// Returns `EmptyResult` on successful unsubscription.
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns an error if:
1251    /// - The client is not initialized
1252    /// - The URI is invalid or empty
1253    /// - No active subscription exists for the URI
1254    /// - The request fails
1255    ///
1256    /// # Examples
1257    ///
1258    /// ```rust,no_run
1259    /// # use turbomcp_client::Client;
1260    /// # use turbomcp_transport::stdio::StdioTransport;
1261    /// # async fn example() -> turbomcp_protocol::Result<()> {
1262    /// let mut client = Client::new(StdioTransport::new());
1263    /// client.initialize().await?;
1264    ///
1265    /// // Unsubscribe from file changes
1266    /// client.unsubscribe("file:///watch/directory").await?;
1267    /// println!("Unsubscribed from resource changes");
1268    /// # Ok(())
1269    /// # }
1270    /// ```
1271    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1272        if !self.inner.initialized.load(Ordering::Relaxed) {
1273            return Err(Error::invalid_request("Client not initialized"));
1274        }
1275
1276        if uri.is_empty() {
1277            return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1278        }
1279
1280        // Send resources/unsubscribe request
1281        let request = UnsubscribeRequest {
1282            uri: uri.into(),
1283            _meta: None,
1284        };
1285
1286        self.inner
1287            .protocol
1288            .request(
1289                "resources/unsubscribe",
1290                Some(serde_json::to_value(request).map_err(|e| {
1291                    Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1292                })?),
1293            )
1294            .await
1295    }
1296
1297    /// Get the client's capabilities configuration
1298    #[must_use]
1299    pub fn capabilities(&self) -> &ClientCapabilities {
1300        &self.inner.capabilities
1301    }
1302
1303    // ============================================================================
1304    // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1305    // ============================================================================
1306
1307    /// Retrieve the status of a task (tasks/get)
1308    ///
1309    /// Polls the server for the current status of a specific task.
1310    ///
1311    /// # Arguments
1312    ///
1313    /// * `task_id` - The ID of the task to query
1314    ///
1315    /// # Returns
1316    ///
1317    /// Returns the current `Task` state including status, timestamps, and messages.
1318    #[cfg(feature = "experimental-tasks")]
1319    pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1320        if !self.inner.initialized.load(Ordering::Relaxed) {
1321            return Err(Error::invalid_request("Client not initialized"));
1322        }
1323        let request = GetTaskRequest {
1324            task_id: task_id.to_string(),
1325        };
1326
1327        self.inner
1328            .protocol
1329            .request(
1330                "tasks/get",
1331                Some(serde_json::to_value(request).map_err(|e| {
1332                    Error::internal(format!("Failed to serialize get_task request: {}", e))
1333                })?),
1334            )
1335            .await
1336    }
1337
1338    /// Cancel a running task (tasks/cancel)
1339    ///
1340    /// Attempts to cancel a task execution. This is a best-effort operation.
1341    ///
1342    /// # Arguments
1343    ///
1344    /// * `task_id` - The ID of the task to cancel
1345    ///
1346    /// # Returns
1347    ///
1348    /// Returns the updated `Task` state (typically with status "cancelled").
1349    #[cfg(feature = "experimental-tasks")]
1350    pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1351        if !self.inner.initialized.load(Ordering::Relaxed) {
1352            return Err(Error::invalid_request("Client not initialized"));
1353        }
1354        let request = CancelTaskRequest {
1355            task_id: task_id.to_string(),
1356        };
1357
1358        self.inner
1359            .protocol
1360            .request(
1361                "tasks/cancel",
1362                Some(serde_json::to_value(request).map_err(|e| {
1363                    Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1364                })?),
1365            )
1366            .await
1367    }
1368
1369    /// List all tasks (tasks/list)
1370    ///
1371    /// Retrieves a paginated list of tasks known to the server.
1372    ///
1373    /// # Arguments
1374    ///
1375    /// * `cursor` - Optional pagination cursor from a previous response
1376    /// * `limit` - Optional maximum number of tasks to return
1377    ///
1378    /// # Returns
1379    ///
1380    /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1381    #[cfg(feature = "experimental-tasks")]
1382    pub async fn list_tasks(
1383        &self,
1384        cursor: Option<String>,
1385        limit: Option<usize>,
1386    ) -> Result<ListTasksResult> {
1387        if !self.inner.initialized.load(Ordering::Relaxed) {
1388            return Err(Error::invalid_request("Client not initialized"));
1389        }
1390        let request = ListTasksRequest { cursor, limit };
1391
1392        self.inner
1393            .protocol
1394            .request(
1395                "tasks/list",
1396                Some(serde_json::to_value(request).map_err(|e| {
1397                    Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1398                })?),
1399            )
1400            .await
1401    }
1402
1403    /// Retrieve the result of a completed task (tasks/result)
1404    ///
1405    /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1406    /// then returns the operation result.
1407    ///
1408    /// # Arguments
1409    ///
1410    /// * `task_id` - The ID of the task to retrieve results for
1411    ///
1412    /// # Returns
1413    ///
1414    /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1415    #[cfg(feature = "experimental-tasks")]
1416    pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1417        if !self.inner.initialized.load(Ordering::Relaxed) {
1418            return Err(Error::invalid_request("Client not initialized"));
1419        }
1420        let request = GetTaskPayloadRequest {
1421            task_id: task_id.to_string(),
1422        };
1423
1424        self.inner
1425            .protocol
1426            .request(
1427                "tasks/result",
1428                Some(serde_json::to_value(request).map_err(|e| {
1429                    Error::internal(format!(
1430                        "Failed to serialize get_task_result request: {}",
1431                        e
1432                    ))
1433                })?),
1434            )
1435            .await
1436    }
1437
1438    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1439    // are defined in their respective operation modules:
1440    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1441    // - handlers.rs: has_elicitation_handler, has_roots_handler
1442    //
1443    // Additional capability getters for elicitation and roots added below
1444    // since they're used during initialization
1445
1446    /// Get elicitation capabilities if handler is registered
1447    /// Automatically detects capability based on registered handler
1448    fn get_elicitation_capabilities(
1449        &self,
1450    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1451        if self.has_elicitation_handler() {
1452            // Currently returns default capabilities. In the future, schema_validation support
1453            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1454            // that handlers could implement. For now, handlers validate schemas themselves.
1455            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1456        } else {
1457            None
1458        }
1459    }
1460
1461    /// Get roots capabilities if handler is registered
1462    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1463        if self.has_roots_handler() {
1464            // Roots capabilities indicate whether list can change
1465            Some(turbomcp_protocol::types::RootsCapabilities {
1466                list_changed: Some(true), // Support dynamic roots by default
1467            })
1468        } else {
1469            None
1470        }
1471    }
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476    use super::*;
1477    use std::future::Future;
1478    use std::pin::Pin;
1479    use turbomcp_transport::{
1480        TransportCapabilities, TransportConfig, TransportMessage, TransportMetrics,
1481        TransportResult, TransportState, TransportType,
1482    };
1483
1484    #[derive(Debug, Default)]
1485    struct NoopTransport {
1486        capabilities: TransportCapabilities,
1487    }
1488
1489    impl Transport for NoopTransport {
1490        fn transport_type(&self) -> TransportType {
1491            TransportType::Stdio
1492        }
1493
1494        fn capabilities(&self) -> &TransportCapabilities {
1495            &self.capabilities
1496        }
1497
1498        fn state(&self) -> Pin<Box<dyn Future<Output = TransportState> + Send + '_>> {
1499            Box::pin(async { TransportState::Disconnected })
1500        }
1501
1502        fn connect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1503            Box::pin(async { Ok(()) })
1504        }
1505
1506        fn disconnect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1507            Box::pin(async { Ok(()) })
1508        }
1509
1510        fn send(
1511            &self,
1512            _message: TransportMessage,
1513        ) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1514            Box::pin(async { Ok(()) })
1515        }
1516
1517        fn receive(
1518            &self,
1519        ) -> Pin<Box<dyn Future<Output = TransportResult<Option<TransportMessage>>> + Send + '_>>
1520        {
1521            Box::pin(async { Ok(None) })
1522        }
1523
1524        fn metrics(&self) -> Pin<Box<dyn Future<Output = TransportMetrics> + Send + '_>> {
1525            Box::pin(async { TransportMetrics::default() })
1526        }
1527
1528        fn configure(
1529            &self,
1530            _config: TransportConfig,
1531        ) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1532            Box::pin(async { Ok(()) })
1533        }
1534    }
1535
1536    #[tokio::test]
1537    async fn test_with_capabilities_and_config_uses_handler_limit() {
1538        let capabilities = ClientCapabilities {
1539            max_concurrent_handlers: 7,
1540            ..Default::default()
1541        };
1542
1543        let client = Client::with_capabilities_and_config(
1544            NoopTransport::default(),
1545            capabilities,
1546            TransportConfig::default(),
1547        );
1548
1549        assert_eq!(client.inner.handler_semaphore.available_permits(), 7);
1550    }
1551
1552    #[tokio::test]
1553    async fn test_shutdown_sets_shutdown_flag() {
1554        let client = Client::new(NoopTransport::default());
1555        assert!(!client.inner.shutdown_requested.load(Ordering::Relaxed));
1556
1557        client.shutdown().await.expect("shutdown should succeed");
1558
1559        assert!(client.inner.shutdown_requested.load(Ordering::Relaxed));
1560    }
1561}