Skip to main content

turbomcp_client/client/
core.rs

//! Core Client implementation for MCP communication
//!
//! This module contains the main `Client<T>` struct and its implementation,
//! providing the core MCP client functionality including:
//!
//! - Connection initialization and lifecycle management
//! - Message processing and bidirectional communication
//! - MCP operation support (tools, prompts, resources, sampling, etc.)
//! - Plugin middleware integration
//! - Handler registration and management
//!
//! # Architecture
//!
//! `Client<T>` is implemented as a cheaply-cloneable `Arc` wrapper with interior
//! mutability (same pattern as reqwest and AWS SDK):
//!
//! - **`AtomicBool`** for the initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32    *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40    ClientCapabilities,
41    handlers::{HandlerError, HandlerRegistry},
42    sampling::SamplingHandler,
43};
44
/// Inner client state with interior mutability
///
/// This structure contains the actual client state and is wrapped in an `Arc`
/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
49pub(super) struct ClientInner<T: Transport + 'static> {
50    /// Protocol client for low-level communication
51    pub(super) protocol: ProtocolClient<T>,
52
53    /// Client capabilities (immutable after construction)
54    pub(super) capabilities: ClientCapabilities,
55
56    /// Initialization state (lock-free atomic boolean)
57    pub(super) initialized: AtomicBool,
58
59    /// Optional sampling handler (mutex for dynamic updates)
60    pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
61
62    /// Handler registry for bidirectional communication (mutex for registration)
63    pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
64
65    /// ✅ Semaphore for bounded concurrency of request/notification handlers
66    /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
67    pub(super) handler_semaphore: Arc<Semaphore>,
68}
69
70/// The core MCP client implementation
71///
72/// Client provides a comprehensive interface for communicating with MCP servers,
73/// supporting all protocol features including tools, prompts, resources, sampling,
74/// elicitation, and bidirectional communication patterns.
75///
76/// # Clone Pattern
77///
78/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
79/// All clones share the same underlying connection and state:
80///
81/// ```rust,no_run
82/// use turbomcp_client::Client;
83/// use turbomcp_transport::stdio::StdioTransport;
84///
85/// # async fn example() -> turbomcp_protocol::Result<()> {
86/// let client = Client::new(StdioTransport::new());
87/// client.initialize().await?;
88///
89/// // Cheap clone - shares same connection
90/// let client2 = client.clone();
91/// tokio::spawn(async move {
92///     client2.list_tools().await.ok();
93/// });
94/// # Ok(())
95/// # }
96/// ```
97///
98/// The client must be initialized before use by calling `initialize()` to perform
99/// the MCP handshake and capability negotiation.
100///
101/// # Features
102///
103/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
104/// - **Bidirectional Communication**: Server-initiated requests and client responses
105/// - **Plugin Middleware**: Extensible request/response processing
106/// - **Handler Registry**: Callbacks for server-initiated operations
107/// - **Connection Management**: Robust error handling and recovery
108/// - **Type Safety**: Compile-time guarantees for MCP message types
109/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
110///
111/// # Examples
112///
113/// ```rust,no_run
114/// use turbomcp_client::Client;
115/// use turbomcp_transport::stdio::StdioTransport;
116/// use std::collections::HashMap;
117///
118/// # async fn example() -> turbomcp_protocol::Result<()> {
119/// // Create and initialize client (no mut needed!)
120/// let client = Client::new(StdioTransport::new());
121/// let init_result = client.initialize().await?;
122/// println!("Connected to: {}", init_result.server_info.name);
123///
124/// // Use MCP operations
125/// let tools = client.list_tools().await?;
126/// let mut args = HashMap::new();
127/// args.insert("input".to_string(), serde_json::json!("test"));
128/// let result = client.call_tool("my_tool", Some(args), None).await?;
129/// # Ok(())
130/// # }
131/// ```
132pub struct Client<T: Transport + 'static> {
133    pub(super) inner: Arc<ClientInner<T>>,
134}
135
136/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
137///
138/// Cloning a Client is cheap (just an Arc clone) and all clones share
139/// the same underlying connection and state.
140impl<T: Transport + 'static> Clone for Client<T> {
141    fn clone(&self) -> Self {
142        Self {
143            inner: Arc::clone(&self.inner),
144        }
145    }
146}
147
148impl<T: Transport + 'static> Drop for ClientInner<T> {
149    fn drop(&mut self) {
150        // Best-effort cleanup if shutdown() wasn't called
151        // This is a safety net, but applications SHOULD call shutdown() explicitly
152
153        // Single clear warning
154        tracing::warn!(
155            "MCP Client dropped without explicit shutdown(). \
156             Call client.shutdown().await before dropping for clean resource cleanup. \
157             Background tasks (WebSocket reconnection) may continue running."
158        );
159
160        // We can shutdown the dispatcher (it's synchronous)
161        self.protocol.dispatcher().shutdown();
162    }
163}
164
165impl<T: Transport + 'static> Client<T> {
166    /// Create a new client with the specified transport
167    ///
168    /// Creates a new MCP client instance with default capabilities.
169    /// The client must be initialized before use by calling `initialize()`.
170    ///
171    /// # Arguments
172    ///
173    /// * `transport` - The transport implementation to use for communication
174    ///
175    /// # Examples
176    ///
177    /// ```rust,no_run
178    /// use turbomcp_client::Client;
179    /// use turbomcp_transport::stdio::StdioTransport;
180    ///
181    /// let transport = StdioTransport::new();
182    /// let client = Client::new(transport);
183    /// ```
184    pub fn new(transport: T) -> Self {
185        let capabilities = ClientCapabilities::default();
186        let client = Self {
187            inner: Arc::new(ClientInner {
188                protocol: ProtocolClient::new(transport),
189                capabilities: capabilities.clone(),
190                initialized: AtomicBool::new(false),
191                sampling_handler: Arc::new(Mutex::new(None)),
192                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
193                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
194            }),
195        };
196
197        // Register dispatcher handlers for bidirectional communication
198        client.register_dispatcher_handlers();
199
200        client
201    }
202
203    /// Create a new client with custom capabilities
204    ///
205    /// # Arguments
206    ///
207    /// * `transport` - The transport implementation to use
208    /// * `capabilities` - The client capabilities to negotiate
209    ///
210    /// # Examples
211    ///
212    /// ```rust,no_run
213    /// use turbomcp_client::{Client, ClientCapabilities};
214    /// use turbomcp_transport::stdio::StdioTransport;
215    ///
216    /// let capabilities = ClientCapabilities {
217    ///     tools: true,
218    ///     prompts: true,
219    ///     resources: false,
220    ///     sampling: false,
221    ///     max_concurrent_handlers: 100,
222    /// };
223    ///
224    /// let transport = StdioTransport::new();
225    /// let client = Client::with_capabilities(transport, capabilities);
226    /// ```
227    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
228        let client = Self {
229            inner: Arc::new(ClientInner {
230                protocol: ProtocolClient::new(transport),
231                capabilities: capabilities.clone(),
232                initialized: AtomicBool::new(false),
233                sampling_handler: Arc::new(Mutex::new(None)),
234                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
235                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
236            }),
237        };
238
239        // Register dispatcher handlers for bidirectional communication
240        client.register_dispatcher_handlers();
241
242        client
243    }
244
245    /// Shutdown the client and clean up all resources
246    ///
247    /// This method performs a **graceful shutdown** of the MCP client by:
248    /// 1. Stopping the message dispatcher background task
249    /// 2. Disconnecting the transport (closes connection, stops background tasks)
250    ///
251    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
252    ///
253    /// # Why This Method Exists
254    ///
255    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
256    /// which is required for proper cleanup of WebSocket connections and background tasks.
257    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
258    ///
259    /// # Best Practices
260    ///
261    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
262    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
263    /// - For tests: call in cleanup/teardown
264    /// - If forgotten, Drop will log warnings and do best-effort cleanup
265    ///
266    /// # Examples
267    ///
268    /// ```rust,no_run
269    /// # use turbomcp_client::Client;
270    /// # use turbomcp_transport::stdio::StdioTransport;
271    /// # async fn example() -> turbomcp_protocol::Result<()> {
272    /// let client = Client::new(StdioTransport::new());
273    /// client.initialize().await?;
274    ///
275    /// // Use client...
276    /// let _tools = client.list_tools().await?;
277    ///
278    /// // ✅ Clean shutdown
279    /// client.shutdown().await?;
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub async fn shutdown(&self) -> Result<()> {
284        tracing::info!("🛑 Shutting down MCP client");
285
286        // 1. Shutdown message dispatcher
287        self.inner.protocol.dispatcher().shutdown();
288        tracing::debug!("✅ Message dispatcher stopped");
289
290        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
291        match self.inner.protocol.transport().disconnect().await {
292            Ok(()) => {
293                tracing::info!("✅ Transport disconnected successfully");
294            }
295            Err(e) => {
296                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
297            }
298        }
299
300        tracing::info!("✅ MCP client shutdown complete");
301        Ok(())
302    }
303}
304
305// ============================================================================
306// HTTP-Specific Convenience Constructors (Feature-Gated)
307// ============================================================================
308
309#[cfg(feature = "http")]
310impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
311    /// Connect to an HTTP MCP server (convenience method)
312    ///
313    /// This is a convenient one-liner alternative to manual configuration.
314    /// Creates an HTTP client, connects, and initializes in one call.
315    ///
316    /// # Arguments
317    ///
318    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
319    ///
320    /// # Returns
321    ///
322    /// Returns an initialized `Client` ready to use.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if:
327    /// - The URL is invalid
328    /// - Connection to the server fails
329    /// - Initialization handshake fails
330    ///
331    /// # Examples
332    ///
333    /// ```rust,no_run
334    /// use turbomcp_client::Client;
335    ///
336    /// # async fn example() -> turbomcp_protocol::Result<()> {
337    /// // Convenient one-liner - balanced with server DX
338    /// let client = Client::connect_http("http://localhost:8080").await?;
339    ///
340    /// // Now use it directly
341    /// let tools = client.list_tools().await?;
342    /// # Ok(())
343    /// # }
344    /// ```
345    ///
346    /// Compare to verbose approach (10+ lines):
347    /// ```rust,no_run
348    /// use turbomcp_client::Client;
349    /// use turbomcp_transport::streamable_http_client::{
350    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
351    /// };
352    ///
353    /// # async fn example() -> turbomcp_protocol::Result<()> {
354    /// let config = StreamableHttpClientConfig {
355    ///     base_url: "http://localhost:8080".to_string(),
356    ///     ..Default::default()
357    /// };
358    /// let transport = StreamableHttpClientTransport::new(config);
359    /// let client = Client::new(transport);
360    /// client.initialize().await?;
361    /// # Ok(())
362    /// # }
363    /// ```
364    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
365        use turbomcp_transport::streamable_http_client::{
366            StreamableHttpClientConfig, StreamableHttpClientTransport,
367        };
368
369        let config = StreamableHttpClientConfig {
370            base_url: url.into(),
371            ..Default::default()
372        };
373
374        let transport = StreamableHttpClientTransport::new(config);
375        let client = Self::new(transport);
376
377        // Initialize connection immediately
378        client.initialize().await?;
379
380        Ok(client)
381    }
382
383    /// Connect to an HTTP MCP server with custom configuration
384    ///
385    /// Provides more control than `connect_http()` while still being ergonomic.
386    ///
387    /// # Arguments
388    ///
389    /// * `url` - The base URL of the MCP server
390    /// * `config_fn` - Function to customize the configuration
391    ///
392    /// # Examples
393    ///
394    /// ```rust,no_run
395    /// use turbomcp_client::Client;
396    /// use std::time::Duration;
397    ///
398    /// # async fn example() -> turbomcp_protocol::Result<()> {
399    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
400    ///     config.timeout = Duration::from_secs(60);
401    ///     config.endpoint_path = "/api/mcp".to_string();
402    /// }).await?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
407    where
408        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
409    {
410        use turbomcp_transport::streamable_http_client::{
411            StreamableHttpClientConfig, StreamableHttpClientTransport,
412        };
413
414        let mut config = StreamableHttpClientConfig {
415            base_url: url.into(),
416            ..Default::default()
417        };
418
419        config_fn(&mut config);
420
421        let transport = StreamableHttpClientTransport::new(config);
422        let client = Self::new(transport);
423
424        client.initialize().await?;
425
426        Ok(client)
427    }
428}
429
430// ============================================================================
431// TCP-Specific Convenience Constructors (Feature-Gated)
432// ============================================================================
433
434#[cfg(feature = "tcp")]
435impl Client<turbomcp_transport::tcp::TcpTransport> {
436    /// Connect to a TCP MCP server (convenience method)
437    ///
438    /// Convenient one-liner for TCP connections - balanced DX.
439    ///
440    /// # Arguments
441    ///
442    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
443    ///
444    /// # Returns
445    ///
446    /// Returns an initialized `Client` ready to use.
447    ///
448    /// # Examples
449    ///
450    /// ```rust,no_run
451    /// # #[cfg(feature = "tcp")]
452    /// use turbomcp_client::Client;
453    ///
454    /// # async fn example() -> turbomcp_protocol::Result<()> {
455    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
456    /// let tools = client.list_tools().await?;
457    /// # Ok(())
458    /// # }
459    /// ```
460    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
461        use std::net::SocketAddr;
462        use turbomcp_transport::tcp::TcpTransport;
463
464        let server_addr: SocketAddr = addr
465            .as_ref()
466            .parse()
467            .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;
468
469        // Client binds to 0.0.0.0:0 (any available port)
470        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
471            "[::]:0".parse().expect("valid IPv6 any-port address")
472        } else {
473            "0.0.0.0:0".parse().expect("valid IPv4 any-port address")
474        };
475
476        let transport = TcpTransport::new_client(bind_addr, server_addr);
477        let client = Self::new(transport);
478
479        client.initialize().await?;
480
481        Ok(client)
482    }
483}
484
485// ============================================================================
486// Unix Socket-Specific Convenience Constructors (Feature-Gated)
487// ============================================================================
488
489#[cfg(all(unix, feature = "unix"))]
490impl Client<turbomcp_transport::unix::UnixTransport> {
491    /// Connect to a Unix socket MCP server (convenience method)
492    ///
493    /// Convenient one-liner for Unix socket IPC - balanced DX.
494    ///
495    /// # Arguments
496    ///
497    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
498    ///
499    /// # Returns
500    ///
501    /// Returns an initialized `Client` ready to use.
502    ///
503    /// # Examples
504    ///
505    /// ```rust,no_run
506    /// # #[cfg(all(unix, feature = "unix"))]
507    /// use turbomcp_client::Client;
508    ///
509    /// # async fn example() -> turbomcp_protocol::Result<()> {
510    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
511    /// let tools = client.list_tools().await?;
512    /// # Ok(())
513    /// # }
514    /// ```
515    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
516        use turbomcp_transport::unix::UnixTransport;
517
518        let transport = UnixTransport::new_client(path.into());
519        let client = Self::new(transport);
520
521        client.initialize().await?;
522
523        Ok(client)
524    }
525}
526
527impl<T: Transport + 'static> Client<T> {
528    /// Register message handlers with the dispatcher
529    ///
530    /// This method sets up the callbacks that handle server-initiated requests
531    /// and notifications. The dispatcher's background task routes incoming
532    /// messages to these handlers.
533    ///
534    /// This is called automatically during Client construction (in `new()` and
535    /// `with_capabilities()`), so you don't need to call it manually.
536    ///
537    /// ## How It Works
538    ///
539    /// The handlers are synchronous closures that spawn async tasks to do the
540    /// actual work. This allows the dispatcher to continue routing messages
541    /// without blocking on handler execution.
542    fn register_dispatcher_handlers(&self) {
543        let dispatcher = self.inner.protocol.dispatcher();
544        let client_for_requests = self.clone();
545        let client_for_notifications = self.clone();
546
547        // Request handler (elicitation, sampling, etc.)
548        let semaphore = Arc::clone(&self.inner.handler_semaphore);
549        let request_handler = Arc::new(move |request: JsonRpcRequest| {
550            let client = client_for_requests.clone();
551            let method = request.method.clone();
552            let req_id = request.id.clone();
553            let semaphore = Arc::clone(&semaphore);
554
555            // ✅ Spawn async task with bounded concurrency
556            tokio::spawn(async move {
557                // Acquire permit (blocks if 100 requests already in flight)
558                let _permit = match semaphore.acquire().await {
559                    Ok(permit) => permit,
560                    Err(_) => {
561                        tracing::warn!(
562                            "Handler semaphore closed, dropping request: method={}",
563                            method
564                        );
565                        return;
566                    }
567                };
568
569                tracing::debug!(
570                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
571                    method,
572                    req_id
573                );
574                if let Err(e) = client.handle_request(request).await {
575                    tracing::error!(
576                        "❌ [request_handler] Error handling server request '{}': {}",
577                        method,
578                        e
579                    );
580                    tracing::error!("   Request ID: {:?}", req_id);
581                    tracing::error!("   Error kind: {:?}", e.kind);
582                } else {
583                    tracing::debug!(
584                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
585                        method,
586                        req_id
587                    );
588                }
589                // Permit automatically released on drop ✅
590            });
591            Ok(())
592        });
593
594        // Notification handler
595        let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
596        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
597            let client = client_for_notifications.clone();
598            let semaphore = Arc::clone(&semaphore_notif);
599
600            // ✅ Spawn async task with bounded concurrency
601            tokio::spawn(async move {
602                // Acquire permit (blocks if 100 handlers already in flight)
603                let _permit = match semaphore.acquire().await {
604                    Ok(permit) => permit,
605                    Err(_) => {
606                        tracing::warn!("Handler semaphore closed, dropping notification");
607                        return;
608                    }
609                };
610
611                if let Err(e) = client.handle_notification(notification).await {
612                    tracing::error!("Error handling server notification: {}", e);
613                }
614                // Permit automatically released on drop ✅
615            });
616            Ok(())
617        });
618
619        // Register handlers synchronously - no race condition!
620        // The set_* methods are now synchronous with std::sync::Mutex
621        dispatcher.set_request_handler(request_handler);
622        dispatcher.set_notification_handler(notification_handler);
623        tracing::debug!("Dispatcher handlers registered successfully");
624    }
625
626    /// Handle server-initiated requests (elicitation, sampling, roots)
627    ///
628    /// This method is called by the MessageDispatcher when it receives a request
629    /// from the server. It routes the request to the appropriate handler based on
630    /// the method name.
631    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
632        match request.method.as_str() {
633            "sampling/createMessage" => {
634                let handler_opt = self.inner.sampling_handler.lock().clone();
635                if let Some(handler) = handler_opt {
636                    // Extract request ID for proper correlation
637                    let request_id = match &request.id {
638                        turbomcp_protocol::MessageId::String(s) => s.clone(),
639                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
640                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
641                    };
642
643                    let params: CreateMessageRequest =
644                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
645                            .map_err(|e| {
646                            Error::internal(format!("Invalid createMessage params: {}", e))
647                        })?;
648
649                    match handler.handle_create_message(request_id, params).await {
650                        Ok(result) => {
651                            let result_value = serde_json::to_value(result).map_err(|e| {
652                                Error::internal(format!("Failed to serialize response: {}", e))
653                            })?;
654                            let response = JsonRpcResponse::success(result_value, request.id);
655                            self.send_response(response).await?;
656                        }
657                        Err(e) => {
658                            tracing::warn!(
659                                "⚠️  [handle_request] Sampling handler returned error: {}",
660                                e
661                            );
662
663                            // Preserve error semantics by checking actual error type
664                            // This allows proper error code propagation for retry logic
665                            let (code, message) = if let Some(handler_err) =
666                                e.downcast_ref::<HandlerError>()
667                            {
668                                // HandlerError has explicit JSON-RPC code mapping
669                                let json_err = handler_err.into_jsonrpc_error();
670                                tracing::info!(
671                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
672                                    json_err.code
673                                );
674                                (json_err.code, json_err.message)
675                            } else if let Some(proto_err) =
676                                e.downcast_ref::<turbomcp_protocol::Error>()
677                            {
678                                // Protocol errors have ErrorKind-based mapping
679                                tracing::info!(
680                                    "📋 [handle_request] Protocol error mapped to code: {}",
681                                    proto_err.jsonrpc_error_code()
682                                );
683                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
684                            } else {
685                                // Generic errors default to Internal (-32603)
686                                // Log the error type for debugging (should rarely hit this path)
687                                tracing::warn!(
688                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
689                                    std::any::type_name_of_val(&*e)
690                                );
691                                (-32603, format!("Sampling handler error: {}", e))
692                            };
693
694                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
695                                code,
696                                message,
697                                data: None,
698                            };
699                            let response =
700                                JsonRpcResponse::error_response(error, request.id.clone());
701
702                            tracing::info!(
703                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
704                                request.id
705                            );
706                            self.send_response(response).await?;
707                            tracing::info!(
708                                "✅ [handle_request] Error response sent successfully for request: {:?}",
709                                request.id
710                            );
711                        }
712                    }
713                } else {
714                    // No handler configured
715                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
716                        code: -32601,
717                        message: "Sampling not supported".to_string(),
718                        data: None,
719                    };
720                    let response = JsonRpcResponse::error_response(error, request.id);
721                    self.send_response(response).await?;
722                }
723            }
724            "roots/list" => {
725                // Handle roots/list request from server
726                // Clone the handler Arc to avoid holding mutex across await
727                let handler_opt = self.inner.handlers.lock().roots.clone();
728
729                let roots_result = if let Some(handler) = handler_opt {
730                    handler.handle_roots_request().await
731                } else {
732                    // No handler - return empty list per MCP spec
733                    Ok(Vec::new())
734                };
735
736                match roots_result {
737                    Ok(roots) => {
738                        let result_value =
739                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
740                                roots,
741                                _meta: None,
742                            })
743                            .map_err(|e| {
744                                Error::internal(format!(
745                                    "Failed to serialize roots response: {}",
746                                    e
747                                ))
748                            })?;
749                        let response = JsonRpcResponse::success(result_value, request.id);
750                        self.send_response(response).await?;
751                    }
752                    Err(e) => {
753                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
754                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
755                        let json_err = e.into_jsonrpc_error();
756                        let response = JsonRpcResponse::error_response(json_err, request.id);
757                        self.send_response(response).await?;
758                    }
759                }
760            }
761            "elicitation/create" => {
762                // Clone handler Arc before await to avoid holding mutex across await
763                let handler_opt = self.inner.handlers.lock().elicitation.clone();
764                if let Some(handler) = handler_opt {
765                    // Parse elicitation request params as MCP protocol type
766                    let proto_request: turbomcp_protocol::types::ElicitRequest =
767                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
768                            .map_err(|e| {
769                            Error::internal(format!("Invalid elicitation params: {}", e))
770                        })?;
771
772                    // Wrap protocol request with ID for handler (preserves type safety!)
773                    let handler_request =
774                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
775
776                    // Call the registered elicitation handler
777                    match handler.handle_elicitation(handler_request).await {
778                        Ok(elicit_response) => {
779                            // Convert handler response back to protocol type
780                            let proto_result = elicit_response.into_protocol();
781                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
782                                Error::internal(format!(
783                                    "Failed to serialize elicitation response: {}",
784                                    e
785                                ))
786                            })?;
787                            let response = JsonRpcResponse::success(result_value, request.id);
788                            self.send_response(response).await?;
789                        }
790                        Err(e) => {
791                            // Convert handler error to JSON-RPC error using centralized mapping
792                            let response =
793                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
794                            self.send_response(response).await?;
795                        }
796                    }
797                } else {
798                    // No handler configured - elicitation not supported
799                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
800                        code: -32601,
801                        message: "Elicitation not supported - no handler registered".to_string(),
802                        data: None,
803                    };
804                    let response = JsonRpcResponse::error_response(error, request.id);
805                    self.send_response(response).await?;
806                }
807            }
808            _ => {
809                // Unknown method
810                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
811                    code: -32601,
812                    message: format!("Method not found: {}", request.method),
813                    data: None,
814                };
815                let response = JsonRpcResponse::error_response(error, request.id);
816                self.send_response(response).await?;
817            }
818        }
819        Ok(())
820    }
821
822    /// Handle server-initiated notifications
823    ///
824    /// Routes notifications to appropriate handlers based on method name.
825    /// MCP defines several notification types that servers can send to clients:
826    ///
827    /// - `notifications/progress` - Progress updates for long-running operations
828    /// - `notifications/message` - Log messages from server
829    /// - `notifications/resources/updated` - Resource content changed
830    /// - `notifications/resources/list_changed` - Resource list changed
831    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
832        match notification.method.as_str() {
833            "notifications/progress" => {
834                // Progress tracking has been removed
835                tracing::debug!(
836                    "Received progress notification - progress tracking has been removed"
837                );
838            }
839
840            "notifications/message" => {
841                // Route to log handler
842                let handler_opt = self.inner.handlers.lock().get_log_handler();
843
844                if let Some(handler) = handler_opt {
845                    // Parse log message
846                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
847                        notification.params.unwrap_or(serde_json::Value::Null),
848                    )
849                    .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
850
851                    // Call handler
852                    if let Err(e) = handler.handle_log(log).await {
853                        tracing::error!("Log handler error: {}", e);
854                    }
855                } else {
856                    tracing::debug!("Received log notification but no handler registered");
857                }
858            }
859
860            "notifications/resources/updated" => {
861                // Route to resource update handler
862                let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
863
864                if let Some(handler) = handler_opt {
865                    // Parse resource update notification
866                    let update: crate::handlers::ResourceUpdatedNotification =
867                        serde_json::from_value(
868                            notification.params.unwrap_or(serde_json::Value::Null),
869                        )
870                        .map_err(|e| {
871                            Error::internal(format!("Invalid resource update notification: {}", e))
872                        })?;
873
874                    // Call handler
875                    if let Err(e) = handler.handle_resource_update(update).await {
876                        tracing::error!("Resource update handler error: {}", e);
877                    }
878                } else {
879                    tracing::debug!(
880                        "Received resource update notification but no handler registered"
881                    );
882                }
883            }
884
885            "notifications/resources/list_changed" => {
886                // Route to resource list changed handler
887                let handler_opt = self
888                    .inner
889                    .handlers
890                    .lock()
891                    .get_resource_list_changed_handler();
892
893                if let Some(handler) = handler_opt {
894                    if let Err(e) = handler.handle_resource_list_changed().await {
895                        tracing::error!("Resource list changed handler error: {}", e);
896                    }
897                } else {
898                    tracing::debug!(
899                        "Resource list changed notification received (no handler registered)"
900                    );
901                }
902            }
903
904            "notifications/prompts/list_changed" => {
905                // Route to prompt list changed handler
906                let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
907
908                if let Some(handler) = handler_opt {
909                    if let Err(e) = handler.handle_prompt_list_changed().await {
910                        tracing::error!("Prompt list changed handler error: {}", e);
911                    }
912                } else {
913                    tracing::debug!(
914                        "Prompt list changed notification received (no handler registered)"
915                    );
916                }
917            }
918
919            "notifications/tools/list_changed" => {
920                // Route to tool list changed handler
921                let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
922
923                if let Some(handler) = handler_opt {
924                    if let Err(e) = handler.handle_tool_list_changed().await {
925                        tracing::error!("Tool list changed handler error: {}", e);
926                    }
927                } else {
928                    tracing::debug!(
929                        "Tool list changed notification received (no handler registered)"
930                    );
931                }
932            }
933
934            "notifications/cancelled" => {
935                // Route to cancellation handler
936                let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
937
938                if let Some(handler) = handler_opt {
939                    // Parse cancellation notification
940                    let cancellation: crate::handlers::CancelledNotification =
941                        serde_json::from_value(
942                            notification.params.unwrap_or(serde_json::Value::Null),
943                        )
944                        .map_err(|e| {
945                            Error::internal(format!("Invalid cancellation notification: {}", e))
946                        })?;
947
948                    // Call handler
949                    if let Err(e) = handler.handle_cancellation(cancellation).await {
950                        tracing::error!("Cancellation handler error: {}", e);
951                    }
952                } else {
953                    tracing::debug!("Cancellation notification received (no handler registered)");
954                }
955            }
956
957            _ => {
958                // Unknown notification type
959                tracing::debug!("Received unknown notification: {}", notification.method);
960            }
961        }
962
963        Ok(())
964    }
965
966    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
967        tracing::info!(
968            "📤 [send_response] Sending JSON-RPC response: id={:?}",
969            response.id
970        );
971
972        let payload = serde_json::to_vec(&response).map_err(|e| {
973            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
974            Error::internal(format!("Failed to serialize response: {}", e))
975        })?;
976
977        tracing::debug!(
978            "📤 [send_response] Response payload: {} bytes",
979            payload.len()
980        );
981        tracing::debug!(
982            "📤 [send_response] Response JSON: {}",
983            String::from_utf8_lossy(&payload)
984        );
985
986        let message = TransportMessage::new(
987            turbomcp_protocol::MessageId::from("response".to_string()),
988            payload.into(),
989        );
990
991        self.inner
992            .protocol
993            .transport()
994            .send(message)
995            .await
996            .map_err(|e| {
997                tracing::error!("❌ [send_response] Transport send failed: {}", e);
998                Error::transport(format!("Failed to send response: {}", e))
999            })?;
1000
1001        tracing::info!(
1002            "✅ [send_response] Response sent successfully: id={:?}",
1003            response.id
1004        );
1005        Ok(())
1006    }
1007
1008    /// Initialize the connection with the MCP server
1009    ///
1010    /// Performs the initialization handshake with the server, negotiating capabilities
1011    /// and establishing the protocol version. This method must be called before
1012    /// any other operations can be performed.
1013    ///
1014    /// # Returns
1015    ///
1016    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns an error if:
1021    /// - The transport connection fails
1022    /// - The server rejects the initialization request
1023    /// - Protocol negotiation fails
1024    ///
1025    /// # Examples
1026    ///
1027    /// ```rust,no_run
1028    /// # use turbomcp_client::Client;
1029    /// # use turbomcp_transport::stdio::StdioTransport;
1030    /// # async fn example() -> turbomcp_protocol::Result<()> {
1031    /// let mut client = Client::new(StdioTransport::new());
1032    ///
1033    /// let result = client.initialize().await?;
1034    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub async fn initialize(&self) -> Result<InitializeResult> {
1039        // Auto-connect transport if not already connected
1040        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1041        let transport = self.inner.protocol.transport();
1042        let transport_state = transport.state().await;
1043        if !matches!(
1044            transport_state,
1045            turbomcp_transport::TransportState::Connected
1046        ) {
1047            tracing::debug!(
1048                "Auto-connecting transport (current state: {:?})",
1049                transport_state
1050            );
1051            transport
1052                .connect()
1053                .await
1054                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1055            tracing::info!("Transport connected successfully");
1056        }
1057
1058        // Build client capabilities based on registered handlers (automatic detection)
1059        let mut client_caps = ProtocolClientCapabilities::default();
1060
1061        // Detect sampling capability from handler
1062        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1063            client_caps.sampling = Some(sampling_caps);
1064        }
1065
1066        // Detect elicitation capability from handler
1067        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1068            client_caps.elicitation = Some(elicitation_caps);
1069        }
1070
1071        // Detect roots capability from handler
1072        if let Some(roots_caps) = self.get_roots_capabilities() {
1073            client_caps.roots = Some(roots_caps);
1074        }
1075
1076        // Send MCP initialization request
1077        let request = InitializeRequest {
1078            protocol_version: PROTOCOL_VERSION.to_string(),
1079            capabilities: client_caps,
1080            client_info: turbomcp_protocol::types::Implementation {
1081                name: "turbomcp-client".to_string(),
1082                version: env!("CARGO_PKG_VERSION").to_string(),
1083                title: Some("TurboMCP Client".to_string()),
1084                ..Default::default()
1085            },
1086            _meta: None,
1087        };
1088
1089        let protocol_response: ProtocolInitializeResult = self
1090            .inner
1091            .protocol
1092            .request("initialize", Some(serde_json::to_value(request)?))
1093            .await?;
1094
1095        // AtomicBool: lock-free store with Ordering::Relaxed
1096        self.inner.initialized.store(true, Ordering::Relaxed);
1097
1098        // Send initialized notification
1099        self.inner
1100            .protocol
1101            .notify("notifications/initialized", None)
1102            .await?;
1103
1104        // Convert protocol response to client response type
1105        Ok(InitializeResult {
1106            server_info: protocol_response.server_info,
1107            server_capabilities: protocol_response.capabilities,
1108        })
1109    }
1110
1111    /// Subscribe to resource change notifications
1112    ///
1113    /// Registers interest in receiving notifications when the specified
1114    /// resource changes. The server will send notifications when the
1115    /// resource is modified, created, or deleted.
1116    ///
1117    /// # Arguments
1118    ///
1119    /// * `uri` - The URI of the resource to monitor
1120    ///
1121    /// # Returns
1122    ///
1123    /// Returns `EmptyResult` on successful subscription.
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns an error if:
1128    /// - The client is not initialized
1129    /// - The URI is invalid or empty
1130    /// - The server doesn't support subscriptions
1131    /// - The request fails
1132    ///
1133    /// # Examples
1134    ///
1135    /// ```rust,no_run
1136    /// # use turbomcp_client::Client;
1137    /// # use turbomcp_transport::stdio::StdioTransport;
1138    /// # async fn example() -> turbomcp_protocol::Result<()> {
1139    /// let mut client = Client::new(StdioTransport::new());
1140    /// client.initialize().await?;
1141    ///
1142    /// // Subscribe to file changes
1143    /// client.subscribe("file:///watch/directory").await?;
1144    /// println!("Subscribed to resource changes");
1145    /// # Ok(())
1146    /// # }
1147    /// ```
1148    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1149        if !self.inner.initialized.load(Ordering::Relaxed) {
1150            return Err(Error::invalid_request("Client not initialized"));
1151        }
1152
1153        if uri.is_empty() {
1154            return Err(Error::invalid_request("Subscription URI cannot be empty"));
1155        }
1156
1157        // Send resources/subscribe request
1158        let request = SubscribeRequest {
1159            uri: uri.to_string(),
1160        };
1161
1162        self.inner
1163            .protocol
1164            .request(
1165                "resources/subscribe",
1166                Some(serde_json::to_value(request).map_err(|e| {
1167                    Error::internal(format!("Failed to serialize subscribe request: {}", e))
1168                })?),
1169            )
1170            .await
1171    }
1172
1173    /// Unsubscribe from resource change notifications
1174    ///
1175    /// Cancels a previous subscription to resource changes. After unsubscribing,
1176    /// the client will no longer receive notifications for the specified resource.
1177    ///
1178    /// # Arguments
1179    ///
1180    /// * `uri` - The URI of the resource to stop monitoring
1181    ///
1182    /// # Returns
1183    ///
1184    /// Returns `EmptyResult` on successful unsubscription.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns an error if:
1189    /// - The client is not initialized
1190    /// - The URI is invalid or empty
1191    /// - No active subscription exists for the URI
1192    /// - The request fails
1193    ///
1194    /// # Examples
1195    ///
1196    /// ```rust,no_run
1197    /// # use turbomcp_client::Client;
1198    /// # use turbomcp_transport::stdio::StdioTransport;
1199    /// # async fn example() -> turbomcp_protocol::Result<()> {
1200    /// let mut client = Client::new(StdioTransport::new());
1201    /// client.initialize().await?;
1202    ///
1203    /// // Unsubscribe from file changes
1204    /// client.unsubscribe("file:///watch/directory").await?;
1205    /// println!("Unsubscribed from resource changes");
1206    /// # Ok(())
1207    /// # }
1208    /// ```
1209    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1210        if !self.inner.initialized.load(Ordering::Relaxed) {
1211            return Err(Error::invalid_request("Client not initialized"));
1212        }
1213
1214        if uri.is_empty() {
1215            return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1216        }
1217
1218        // Send resources/unsubscribe request
1219        let request = UnsubscribeRequest {
1220            uri: uri.to_string(),
1221        };
1222
1223        self.inner
1224            .protocol
1225            .request(
1226                "resources/unsubscribe",
1227                Some(serde_json::to_value(request).map_err(|e| {
1228                    Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1229                })?),
1230            )
1231            .await
1232    }
1233
1234    /// Get the client's capabilities configuration
1235    #[must_use]
1236    pub fn capabilities(&self) -> &ClientCapabilities {
1237        &self.inner.capabilities
1238    }
1239
1240    // ============================================================================
1241    // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1242    // ============================================================================
1243
1244    /// Retrieve the status of a task (tasks/get)
1245    ///
1246    /// Polls the server for the current status of a specific task.
1247    ///
1248    /// # Arguments
1249    ///
1250    /// * `task_id` - The ID of the task to query
1251    ///
1252    /// # Returns
1253    ///
1254    /// Returns the current `Task` state including status, timestamps, and messages.
1255    #[cfg(feature = "experimental-tasks")]
1256    pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1257        let request = GetTaskRequest {
1258            task_id: task_id.to_string(),
1259        };
1260
1261        self.inner
1262            .protocol
1263            .request(
1264                "tasks/get",
1265                Some(serde_json::to_value(request).map_err(|e| {
1266                    Error::internal(format!("Failed to serialize get_task request: {}", e))
1267                })?),
1268            )
1269            .await
1270    }
1271
1272    /// Cancel a running task (tasks/cancel)
1273    ///
1274    /// Attempts to cancel a task execution. This is a best-effort operation.
1275    ///
1276    /// # Arguments
1277    ///
1278    /// * `task_id` - The ID of the task to cancel
1279    ///
1280    /// # Returns
1281    ///
1282    /// Returns the updated `Task` state (typically with status "cancelled").
1283    #[cfg(feature = "experimental-tasks")]
1284    pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1285        let request = CancelTaskRequest {
1286            task_id: task_id.to_string(),
1287        };
1288
1289        self.inner
1290            .protocol
1291            .request(
1292                "tasks/cancel",
1293                Some(serde_json::to_value(request).map_err(|e| {
1294                    Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1295                })?),
1296            )
1297            .await
1298    }
1299
1300    /// List all tasks (tasks/list)
1301    ///
1302    /// Retrieves a paginated list of tasks known to the server.
1303    ///
1304    /// # Arguments
1305    ///
1306    /// * `cursor` - Optional pagination cursor from a previous response
1307    /// * `limit` - Optional maximum number of tasks to return
1308    ///
1309    /// # Returns
1310    ///
1311    /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1312    #[cfg(feature = "experimental-tasks")]
1313    pub async fn list_tasks(
1314        &self,
1315        cursor: Option<String>,
1316        limit: Option<usize>,
1317    ) -> Result<ListTasksResult> {
1318        let request = ListTasksRequest { cursor, limit };
1319
1320        self.inner
1321            .protocol
1322            .request(
1323                "tasks/list",
1324                Some(serde_json::to_value(request).map_err(|e| {
1325                    Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1326                })?),
1327            )
1328            .await
1329    }
1330
1331    /// Retrieve the result of a completed task (tasks/result)
1332    ///
1333    /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1334    /// then returns the operation result.
1335    ///
1336    /// # Arguments
1337    ///
1338    /// * `task_id` - The ID of the task to retrieve results for
1339    ///
1340    /// # Returns
1341    ///
1342    /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1343    #[cfg(feature = "experimental-tasks")]
1344    pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1345        let request = GetTaskPayloadRequest {
1346            task_id: task_id.to_string(),
1347        };
1348
1349        self.inner
1350            .protocol
1351            .request(
1352                "tasks/result",
1353                Some(serde_json::to_value(request).map_err(|e| {
1354                    Error::internal(format!(
1355                        "Failed to serialize get_task_result request: {}",
1356                        e
1357                    ))
1358                })?),
1359            )
1360            .await
1361    }
1362
1363    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1364    // are defined in their respective operation modules:
1365    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1366    // - handlers.rs: has_elicitation_handler, has_roots_handler
1367    //
1368    // Additional capability getters for elicitation and roots added below
1369    // since they're used during initialization
1370
1371    /// Get elicitation capabilities if handler is registered
1372    /// Automatically detects capability based on registered handler
1373    fn get_elicitation_capabilities(
1374        &self,
1375    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1376        if self.has_elicitation_handler() {
1377            // Currently returns default capabilities. In the future, schema_validation support
1378            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1379            // that handlers could implement. For now, handlers validate schemas themselves.
1380            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1381        } else {
1382            None
1383        }
1384    }
1385
1386    /// Get roots capabilities if handler is registered
1387    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1388        if self.has_roots_handler() {
1389            // Roots capabilities indicate whether list can change
1390            Some(turbomcp_protocol::types::RootsCapabilities {
1391                list_changed: Some(true), // Support dynamic roots by default
1392            })
1393        } else {
1394            None
1395        }
1396    }
1397}