turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use tokio::sync::Semaphore;
25
26use turbomcp_protocol::jsonrpc::*;
27#[cfg(feature = "mcp-tasks")]
28use turbomcp_protocol::types::tasks::*;
29use turbomcp_protocol::types::{
30    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
31    *,
32};
33use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
34use turbomcp_transport::{Transport, TransportMessage};
35
36use super::config::InitializeResult;
37use super::protocol::ProtocolClient;
38use crate::{
39    ClientCapabilities,
40    handlers::{HandlerError, HandlerRegistry},
41    sampling::SamplingHandler,
42};
43
44/// Inner client state with interior mutability
45///
46/// This structure contains the actual client state and is wrapped in Arc<...>
47/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
48pub(super) struct ClientInner<T: Transport + 'static> {
49    /// Protocol client for low-level communication
50    pub(super) protocol: ProtocolClient<T>,
51
52    /// Client capabilities (immutable after construction)
53    pub(super) capabilities: ClientCapabilities,
54
55    /// Initialization state (lock-free atomic boolean)
56    pub(super) initialized: AtomicBool,
57
58    /// Optional sampling handler (mutex for dynamic updates)
59    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
60
61    /// Handler registry for bidirectional communication (mutex for registration)
62    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
63
64    /// Plugin registry for middleware (tokio mutex - holds across await)
65    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
66
67    /// ✅ Semaphore for bounded concurrency of request/notification handlers
68    /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
69    pub(super) handler_semaphore: Arc<Semaphore>,
70}
71
72/// The core MCP client implementation
73///
74/// Client provides a comprehensive interface for communicating with MCP servers,
75/// supporting all protocol features including tools, prompts, resources, sampling,
76/// elicitation, and bidirectional communication patterns.
77///
78/// # Clone Pattern
79///
80/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
81/// All clones share the same underlying connection and state:
82///
83/// ```rust,no_run
84/// use turbomcp_client::Client;
85/// use turbomcp_transport::stdio::StdioTransport;
86///
87/// # async fn example() -> turbomcp_protocol::Result<()> {
88/// let client = Client::new(StdioTransport::new());
89/// client.initialize().await?;
90///
91/// // Cheap clone - shares same connection
92/// let client2 = client.clone();
93/// tokio::spawn(async move {
94///     client2.list_tools().await.ok();
95/// });
96/// # Ok(())
97/// # }
98/// ```
99///
100/// The client must be initialized before use by calling `initialize()` to perform
101/// the MCP handshake and capability negotiation.
102///
103/// # Features
104///
105/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
106/// - **Bidirectional Communication**: Server-initiated requests and client responses
107/// - **Plugin Middleware**: Extensible request/response processing
108/// - **Handler Registry**: Callbacks for server-initiated operations
109/// - **Connection Management**: Robust error handling and recovery
110/// - **Type Safety**: Compile-time guarantees for MCP message types
111/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
112///
113/// # Examples
114///
115/// ```rust,no_run
116/// use turbomcp_client::Client;
117/// use turbomcp_transport::stdio::StdioTransport;
118/// use std::collections::HashMap;
119///
120/// # async fn example() -> turbomcp_protocol::Result<()> {
121/// // Create and initialize client (no mut needed!)
122/// let client = Client::new(StdioTransport::new());
123/// let init_result = client.initialize().await?;
124/// println!("Connected to: {}", init_result.server_info.name);
125///
126/// // Use MCP operations
127/// let tools = client.list_tools().await?;
128/// let mut args = HashMap::new();
129/// args.insert("input".to_string(), serde_json::json!("test"));
130/// let result = client.call_tool("my_tool", Some(args)).await?;
131/// # Ok(())
132/// # }
133/// ```
134pub struct Client<T: Transport + 'static> {
135    pub(super) inner: Arc<ClientInner<T>>,
136}
137
138/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
139///
140/// Cloning a Client is cheap (just an Arc clone) and all clones share
141/// the same underlying connection and state.
142impl<T: Transport + 'static> Clone for Client<T> {
143    fn clone(&self) -> Self {
144        Self {
145            inner: Arc::clone(&self.inner),
146        }
147    }
148}
149
150impl<T: Transport + 'static> Drop for ClientInner<T> {
151    fn drop(&mut self) {
152        // Best-effort cleanup if shutdown() wasn't called
153        // This is a safety net, but applications SHOULD call shutdown() explicitly
154
155        // Single warning to catch attention
156        tracing::warn!(
157            "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
158        );
159
160        // Details at debug level to avoid noise in production
161        tracing::debug!("   📘 RECOMMENDATION: Call client.shutdown().await before dropping");
162        tracing::debug!(
163            "   🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
164        );
165        tracing::debug!("   💡 See client shutdown documentation for best practices");
166
167        // We can shutdown the dispatcher (it's synchronous)
168        self.protocol.dispatcher().shutdown();
169        tracing::debug!("   ✅ Message dispatcher stopped");
170
171        // ⚠️  LIMITATION: Cannot call transport.disconnect() from Drop because it's async
172        //    This means:
173        //    - WebSocket: Close frame not sent, reconnection tasks keep running
174        //    - HTTP: Connections may not close cleanly
175        //    - All transports: Background tasks may be orphaned
176        tracing::debug!("   ❌ Transport NOT disconnected (Drop cannot call async methods)");
177        tracing::debug!("   ⚠️  WebSocket reconnection and other background tasks may continue");
178    }
179}
180
181impl<T: Transport + 'static> Client<T> {
182    /// Create a new client with the specified transport
183    ///
184    /// Creates a new MCP client instance with default capabilities.
185    /// The client must be initialized before use by calling `initialize()`.
186    ///
187    /// # Arguments
188    ///
189    /// * `transport` - The transport implementation to use for communication
190    ///
191    /// # Examples
192    ///
193    /// ```rust,no_run
194    /// use turbomcp_client::Client;
195    /// use turbomcp_transport::stdio::StdioTransport;
196    ///
197    /// let transport = StdioTransport::new();
198    /// let client = Client::new(transport);
199    /// ```
200    pub fn new(transport: T) -> Self {
201        let capabilities = ClientCapabilities::default();
202        let client = Self {
203            inner: Arc::new(ClientInner {
204                protocol: ProtocolClient::new(transport),
205                capabilities: capabilities.clone(),
206                initialized: AtomicBool::new(false),
207                sampling_handler: Arc::new(StdMutex::new(None)),
208                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
209                plugin_registry: Arc::new(tokio::sync::Mutex::new(
210                    crate::plugins::PluginRegistry::new(),
211                )),
212                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
213            }),
214        };
215
216        // Register dispatcher handlers for bidirectional communication
217        client.register_dispatcher_handlers();
218
219        client
220    }
221
222    /// Create a new client with custom capabilities
223    ///
224    /// # Arguments
225    ///
226    /// * `transport` - The transport implementation to use
227    /// * `capabilities` - The client capabilities to negotiate
228    ///
229    /// # Examples
230    ///
231    /// ```rust,no_run
232    /// use turbomcp_client::{Client, ClientCapabilities};
233    /// use turbomcp_transport::stdio::StdioTransport;
234    ///
235    /// let capabilities = ClientCapabilities {
236    ///     tools: true,
237    ///     prompts: true,
238    ///     resources: false,
239    ///     sampling: false,
240    ///     max_concurrent_handlers: 100,
241    /// };
242    ///
243    /// let transport = StdioTransport::new();
244    /// let client = Client::with_capabilities(transport, capabilities);
245    /// ```
246    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
247        let client = Self {
248            inner: Arc::new(ClientInner {
249                protocol: ProtocolClient::new(transport),
250                capabilities: capabilities.clone(),
251                initialized: AtomicBool::new(false),
252                sampling_handler: Arc::new(StdMutex::new(None)),
253                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
254                plugin_registry: Arc::new(tokio::sync::Mutex::new(
255                    crate::plugins::PluginRegistry::new(),
256                )),
257                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
258            }),
259        };
260
261        // Register dispatcher handlers for bidirectional communication
262        client.register_dispatcher_handlers();
263
264        client
265    }
266
267    /// Shutdown the client and clean up all resources
268    ///
269    /// This method performs a **graceful shutdown** of the MCP client by:
270    /// 1. Stopping the message dispatcher background task
271    /// 2. Disconnecting the transport (closes connection, stops background tasks)
272    ///
273    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
274    ///
275    /// # Why This Method Exists
276    ///
277    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
278    /// which is required for proper cleanup of WebSocket connections and background tasks.
279    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
280    ///
281    /// # Best Practices
282    ///
283    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
284    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
285    /// - For tests: call in cleanup/teardown
286    /// - If forgotten, Drop will log warnings and do best-effort cleanup
287    ///
288    /// # Examples
289    ///
290    /// ```rust,no_run
291    /// # use turbomcp_client::Client;
292    /// # use turbomcp_transport::stdio::StdioTransport;
293    /// # async fn example() -> turbomcp_protocol::Result<()> {
294    /// let client = Client::new(StdioTransport::new());
295    /// client.initialize().await?;
296    ///
297    /// // Use client...
298    /// let _tools = client.list_tools().await?;
299    ///
300    /// // ✅ Clean shutdown
301    /// client.shutdown().await?;
302    /// # Ok(())
303    /// # }
304    /// ```
305    pub async fn shutdown(&self) -> Result<()> {
306        tracing::info!("🛑 Shutting down MCP client");
307
308        // 1. Shutdown message dispatcher
309        self.inner.protocol.dispatcher().shutdown();
310        tracing::debug!("✅ Message dispatcher stopped");
311
312        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313        match self.inner.protocol.transport().disconnect().await {
314            Ok(()) => {
315                tracing::info!("✅ Transport disconnected successfully");
316            }
317            Err(e) => {
318                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319            }
320        }
321
322        tracing::info!("✅ MCP client shutdown complete");
323        Ok(())
324    }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
331#[cfg(feature = "http")]
332impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
333    /// Connect to an HTTP MCP server (convenience method)
334    ///
335    /// This is a convenient one-liner alternative to manual configuration.
336    /// Creates an HTTP client, connects, and initializes in one call.
337    ///
338    /// # Arguments
339    ///
340    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
341    ///
342    /// # Returns
343    ///
344    /// Returns an initialized `Client` ready to use.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if:
349    /// - The URL is invalid
350    /// - Connection to the server fails
351    /// - Initialization handshake fails
352    ///
353    /// # Examples
354    ///
355    /// ```rust,no_run
356    /// use turbomcp_client::Client;
357    ///
358    /// # async fn example() -> turbomcp_protocol::Result<()> {
359    /// // Convenient one-liner - balanced with server DX
360    /// let client = Client::connect_http("http://localhost:8080").await?;
361    ///
362    /// // Now use it directly
363    /// let tools = client.list_tools().await?;
364    /// # Ok(())
365    /// # }
366    /// ```
367    ///
368    /// Compare to verbose approach (10+ lines):
369    /// ```rust,no_run
370    /// use turbomcp_client::Client;
371    /// use turbomcp_transport::streamable_http_client::{
372    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
373    /// };
374    ///
375    /// # async fn example() -> turbomcp_protocol::Result<()> {
376    /// let config = StreamableHttpClientConfig {
377    ///     base_url: "http://localhost:8080".to_string(),
378    ///     ..Default::default()
379    /// };
380    /// let transport = StreamableHttpClientTransport::new(config);
381    /// let client = Client::new(transport);
382    /// client.initialize().await?;
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
387        use turbomcp_transport::streamable_http_client::{
388            StreamableHttpClientConfig, StreamableHttpClientTransport,
389        };
390
391        let config = StreamableHttpClientConfig {
392            base_url: url.into(),
393            ..Default::default()
394        };
395
396        let transport = StreamableHttpClientTransport::new(config);
397        let client = Self::new(transport);
398
399        // Initialize connection immediately
400        client.initialize().await?;
401
402        Ok(client)
403    }
404
405    /// Connect to an HTTP MCP server with custom configuration
406    ///
407    /// Provides more control than `connect_http()` while still being ergonomic.
408    ///
409    /// # Arguments
410    ///
411    /// * `url` - The base URL of the MCP server
412    /// * `config_fn` - Function to customize the configuration
413    ///
414    /// # Examples
415    ///
416    /// ```rust,no_run
417    /// use turbomcp_client::Client;
418    /// use std::time::Duration;
419    ///
420    /// # async fn example() -> turbomcp_protocol::Result<()> {
421    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
422    ///     config.timeout = Duration::from_secs(60);
423    ///     config.endpoint_path = "/api/mcp".to_string();
424    /// }).await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
429    where
430        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
431    {
432        use turbomcp_transport::streamable_http_client::{
433            StreamableHttpClientConfig, StreamableHttpClientTransport,
434        };
435
436        let mut config = StreamableHttpClientConfig {
437            base_url: url.into(),
438            ..Default::default()
439        };
440
441        config_fn(&mut config);
442
443        let transport = StreamableHttpClientTransport::new(config);
444        let client = Self::new(transport);
445
446        client.initialize().await?;
447
448        Ok(client)
449    }
450}
451
452// ============================================================================
453// TCP-Specific Convenience Constructors (Feature-Gated)
454// ============================================================================
455
456#[cfg(feature = "tcp")]
457impl Client<turbomcp_transport::tcp::TcpTransport> {
458    /// Connect to a TCP MCP server (convenience method)
459    ///
460    /// Convenient one-liner for TCP connections - balanced DX.
461    ///
462    /// # Arguments
463    ///
464    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
465    ///
466    /// # Returns
467    ///
468    /// Returns an initialized `Client` ready to use.
469    ///
470    /// # Examples
471    ///
472    /// ```rust,no_run
473    /// # #[cfg(feature = "tcp")]
474    /// use turbomcp_client::Client;
475    ///
476    /// # async fn example() -> turbomcp_protocol::Result<()> {
477    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
478    /// let tools = client.list_tools().await?;
479    /// # Ok(())
480    /// # }
481    /// ```
482    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
483        use std::net::SocketAddr;
484        use turbomcp_transport::tcp::TcpTransport;
485
486        let server_addr: SocketAddr = addr
487            .as_ref()
488            .parse()
489            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;
490
491        // Client binds to 0.0.0.0:0 (any available port)
492        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
493            "[::]:0".parse().unwrap()
494        } else {
495            "0.0.0.0:0".parse().unwrap()
496        };
497
498        let transport = TcpTransport::new_client(bind_addr, server_addr);
499        let client = Self::new(transport);
500
501        client.initialize().await?;
502
503        Ok(client)
504    }
505}
506
507// ============================================================================
508// Unix Socket-Specific Convenience Constructors (Feature-Gated)
509// ============================================================================
510
511#[cfg(all(unix, feature = "unix"))]
512impl Client<turbomcp_transport::unix::UnixTransport> {
513    /// Connect to a Unix socket MCP server (convenience method)
514    ///
515    /// Convenient one-liner for Unix socket IPC - balanced DX.
516    ///
517    /// # Arguments
518    ///
519    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
520    ///
521    /// # Returns
522    ///
523    /// Returns an initialized `Client` ready to use.
524    ///
525    /// # Examples
526    ///
527    /// ```rust,no_run
528    /// # #[cfg(all(unix, feature = "unix"))]
529    /// use turbomcp_client::Client;
530    ///
531    /// # async fn example() -> turbomcp_protocol::Result<()> {
532    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
533    /// let tools = client.list_tools().await?;
534    /// # Ok(())
535    /// # }
536    /// ```
537    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
538        use turbomcp_transport::unix::UnixTransport;
539
540        let transport = UnixTransport::new_client(path.into());
541        let client = Self::new(transport);
542
543        client.initialize().await?;
544
545        Ok(client)
546    }
547}
548
549impl<T: Transport + 'static> Client<T> {
550    /// Register message handlers with the dispatcher
551    ///
552    /// This method sets up the callbacks that handle server-initiated requests
553    /// and notifications. The dispatcher's background task routes incoming
554    /// messages to these handlers.
555    ///
556    /// This is called automatically during Client construction (in `new()` and
557    /// `with_capabilities()`), so you don't need to call it manually.
558    ///
559    /// ## How It Works
560    ///
561    /// The handlers are synchronous closures that spawn async tasks to do the
562    /// actual work. This allows the dispatcher to continue routing messages
563    /// without blocking on handler execution.
564    fn register_dispatcher_handlers(&self) {
565        let dispatcher = self.inner.protocol.dispatcher();
566        let client_for_requests = self.clone();
567        let client_for_notifications = self.clone();
568
569        // Request handler (elicitation, sampling, etc.)
570        let semaphore = Arc::clone(&self.inner.handler_semaphore);
571        let request_handler = Arc::new(move |request: JsonRpcRequest| {
572            let client = client_for_requests.clone();
573            let method = request.method.clone();
574            let req_id = request.id.clone();
575            let semaphore = Arc::clone(&semaphore);
576
577            // ✅ Spawn async task with bounded concurrency
578            tokio::spawn(async move {
579                // Acquire permit (blocks if 100 requests already in flight)
580                let _permit = semaphore
581                    .acquire()
582                    .await
583                    .expect("Semaphore should not be closed");
584
585                tracing::debug!(
586                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
587                    method,
588                    req_id
589                );
590                if let Err(e) = client.handle_request(request).await {
591                    tracing::error!(
592                        "❌ [request_handler] Error handling server request '{}': {}",
593                        method,
594                        e
595                    );
596                    tracing::error!("   Request ID: {:?}", req_id);
597                    tracing::error!("   Error kind: {:?}", e.kind);
598                } else {
599                    tracing::debug!(
600                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
601                        method,
602                        req_id
603                    );
604                }
605                // Permit automatically released on drop ✅
606            });
607            Ok(())
608        });
609
610        // Notification handler
611        let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
612        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
613            let client = client_for_notifications.clone();
614            let semaphore = Arc::clone(&semaphore_notif);
615
616            // ✅ Spawn async task with bounded concurrency
617            tokio::spawn(async move {
618                // Acquire permit (blocks if 100 handlers already in flight)
619                let _permit = semaphore
620                    .acquire()
621                    .await
622                    .expect("Semaphore should not be closed");
623
624                if let Err(e) = client.handle_notification(notification).await {
625                    tracing::error!("Error handling server notification: {}", e);
626                }
627                // Permit automatically released on drop ✅
628            });
629            Ok(())
630        });
631
632        // Register handlers synchronously - no race condition!
633        // The set_* methods are now synchronous with std::sync::Mutex
634        dispatcher.set_request_handler(request_handler);
635        dispatcher.set_notification_handler(notification_handler);
636        tracing::debug!("Dispatcher handlers registered successfully");
637    }
638
639    /// Handle server-initiated requests (elicitation, sampling, roots)
640    ///
641    /// This method is called by the MessageDispatcher when it receives a request
642    /// from the server. It routes the request to the appropriate handler based on
643    /// the method name.
644    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
645        match request.method.as_str() {
646            "sampling/createMessage" => {
647                let handler_opt = self
648                    .inner
649                    .sampling_handler
650                    .lock()
651                    .expect("sampling_handler mutex poisoned")
652                    .clone();
653                if let Some(handler) = handler_opt {
654                    // Extract request ID for proper correlation
655                    let request_id = match &request.id {
656                        turbomcp_protocol::MessageId::String(s) => s.clone(),
657                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
658                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
659                    };
660
661                    let params: CreateMessageRequest =
662                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
663                            .map_err(|e| {
664                            Error::protocol(format!("Invalid createMessage params: {}", e))
665                        })?;
666
667                    match handler.handle_create_message(request_id, params).await {
668                        Ok(result) => {
669                            let result_value = serde_json::to_value(result).map_err(|e| {
670                                Error::protocol(format!("Failed to serialize response: {}", e))
671                            })?;
672                            let response = JsonRpcResponse::success(result_value, request.id);
673                            self.send_response(response).await?;
674                        }
675                        Err(e) => {
676                            tracing::warn!(
677                                "⚠️  [handle_request] Sampling handler returned error: {}",
678                                e
679                            );
680
681                            // Preserve error semantics by checking actual error type
682                            // This allows proper error code propagation for retry logic
683                            let (code, message) = if let Some(handler_err) =
684                                e.downcast_ref::<HandlerError>()
685                            {
686                                // HandlerError has explicit JSON-RPC code mapping
687                                let json_err = handler_err.into_jsonrpc_error();
688                                tracing::info!(
689                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
690                                    json_err.code
691                                );
692                                (json_err.code, json_err.message)
693                            } else if let Some(proto_err) =
694                                e.downcast_ref::<turbomcp_protocol::Error>()
695                            {
696                                // Protocol errors have ErrorKind-based mapping
697                                tracing::info!(
698                                    "📋 [handle_request] Protocol error mapped to code: {}",
699                                    proto_err.jsonrpc_error_code()
700                                );
701                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
702                            } else {
703                                // Generic errors default to Internal (-32603)
704                                // Log the error type for debugging (should rarely hit this path)
705                                tracing::warn!(
706                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
707                                    std::any::type_name_of_val(&*e)
708                                );
709                                (-32603, format!("Sampling handler error: {}", e))
710                            };
711
712                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
713                                code,
714                                message,
715                                data: None,
716                            };
717                            let response =
718                                JsonRpcResponse::error_response(error, request.id.clone());
719
720                            tracing::info!(
721                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
722                                request.id
723                            );
724                            self.send_response(response).await?;
725                            tracing::info!(
726                                "✅ [handle_request] Error response sent successfully for request: {:?}",
727                                request.id
728                            );
729                        }
730                    }
731                } else {
732                    // No handler configured
733                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
734                        code: -32601,
735                        message: "Sampling not supported".to_string(),
736                        data: None,
737                    };
738                    let response = JsonRpcResponse::error_response(error, request.id);
739                    self.send_response(response).await?;
740                }
741            }
742            "roots/list" => {
743                // Handle roots/list request from server
744                // Clone the handler Arc to avoid holding mutex across await
745                let handler_opt = self
746                    .inner
747                    .handlers
748                    .lock()
749                    .expect("handlers mutex poisoned")
750                    .roots
751                    .clone();
752
753                let roots_result = if let Some(handler) = handler_opt {
754                    handler.handle_roots_request().await
755                } else {
756                    // No handler - return empty list per MCP spec
757                    Ok(Vec::new())
758                };
759
760                match roots_result {
761                    Ok(roots) => {
762                        let result_value =
763                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
764                                roots,
765                                _meta: None,
766                            })
767                            .map_err(|e| {
768                                Error::protocol(format!(
769                                    "Failed to serialize roots response: {}",
770                                    e
771                                ))
772                            })?;
773                        let response = JsonRpcResponse::success(result_value, request.id);
774                        self.send_response(response).await?;
775                    }
776                    Err(e) => {
777                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
778                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
779                        let json_err = e.into_jsonrpc_error();
780                        let response = JsonRpcResponse::error_response(json_err, request.id);
781                        self.send_response(response).await?;
782                    }
783                }
784            }
785            "elicitation/create" => {
786                // Clone handler Arc before await to avoid holding mutex across await
787                let handler_opt = self
788                    .inner
789                    .handlers
790                    .lock()
791                    .expect("handlers mutex poisoned")
792                    .elicitation
793                    .clone();
794                if let Some(handler) = handler_opt {
795                    // Parse elicitation request params as MCP protocol type
796                    let proto_request: turbomcp_protocol::types::ElicitRequest =
797                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
798                            .map_err(|e| {
799                            Error::protocol(format!("Invalid elicitation params: {}", e))
800                        })?;
801
802                    // Wrap protocol request with ID for handler (preserves type safety!)
803                    let handler_request =
804                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
805
806                    // Call the registered elicitation handler
807                    match handler.handle_elicitation(handler_request).await {
808                        Ok(elicit_response) => {
809                            // Convert handler response back to protocol type
810                            let proto_result = elicit_response.into_protocol();
811                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
812                                Error::protocol(format!(
813                                    "Failed to serialize elicitation response: {}",
814                                    e
815                                ))
816                            })?;
817                            let response = JsonRpcResponse::success(result_value, request.id);
818                            self.send_response(response).await?;
819                        }
820                        Err(e) => {
821                            // Convert handler error to JSON-RPC error using centralized mapping
822                            let response =
823                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
824                            self.send_response(response).await?;
825                        }
826                    }
827                } else {
828                    // No handler configured - elicitation not supported
829                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
830                        code: -32601,
831                        message: "Elicitation not supported - no handler registered".to_string(),
832                        data: None,
833                    };
834                    let response = JsonRpcResponse::error_response(error, request.id);
835                    self.send_response(response).await?;
836                }
837            }
838            _ => {
839                // Unknown method
840                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
841                    code: -32601,
842                    message: format!("Method not found: {}", request.method),
843                    data: None,
844                };
845                let response = JsonRpcResponse::error_response(error, request.id);
846                self.send_response(response).await?;
847            }
848        }
849        Ok(())
850    }
851
852    /// Handle server-initiated notifications
853    ///
854    /// Routes notifications to appropriate handlers based on method name.
855    /// MCP defines several notification types that servers can send to clients:
856    ///
857    /// - `notifications/progress` - Progress updates for long-running operations
858    /// - `notifications/message` - Log messages from server
859    /// - `notifications/resources/updated` - Resource content changed
860    /// - `notifications/resources/list_changed` - Resource list changed
861    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
862        match notification.method.as_str() {
863            "notifications/progress" => {
864                // Progress tracking has been removed
865                tracing::debug!(
866                    "Received progress notification - progress tracking has been removed"
867                );
868            }
869
870            "notifications/message" => {
871                // Route to log handler
872                let handler_opt = self
873                    .inner
874                    .handlers
875                    .lock()
876                    .expect("handlers mutex poisoned")
877                    .get_log_handler();
878
879                if let Some(handler) = handler_opt {
880                    // Parse log message
881                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
882                        notification.params.unwrap_or(serde_json::Value::Null),
883                    )
884                    .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
885
886                    // Call handler
887                    if let Err(e) = handler.handle_log(log).await {
888                        tracing::error!("Log handler error: {}", e);
889                    }
890                } else {
891                    tracing::debug!("Received log notification but no handler registered");
892                }
893            }
894
895            "notifications/resources/updated" => {
896                // Route to resource update handler
897                let handler_opt = self
898                    .inner
899                    .handlers
900                    .lock()
901                    .expect("handlers mutex poisoned")
902                    .get_resource_update_handler();
903
904                if let Some(handler) = handler_opt {
905                    // Parse resource update notification
906                    let update: crate::handlers::ResourceUpdatedNotification =
907                        serde_json::from_value(
908                            notification.params.unwrap_or(serde_json::Value::Null),
909                        )
910                        .map_err(|e| {
911                            Error::protocol(format!("Invalid resource update notification: {}", e))
912                        })?;
913
914                    // Call handler
915                    if let Err(e) = handler.handle_resource_update(update).await {
916                        tracing::error!("Resource update handler error: {}", e);
917                    }
918                } else {
919                    tracing::debug!(
920                        "Received resource update notification but no handler registered"
921                    );
922                }
923            }
924
925            "notifications/resources/list_changed" => {
926                // Route to resource list changed handler
927                let handler_opt = self
928                    .inner
929                    .handlers
930                    .lock()
931                    .expect("handlers mutex poisoned")
932                    .get_resource_list_changed_handler();
933
934                if let Some(handler) = handler_opt {
935                    if let Err(e) = handler.handle_resource_list_changed().await {
936                        tracing::error!("Resource list changed handler error: {}", e);
937                    }
938                } else {
939                    tracing::debug!(
940                        "Resource list changed notification received (no handler registered)"
941                    );
942                }
943            }
944
945            "notifications/prompts/list_changed" => {
946                // Route to prompt list changed handler
947                let handler_opt = self
948                    .inner
949                    .handlers
950                    .lock()
951                    .expect("handlers mutex poisoned")
952                    .get_prompt_list_changed_handler();
953
954                if let Some(handler) = handler_opt {
955                    if let Err(e) = handler.handle_prompt_list_changed().await {
956                        tracing::error!("Prompt list changed handler error: {}", e);
957                    }
958                } else {
959                    tracing::debug!(
960                        "Prompt list changed notification received (no handler registered)"
961                    );
962                }
963            }
964
965            "notifications/tools/list_changed" => {
966                // Route to tool list changed handler
967                let handler_opt = self
968                    .inner
969                    .handlers
970                    .lock()
971                    .expect("handlers mutex poisoned")
972                    .get_tool_list_changed_handler();
973
974                if let Some(handler) = handler_opt {
975                    if let Err(e) = handler.handle_tool_list_changed().await {
976                        tracing::error!("Tool list changed handler error: {}", e);
977                    }
978                } else {
979                    tracing::debug!(
980                        "Tool list changed notification received (no handler registered)"
981                    );
982                }
983            }
984
985            "notifications/cancelled" => {
986                // Route to cancellation handler
987                let handler_opt = self
988                    .inner
989                    .handlers
990                    .lock()
991                    .expect("handlers mutex poisoned")
992                    .get_cancellation_handler();
993
994                if let Some(handler) = handler_opt {
995                    // Parse cancellation notification
996                    let cancellation: crate::handlers::CancelledNotification =
997                        serde_json::from_value(
998                            notification.params.unwrap_or(serde_json::Value::Null),
999                        )
1000                        .map_err(|e| {
1001                            Error::protocol(format!("Invalid cancellation notification: {}", e))
1002                        })?;
1003
1004                    // Call handler
1005                    if let Err(e) = handler.handle_cancellation(cancellation).await {
1006                        tracing::error!("Cancellation handler error: {}", e);
1007                    }
1008                } else {
1009                    tracing::debug!("Cancellation notification received (no handler registered)");
1010                }
1011            }
1012
1013            _ => {
1014                // Unknown notification type
1015                tracing::debug!("Received unknown notification: {}", notification.method);
1016            }
1017        }
1018
1019        Ok(())
1020    }
1021
1022    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1023        tracing::info!(
1024            "📤 [send_response] Sending JSON-RPC response: id={:?}",
1025            response.id
1026        );
1027
1028        let payload = serde_json::to_vec(&response).map_err(|e| {
1029            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1030            Error::protocol(format!("Failed to serialize response: {}", e))
1031        })?;
1032
1033        tracing::debug!(
1034            "📤 [send_response] Response payload: {} bytes",
1035            payload.len()
1036        );
1037        tracing::debug!(
1038            "📤 [send_response] Response JSON: {}",
1039            String::from_utf8_lossy(&payload)
1040        );
1041
1042        let message = TransportMessage::new(
1043            turbomcp_protocol::MessageId::from("response".to_string()),
1044            payload.into(),
1045        );
1046
1047        self.inner
1048            .protocol
1049            .transport()
1050            .send(message)
1051            .await
1052            .map_err(|e| {
1053                tracing::error!("❌ [send_response] Transport send failed: {}", e);
1054                Error::transport(format!("Failed to send response: {}", e))
1055            })?;
1056
1057        tracing::info!(
1058            "✅ [send_response] Response sent successfully: id={:?}",
1059            response.id
1060        );
1061        Ok(())
1062    }
1063
1064    /// Initialize the connection with the MCP server
1065    ///
1066    /// Performs the initialization handshake with the server, negotiating capabilities
1067    /// and establishing the protocol version. This method must be called before
1068    /// any other operations can be performed.
1069    ///
1070    /// # Returns
1071    ///
1072    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1073    ///
1074    /// # Errors
1075    ///
1076    /// Returns an error if:
1077    /// - The transport connection fails
1078    /// - The server rejects the initialization request
1079    /// - Protocol negotiation fails
1080    ///
1081    /// # Examples
1082    ///
1083    /// ```rust,no_run
1084    /// # use turbomcp_client::Client;
1085    /// # use turbomcp_transport::stdio::StdioTransport;
1086    /// # async fn example() -> turbomcp_protocol::Result<()> {
1087    /// let mut client = Client::new(StdioTransport::new());
1088    ///
1089    /// let result = client.initialize().await?;
1090    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1091    /// # Ok(())
1092    /// # }
1093    /// ```
1094    pub async fn initialize(&self) -> Result<InitializeResult> {
1095        // Auto-connect transport if not already connected
1096        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1097        let transport = self.inner.protocol.transport();
1098        let transport_state = transport.state().await;
1099        if !matches!(
1100            transport_state,
1101            turbomcp_transport::TransportState::Connected
1102        ) {
1103            tracing::debug!(
1104                "Auto-connecting transport (current state: {:?})",
1105                transport_state
1106            );
1107            transport
1108                .connect()
1109                .await
1110                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1111            tracing::info!("Transport connected successfully");
1112        }
1113
1114        // Build client capabilities based on registered handlers (automatic detection)
1115        let mut client_caps = ProtocolClientCapabilities::default();
1116
1117        // Detect sampling capability from handler
1118        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1119            client_caps.sampling = Some(sampling_caps);
1120        }
1121
1122        // Detect elicitation capability from handler
1123        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1124            client_caps.elicitation = Some(elicitation_caps);
1125        }
1126
1127        // Detect roots capability from handler
1128        if let Some(roots_caps) = self.get_roots_capabilities() {
1129            client_caps.roots = Some(roots_caps);
1130        }
1131
1132        // Send MCP initialization request
1133        let request = InitializeRequest {
1134            protocol_version: PROTOCOL_VERSION.to_string(),
1135            capabilities: client_caps,
1136            client_info: turbomcp_protocol::types::Implementation {
1137                name: "turbomcp-client".to_string(),
1138                version: env!("CARGO_PKG_VERSION").to_string(),
1139                title: Some("TurboMCP Client".to_string()),
1140                ..Default::default()
1141            },
1142            _meta: None,
1143        };
1144
1145        let protocol_response: ProtocolInitializeResult = self
1146            .inner
1147            .protocol
1148            .request("initialize", Some(serde_json::to_value(request)?))
1149            .await?;
1150
1151        // AtomicBool: lock-free store with Ordering::Relaxed
1152        self.inner.initialized.store(true, Ordering::Relaxed);
1153
1154        // Send initialized notification
1155        self.inner
1156            .protocol
1157            .notify("notifications/initialized", None)
1158            .await?;
1159
1160        // Convert protocol response to client response type
1161        Ok(InitializeResult {
1162            server_info: protocol_response.server_info,
1163            server_capabilities: protocol_response.capabilities,
1164        })
1165    }
1166
1167    /// Execute a protocol method with plugin middleware
1168    ///
1169    /// This is a generic helper for wrapping protocol calls with plugin middleware.
1170    pub(crate) async fn execute_with_plugins<R>(
1171        &self,
1172        method_name: &str,
1173        params: Option<serde_json::Value>,
1174    ) -> Result<R>
1175    where
1176        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1177    {
1178        // Create JSON-RPC request for plugin context
1179        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1180            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1181            id: turbomcp_protocol::MessageId::Number(1),
1182            method: method_name.to_string(),
1183            params: params.clone(),
1184        };
1185
1186        // 1. Create request context for plugins
1187        let mut req_ctx =
1188            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1189
1190        // 2. Execute before_request plugin middleware
1191        if let Err(e) = self
1192            .inner
1193            .plugin_registry
1194            .lock()
1195            .await
1196            .execute_before_request(&mut req_ctx)
1197            .await
1198        {
1199            return Err(Error::bad_request(format!(
1200                "Plugin before_request failed: {}",
1201                e
1202            )));
1203        }
1204
1205        // 3. Execute the actual protocol call
1206        let start_time = std::time::Instant::now();
1207        let protocol_result: Result<R> = self
1208            .inner
1209            .protocol
1210            .request(method_name, req_ctx.params().cloned())
1211            .await;
1212        let duration = start_time.elapsed();
1213
1214        // 4. Prepare response context
1215        let mut resp_ctx = match protocol_result {
1216            Ok(ref response) => {
1217                let response_value = serde_json::to_value(response.clone())?;
1218                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1219            }
1220            Err(ref e) => {
1221                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1222            }
1223        };
1224
1225        // 5. Execute after_response plugin middleware
1226        if let Err(e) = self
1227            .inner
1228            .plugin_registry
1229            .lock()
1230            .await
1231            .execute_after_response(&mut resp_ctx)
1232            .await
1233        {
1234            return Err(Error::bad_request(format!(
1235                "Plugin after_response failed: {}",
1236                e
1237            )));
1238        }
1239
1240        // 6. Return the final result, checking for plugin modifications
1241        match protocol_result {
1242            Ok(ref response) => {
1243                // Check if plugins modified the response
1244                if let Some(modified_response) = resp_ctx.response {
1245                    // Try to deserialize the modified response
1246                    if let Ok(modified_result) =
1247                        serde_json::from_value::<R>(modified_response.clone())
1248                    {
1249                        return Ok(modified_result);
1250                    }
1251                }
1252
1253                // No plugin modifications, use original response
1254                Ok(response.clone())
1255            }
1256            Err(e) => {
1257                // Check if plugins provided an error recovery response
1258                if let Some(recovery_response) = resp_ctx.response {
1259                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1260                        Ok(recovery_result)
1261                    } else {
1262                        Err(e)
1263                    }
1264                } else {
1265                    Err(e)
1266                }
1267            }
1268        }
1269    }
1270
1271    /// Subscribe to resource change notifications
1272    ///
1273    /// Registers interest in receiving notifications when the specified
1274    /// resource changes. The server will send notifications when the
1275    /// resource is modified, created, or deleted.
1276    ///
1277    /// # Arguments
1278    ///
1279    /// * `uri` - The URI of the resource to monitor
1280    ///
1281    /// # Returns
1282    ///
1283    /// Returns `EmptyResult` on successful subscription.
1284    ///
1285    /// # Errors
1286    ///
1287    /// Returns an error if:
1288    /// - The client is not initialized
1289    /// - The URI is invalid or empty
1290    /// - The server doesn't support subscriptions
1291    /// - The request fails
1292    ///
1293    /// # Examples
1294    ///
1295    /// ```rust,no_run
1296    /// # use turbomcp_client::Client;
1297    /// # use turbomcp_transport::stdio::StdioTransport;
1298    /// # async fn example() -> turbomcp_protocol::Result<()> {
1299    /// let mut client = Client::new(StdioTransport::new());
1300    /// client.initialize().await?;
1301    ///
1302    /// // Subscribe to file changes
1303    /// client.subscribe("file:///watch/directory").await?;
1304    /// println!("Subscribed to resource changes");
1305    /// # Ok(())
1306    /// # }
1307    /// ```
1308    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1309        if !self.inner.initialized.load(Ordering::Relaxed) {
1310            return Err(Error::bad_request("Client not initialized"));
1311        }
1312
1313        if uri.is_empty() {
1314            return Err(Error::bad_request("Subscription URI cannot be empty"));
1315        }
1316
1317        // Send resources/subscribe request with plugin middleware
1318        let request = SubscribeRequest {
1319            uri: uri.to_string(),
1320        };
1321
1322        self.execute_with_plugins(
1323            "resources/subscribe",
1324            Some(serde_json::to_value(request).map_err(|e| {
1325                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1326            })?),
1327        )
1328        .await
1329    }
1330
1331    /// Unsubscribe from resource change notifications
1332    ///
1333    /// Cancels a previous subscription to resource changes. After unsubscribing,
1334    /// the client will no longer receive notifications for the specified resource.
1335    ///
1336    /// # Arguments
1337    ///
1338    /// * `uri` - The URI of the resource to stop monitoring
1339    ///
1340    /// # Returns
1341    ///
1342    /// Returns `EmptyResult` on successful unsubscription.
1343    ///
1344    /// # Errors
1345    ///
1346    /// Returns an error if:
1347    /// - The client is not initialized
1348    /// - The URI is invalid or empty
1349    /// - No active subscription exists for the URI
1350    /// - The request fails
1351    ///
1352    /// # Examples
1353    ///
1354    /// ```rust,no_run
1355    /// # use turbomcp_client::Client;
1356    /// # use turbomcp_transport::stdio::StdioTransport;
1357    /// # async fn example() -> turbomcp_protocol::Result<()> {
1358    /// let mut client = Client::new(StdioTransport::new());
1359    /// client.initialize().await?;
1360    ///
1361    /// // Unsubscribe from file changes
1362    /// client.unsubscribe("file:///watch/directory").await?;
1363    /// println!("Unsubscribed from resource changes");
1364    /// # Ok(())
1365    /// # }
1366    /// ```
1367    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1368        if !self.inner.initialized.load(Ordering::Relaxed) {
1369            return Err(Error::bad_request("Client not initialized"));
1370        }
1371
1372        if uri.is_empty() {
1373            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1374        }
1375
1376        // Send resources/unsubscribe request with plugin middleware
1377        let request = UnsubscribeRequest {
1378            uri: uri.to_string(),
1379        };
1380
1381        self.execute_with_plugins(
1382            "resources/unsubscribe",
1383            Some(serde_json::to_value(request).map_err(|e| {
1384                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1385            })?),
1386        )
1387        .await
1388    }
1389
1390    /// Get the client's capabilities configuration
1391    #[must_use]
1392    pub fn capabilities(&self) -> &ClientCapabilities {
1393        &self.inner.capabilities
1394    }
1395
1396    /// Initialize all registered plugins
1397    ///
1398    /// This should be called after registration but before using the client.
1399    pub async fn initialize_plugins(&self) -> Result<()> {
1400        // Set up client context for plugins with actual client capabilities
1401        let mut capabilities = std::collections::HashMap::new();
1402        capabilities.insert(
1403            "protocol_version".to_string(),
1404            serde_json::json!("2024-11-05"),
1405        );
1406        capabilities.insert(
1407            "mcp_version".to_string(),
1408            serde_json::json!(env!("CARGO_PKG_VERSION")),
1409        );
1410        capabilities.insert(
1411            "supports_notifications".to_string(),
1412            serde_json::json!(true),
1413        );
1414        capabilities.insert(
1415            "supports_sampling".to_string(),
1416            serde_json::json!(self.has_sampling_handler()),
1417        );
1418        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1419        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1420
1421        // Extract client configuration
1422        let mut config = std::collections::HashMap::new();
1423        config.insert(
1424            "client_name".to_string(),
1425            serde_json::json!("turbomcp-client"),
1426        );
1427        config.insert(
1428            "initialized".to_string(),
1429            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1430        );
1431        config.insert(
1432            "plugin_count".to_string(),
1433            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1434        );
1435
1436        let context = crate::plugins::PluginContext::new(
1437            "turbomcp-client".to_string(),
1438            env!("CARGO_PKG_VERSION").to_string(),
1439            capabilities,
1440            config,
1441            vec![], // Will be populated by the registry
1442        );
1443
1444        self.inner
1445            .plugin_registry
1446            .lock()
1447            .await
1448            .set_client_context(context);
1449
1450        // Note: Individual plugins are initialized automatically during registration
1451        // via PluginRegistry::register_plugin(). This method ensures the registry
1452        // has proper client context for any future plugin registrations.
1453        Ok(())
1454    }
1455
1456    /// Cleanup all registered plugins
1457    ///
1458    /// This should be called when the client is being shut down.
1459    pub async fn cleanup_plugins(&self) -> Result<()> {
1460        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1461        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1462        // references are dropped.
1463
1464        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1465        // pattern where plugins clean up their resources in their Drop implementation.
1466        // Replace the registry with a fresh one (mutex ensures safe access)
1467        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1468        Ok(())
1469    }
1470
1471    // ============================================================================
1472    // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1473    // ============================================================================
1474
1475    /// Retrieve the status of a task (tasks/get)
1476    ///
1477    /// Polls the server for the current status of a specific task.
1478    ///
1479    /// # Arguments
1480    ///
1481    /// * `task_id` - The ID of the task to query
1482    ///
1483    /// # Returns
1484    ///
1485    /// Returns the current `Task` state including status, timestamps, and messages.
1486    #[cfg(feature = "mcp-tasks")]
1487    pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1488        let request = GetTaskRequest {
1489            task_id: task_id.to_string(),
1490        };
1491
1492        self.execute_with_plugins(
1493            "tasks/get",
1494            Some(serde_json::to_value(request).map_err(|e| {
1495                Error::protocol(format!("Failed to serialize get_task request: {}", e))
1496            })?),
1497        )
1498        .await
1499    }
1500
1501    /// Cancel a running task (tasks/cancel)
1502    ///
1503    /// Attempts to cancel a task execution. This is a best-effort operation.
1504    ///
1505    /// # Arguments
1506    ///
1507    /// * `task_id` - The ID of the task to cancel
1508    ///
1509    /// # Returns
1510    ///
1511    /// Returns the updated `Task` state (typically with status "cancelled").
1512    #[cfg(feature = "mcp-tasks")]
1513    pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1514        let request = CancelTaskRequest {
1515            task_id: task_id.to_string(),
1516        };
1517
1518        self.execute_with_plugins(
1519            "tasks/cancel",
1520            Some(serde_json::to_value(request).map_err(|e| {
1521                Error::protocol(format!("Failed to serialize cancel_task request: {}", e))
1522            })?),
1523        )
1524        .await
1525    }
1526
1527    /// List all tasks (tasks/list)
1528    ///
1529    /// Retrieves a paginated list of tasks known to the server.
1530    ///
1531    /// # Arguments
1532    ///
1533    /// * `cursor` - Optional pagination cursor from a previous response
1534    /// * `limit` - Optional maximum number of tasks to return
1535    ///
1536    /// # Returns
1537    ///
1538    /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1539    #[cfg(feature = "mcp-tasks")]
1540    pub async fn list_tasks(
1541        &self,
1542        cursor: Option<String>,
1543        limit: Option<usize>,
1544    ) -> Result<ListTasksResult> {
1545        let request = ListTasksRequest { cursor, limit };
1546
1547        self.execute_with_plugins(
1548            "tasks/list",
1549            Some(serde_json::to_value(request).map_err(|e| {
1550                Error::protocol(format!("Failed to serialize list_tasks request: {}", e))
1551            })?),
1552        )
1553        .await
1554    }
1555
1556    /// Retrieve the result of a completed task (tasks/result)
1557    ///
1558    /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1559    /// then returns the operation result.
1560    ///
1561    /// # Arguments
1562    ///
1563    /// * `task_id` - The ID of the task to retrieve results for
1564    ///
1565    /// # Returns
1566    ///
1567    /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1568    #[cfg(feature = "mcp-tasks")]
1569    pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1570        let request = GetTaskPayloadRequest {
1571            task_id: task_id.to_string(),
1572        };
1573
1574        self.execute_with_plugins(
1575            "tasks/result",
1576            Some(serde_json::to_value(request).map_err(|e| {
1577                Error::protocol(format!(
1578                    "Failed to serialize get_task_result request: {}",
1579                    e
1580                ))
1581            })?),
1582        )
1583        .await
1584    }
1585
1586    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1587    // are defined in their respective operation modules:
1588    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1589    // - handlers.rs: has_elicitation_handler, has_roots_handler
1590    //
1591    // Additional capability getters for elicitation and roots added below
1592    // since they're used during initialization
1593
1594    /// Get elicitation capabilities if handler is registered
1595    /// Automatically detects capability based on registered handler
1596    fn get_elicitation_capabilities(
1597        &self,
1598    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1599        if self.has_elicitation_handler() {
1600            // Currently returns default capabilities. In the future, schema_validation support
1601            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1602            // that handlers could implement. For now, handlers validate schemas themselves.
1603            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1604        } else {
1605            None
1606        }
1607    }
1608
1609    /// Get roots capabilities if handler is registered
1610    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1611        if self.has_roots_handler() {
1612            // Roots capabilities indicate whether list can change
1613            Some(turbomcp_protocol::types::RootsCapabilities {
1614                list_changed: Some(true), // Support dynamic roots by default
1615            })
1616        } else {
1617            None
1618        }
1619    }
1620}