turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27    *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{
35    ClientCapabilities,
36    handlers::{HandlerError, HandlerRegistry},
37    sampling::SamplingHandler,
38};
39
40/// Inner client state with interior mutability
41///
42/// This structure contains the actual client state and is wrapped in Arc<...>
43/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
44pub(super) struct ClientInner<T: Transport + 'static> {
45    /// Protocol client for low-level communication
46    pub(super) protocol: ProtocolClient<T>,
47
48    /// Client capabilities (immutable after construction)
49    pub(super) capabilities: ClientCapabilities,
50
51    /// Initialization state (lock-free atomic boolean)
52    pub(super) initialized: AtomicBool,
53
54    /// Optional sampling handler (mutex for dynamic updates)
55    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
56
57    /// Handler registry for bidirectional communication (mutex for registration)
58    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
59
60    /// Plugin registry for middleware (tokio mutex - holds across await)
61    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
62}
63
64/// The core MCP client implementation
65///
66/// Client provides a comprehensive interface for communicating with MCP servers,
67/// supporting all protocol features including tools, prompts, resources, sampling,
68/// elicitation, and bidirectional communication patterns.
69///
70/// # Clone Pattern
71///
72/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
73/// All clones share the same underlying connection and state:
74///
75/// ```rust,no_run
76/// use turbomcp_client::Client;
77/// use turbomcp_transport::stdio::StdioTransport;
78///
79/// # async fn example() -> turbomcp_protocol::Result<()> {
80/// let client = Client::new(StdioTransport::new());
81/// client.initialize().await?;
82///
83/// // Cheap clone - shares same connection
84/// let client2 = client.clone();
85/// tokio::spawn(async move {
86///     client2.list_tools().await.ok();
87/// });
88/// # Ok(())
89/// # }
90/// ```
91///
92/// The client must be initialized before use by calling `initialize()` to perform
93/// the MCP handshake and capability negotiation.
94///
95/// # Features
96///
97/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
98/// - **Bidirectional Communication**: Server-initiated requests and client responses
99/// - **Plugin Middleware**: Extensible request/response processing
100/// - **Handler Registry**: Callbacks for server-initiated operations
101/// - **Connection Management**: Robust error handling and recovery
102/// - **Type Safety**: Compile-time guarantees for MCP message types
103/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
104///
105/// # Examples
106///
107/// ```rust,no_run
108/// use turbomcp_client::Client;
109/// use turbomcp_transport::stdio::StdioTransport;
110/// use std::collections::HashMap;
111///
112/// # async fn example() -> turbomcp_protocol::Result<()> {
113/// // Create and initialize client (no mut needed!)
114/// let client = Client::new(StdioTransport::new());
115/// let init_result = client.initialize().await?;
116/// println!("Connected to: {}", init_result.server_info.name);
117///
118/// // Use MCP operations
119/// let tools = client.list_tools().await?;
120/// let mut args = HashMap::new();
121/// args.insert("input".to_string(), serde_json::json!("test"));
122/// let result = client.call_tool("my_tool", Some(args)).await?;
123/// # Ok(())
124/// # }
125/// ```
126pub struct Client<T: Transport + 'static> {
127    pub(super) inner: Arc<ClientInner<T>>,
128}
129
130/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
131///
132/// Cloning a Client is cheap (just an Arc clone) and all clones share
133/// the same underlying connection and state.
134impl<T: Transport + 'static> Clone for Client<T> {
135    fn clone(&self) -> Self {
136        Self {
137            inner: Arc::clone(&self.inner),
138        }
139    }
140}
141
142impl<T: Transport + 'static> Drop for ClientInner<T> {
143    fn drop(&mut self) {
144        // Best-effort cleanup if shutdown() wasn't called
145        // This is a safety net, but applications SHOULD call shutdown() explicitly
146
147        // Single warning to catch attention
148        tracing::warn!(
149            "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
150        );
151
152        // Details at debug level to avoid noise in production
153        tracing::debug!("   📘 RECOMMENDATION: Call client.shutdown().await before dropping");
154        tracing::debug!(
155            "   🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
156        );
157        tracing::debug!("   💡 See client shutdown documentation for best practices");
158
159        // We can shutdown the dispatcher (it's synchronous)
160        self.protocol.dispatcher().shutdown();
161        tracing::debug!("   ✅ Message dispatcher stopped");
162
163        // ⚠️  LIMITATION: Cannot call transport.disconnect() from Drop because it's async
164        //    This means:
165        //    - WebSocket: Close frame not sent, reconnection tasks keep running
166        //    - HTTP: Connections may not close cleanly
167        //    - All transports: Background tasks may be orphaned
168        tracing::debug!("   ❌ Transport NOT disconnected (Drop cannot call async methods)");
169        tracing::debug!("   ⚠️  WebSocket reconnection and other background tasks may continue");
170    }
171}
172
173impl<T: Transport + 'static> Client<T> {
174    /// Create a new client with the specified transport
175    ///
176    /// Creates a new MCP client instance with default capabilities.
177    /// The client must be initialized before use by calling `initialize()`.
178    ///
179    /// # Arguments
180    ///
181    /// * `transport` - The transport implementation to use for communication
182    ///
183    /// # Examples
184    ///
185    /// ```rust,no_run
186    /// use turbomcp_client::Client;
187    /// use turbomcp_transport::stdio::StdioTransport;
188    ///
189    /// let transport = StdioTransport::new();
190    /// let client = Client::new(transport);
191    /// ```
192    pub fn new(transport: T) -> Self {
193        let client = Self {
194            inner: Arc::new(ClientInner {
195                protocol: ProtocolClient::new(transport),
196                capabilities: ClientCapabilities::default(),
197                initialized: AtomicBool::new(false),
198                sampling_handler: Arc::new(StdMutex::new(None)),
199                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
200                plugin_registry: Arc::new(tokio::sync::Mutex::new(
201                    crate::plugins::PluginRegistry::new(),
202                )),
203            }),
204        };
205
206        // Register dispatcher handlers for bidirectional communication
207        client.register_dispatcher_handlers();
208
209        client
210    }
211
212    /// Create a new client with custom capabilities
213    ///
214    /// # Arguments
215    ///
216    /// * `transport` - The transport implementation to use
217    /// * `capabilities` - The client capabilities to negotiate
218    ///
219    /// # Examples
220    ///
221    /// ```rust,no_run
222    /// use turbomcp_client::{Client, ClientCapabilities};
223    /// use turbomcp_transport::stdio::StdioTransport;
224    ///
225    /// let capabilities = ClientCapabilities {
226    ///     tools: true,
227    ///     prompts: true,
228    ///     resources: false,
229    ///     sampling: false,
230    /// };
231    ///
232    /// let transport = StdioTransport::new();
233    /// let client = Client::with_capabilities(transport, capabilities);
234    /// ```
235    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
236        let client = Self {
237            inner: Arc::new(ClientInner {
238                protocol: ProtocolClient::new(transport),
239                capabilities,
240                initialized: AtomicBool::new(false),
241                sampling_handler: Arc::new(StdMutex::new(None)),
242                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
243                plugin_registry: Arc::new(tokio::sync::Mutex::new(
244                    crate::plugins::PluginRegistry::new(),
245                )),
246            }),
247        };
248
249        // Register dispatcher handlers for bidirectional communication
250        client.register_dispatcher_handlers();
251
252        client
253    }
254
255    /// Shutdown the client and clean up all resources
256    ///
257    /// This method performs a **graceful shutdown** of the MCP client by:
258    /// 1. Stopping the message dispatcher background task
259    /// 2. Disconnecting the transport (closes connection, stops background tasks)
260    ///
261    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
262    ///
263    /// # Why This Method Exists
264    ///
265    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
266    /// which is required for proper cleanup of WebSocket connections and background tasks.
267    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
268    ///
269    /// # Best Practices
270    ///
271    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
272    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
273    /// - For tests: call in cleanup/teardown
274    /// - If forgotten, Drop will log warnings and do best-effort cleanup
275    ///
276    /// # Examples
277    ///
278    /// ```rust,no_run
279    /// # use turbomcp_client::Client;
280    /// # use turbomcp_transport::stdio::StdioTransport;
281    /// # async fn example() -> turbomcp_protocol::Result<()> {
282    /// let client = Client::new(StdioTransport::new());
283    /// client.initialize().await?;
284    ///
285    /// // Use client...
286    /// let _tools = client.list_tools().await?;
287    ///
288    /// // ✅ Clean shutdown
289    /// client.shutdown().await?;
290    /// # Ok(())
291    /// # }
292    /// ```
293    pub async fn shutdown(&self) -> Result<()> {
294        tracing::info!("🛑 Shutting down MCP client");
295
296        // 1. Shutdown message dispatcher
297        self.inner.protocol.dispatcher().shutdown();
298        tracing::debug!("✅ Message dispatcher stopped");
299
300        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
301        match self.inner.protocol.transport().disconnect().await {
302            Ok(()) => {
303                tracing::info!("✅ Transport disconnected successfully");
304            }
305            Err(e) => {
306                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
307            }
308        }
309
310        tracing::info!("✅ MCP client shutdown complete");
311        Ok(())
312    }
313}
314
315// ============================================================================
316// HTTP-Specific Convenience Constructors (Feature-Gated)
317// ============================================================================
318
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server (convenience method)
    ///
    /// One-call alternative to manual configuration: builds the HTTP transport with
    /// default settings, creates the client, and runs `initialize()`.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `initialize()`, for example when the server
    /// cannot be reached or the initialization handshake fails. On that path the
    /// half-built client is shut down before the error is returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // One-liner connect + initialize
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Same code path as the configurable variant, leaving the defaults untouched.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server with custom configuration
    ///
    /// Provides more control than `connect_http()` while still being ergonomic.
    /// The configuration starts from `Default::default()` with `base_url` set to
    /// `url`, is passed to `config_fn` for adjustment, and is then used to build
    /// the transport.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Returns the error reported by `initialize()`. The transport is shut down
    /// before the error is returned, because the caller never receives a client it
    /// could call `shutdown()` on and `Drop` cannot disconnect the transport.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };

        config_fn(&mut config);

        let transport = StreamableHttpClientTransport::new(config);
        let client = Self::new(transport);

        // Initialize connection immediately
        if let Err(err) = client.initialize().await {
            // The client is about to be dropped and Drop cannot disconnect the
            // transport (async), so tear it down explicitly. shutdown() logs its own
            // failures; the handshake error is the one worth reporting.
            let _ = client.shutdown().await;
            return Err(err);
        }

        Ok(client)
    }
}
439
440// ============================================================================
441// TCP-Specific Convenience Constructors (Feature-Gated)
442// ============================================================================
443
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Parses the address, creates a TCP client transport bound to an unspecified
    /// local address (port 0) of the same IP family as the server, and runs
    /// `initialize()`.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server socket address as an IP/port literal, e.g. `"127.0.0.1:8765"`
    ///   or `"[::1]:8765"`. Host names such as `"localhost:8765"` are **not** resolved
    ///   and are rejected as an invalid address.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// - A bad-request error if `addr` is not a valid socket address literal
    /// - The error reported by `initialize()`; the transport is shut down before
    ///   that error is returned
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;

        // Bind to the unspecified address of the matching IP family with port 0
        // (any available port). Built from constants, so no fallible string parsing.
        let unspecified_ip = if server_addr.is_ipv6() {
            IpAddr::V6(Ipv6Addr::UNSPECIFIED)
        } else {
            IpAddr::V4(Ipv4Addr::UNSPECIFIED)
        };
        let bind_addr = SocketAddr::new(unspecified_ip, 0);

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        if let Err(err) = client.initialize().await {
            // The caller never receives this client and Drop cannot disconnect the
            // transport (async), so tear it down here before reporting the failure.
            let _ = client.shutdown().await;
            return Err(err);
        }

        Ok(client)
    }
}
494
495// ============================================================================
496// Unix Socket-Specific Convenience Constructors (Feature-Gated)
497// ============================================================================
498
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// Creates a Unix socket client transport for `path`, wraps it in a client, and
    /// runs `initialize()`.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `initialize()`. The transport is shut down
    /// before the error is returned, because the caller never receives a client it
    /// could call `shutdown()` on.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let transport = UnixTransport::new_client(path.into());
        let client = Self::new(transport);

        if let Err(err) = client.initialize().await {
            // Drop cannot disconnect the transport (async) and the client is about to
            // be discarded, so clean up explicitly before surfacing the error.
            let _ = client.shutdown().await;
            return Err(err);
        }

        Ok(client)
    }
}
536
537impl<T: Transport + 'static> Client<T> {
538    /// Register message handlers with the dispatcher
539    ///
540    /// This method sets up the callbacks that handle server-initiated requests
541    /// and notifications. The dispatcher's background task routes incoming
542    /// messages to these handlers.
543    ///
544    /// This is called automatically during Client construction (in `new()` and
545    /// `with_capabilities()`), so you don't need to call it manually.
546    ///
547    /// ## How It Works
548    ///
549    /// The handlers are synchronous closures that spawn async tasks to do the
550    /// actual work. This allows the dispatcher to continue routing messages
551    /// without blocking on handler execution.
552    fn register_dispatcher_handlers(&self) {
553        let dispatcher = self.inner.protocol.dispatcher();
554        let client_for_requests = self.clone();
555        let client_for_notifications = self.clone();
556
557        // Request handler (elicitation, sampling, etc.)
558        let request_handler = Arc::new(move |request: JsonRpcRequest| {
559            let client = client_for_requests.clone();
560            let method = request.method.clone();
561            let req_id = request.id.clone();
562            // Spawn async task to handle the request
563            tokio::spawn(async move {
564                tracing::debug!(
565                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
566                    method,
567                    req_id
568                );
569                if let Err(e) = client.handle_request(request).await {
570                    tracing::error!(
571                        "❌ [request_handler] Error handling server request '{}': {}",
572                        method,
573                        e
574                    );
575                    tracing::error!("   Request ID: {:?}", req_id);
576                    tracing::error!("   Error kind: {:?}", e.kind);
577                } else {
578                    tracing::debug!(
579                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
580                        method,
581                        req_id
582                    );
583                }
584            });
585            Ok(())
586        });
587
588        // Notification handler
589        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
590            let client = client_for_notifications.clone();
591            // Spawn async task to handle the notification
592            tokio::spawn(async move {
593                if let Err(e) = client.handle_notification(notification).await {
594                    tracing::error!("Error handling server notification: {}", e);
595                }
596            });
597            Ok(())
598        });
599
600        // Register handlers synchronously - no race condition!
601        // The set_* methods are now synchronous with std::sync::Mutex
602        dispatcher.set_request_handler(request_handler);
603        dispatcher.set_notification_handler(notification_handler);
604        tracing::debug!("Dispatcher handlers registered successfully");
605    }
606
607    /// Handle server-initiated requests (elicitation, sampling, roots)
608    ///
609    /// This method is called by the MessageDispatcher when it receives a request
610    /// from the server. It routes the request to the appropriate handler based on
611    /// the method name.
612    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
613        match request.method.as_str() {
614            "sampling/createMessage" => {
615                let handler_opt = self
616                    .inner
617                    .sampling_handler
618                    .lock()
619                    .expect("sampling_handler mutex poisoned")
620                    .clone();
621                if let Some(handler) = handler_opt {
622                    // Extract request ID for proper correlation
623                    let request_id = match &request.id {
624                        turbomcp_protocol::MessageId::String(s) => s.clone(),
625                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
626                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
627                    };
628
629                    let params: CreateMessageRequest =
630                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
631                            .map_err(|e| {
632                            Error::protocol(format!("Invalid createMessage params: {}", e))
633                        })?;
634
635                    match handler.handle_create_message(request_id, params).await {
636                        Ok(result) => {
637                            let result_value = serde_json::to_value(result).map_err(|e| {
638                                Error::protocol(format!("Failed to serialize response: {}", e))
639                            })?;
640                            let response = JsonRpcResponse::success(result_value, request.id);
641                            self.send_response(response).await?;
642                        }
643                        Err(e) => {
644                            tracing::warn!(
645                                "⚠️  [handle_request] Sampling handler returned error: {}",
646                                e
647                            );
648
649                            // Preserve error semantics by checking actual error type
650                            // This allows proper error code propagation for retry logic
651                            let (code, message) = if let Some(handler_err) =
652                                e.downcast_ref::<HandlerError>()
653                            {
654                                // HandlerError has explicit JSON-RPC code mapping
655                                let json_err = handler_err.into_jsonrpc_error();
656                                tracing::info!(
657                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
658                                    json_err.code
659                                );
660                                (json_err.code, json_err.message)
661                            } else if let Some(proto_err) =
662                                e.downcast_ref::<turbomcp_protocol::Error>()
663                            {
664                                // Protocol errors have ErrorKind-based mapping
665                                tracing::info!(
666                                    "📋 [handle_request] Protocol error mapped to code: {}",
667                                    proto_err.jsonrpc_error_code()
668                                );
669                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
670                            } else {
671                                // Generic errors default to Internal (-32603)
672                                // Log the error type for debugging (should rarely hit this path)
673                                tracing::warn!(
674                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
675                                    std::any::type_name_of_val(&*e)
676                                );
677                                (-32603, format!("Sampling handler error: {}", e))
678                            };
679
680                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
681                                code,
682                                message,
683                                data: None,
684                            };
685                            let response =
686                                JsonRpcResponse::error_response(error, request.id.clone());
687
688                            tracing::info!(
689                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
690                                request.id
691                            );
692                            self.send_response(response).await?;
693                            tracing::info!(
694                                "✅ [handle_request] Error response sent successfully for request: {:?}",
695                                request.id
696                            );
697                        }
698                    }
699                } else {
700                    // No handler configured
701                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
702                        code: -32601,
703                        message: "Sampling not supported".to_string(),
704                        data: None,
705                    };
706                    let response = JsonRpcResponse::error_response(error, request.id);
707                    self.send_response(response).await?;
708                }
709            }
710            "roots/list" => {
711                // Handle roots/list request from server
712                // Clone the handler Arc to avoid holding mutex across await
713                let handler_opt = self
714                    .inner
715                    .handlers
716                    .lock()
717                    .expect("handlers mutex poisoned")
718                    .roots
719                    .clone();
720
721                let roots_result = if let Some(handler) = handler_opt {
722                    handler.handle_roots_request().await
723                } else {
724                    // No handler - return empty list per MCP spec
725                    Ok(Vec::new())
726                };
727
728                match roots_result {
729                    Ok(roots) => {
730                        let result_value =
731                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
732                                roots,
733                                _meta: None,
734                            })
735                            .map_err(|e| {
736                                Error::protocol(format!(
737                                    "Failed to serialize roots response: {}",
738                                    e
739                                ))
740                            })?;
741                        let response = JsonRpcResponse::success(result_value, request.id);
742                        self.send_response(response).await?;
743                    }
744                    Err(e) => {
745                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
746                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
747                        let json_err = e.into_jsonrpc_error();
748                        let response = JsonRpcResponse::error_response(json_err, request.id);
749                        self.send_response(response).await?;
750                    }
751                }
752            }
753            "elicitation/create" => {
754                // Clone handler Arc before await to avoid holding mutex across await
755                let handler_opt = self
756                    .inner
757                    .handlers
758                    .lock()
759                    .expect("handlers mutex poisoned")
760                    .elicitation
761                    .clone();
762                if let Some(handler) = handler_opt {
763                    // Parse elicitation request params as MCP protocol type
764                    let proto_request: turbomcp_protocol::types::ElicitRequest =
765                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
766                            .map_err(|e| {
767                            Error::protocol(format!("Invalid elicitation params: {}", e))
768                        })?;
769
770                    // Wrap protocol request with ID for handler (preserves type safety!)
771                    let handler_request =
772                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
773
774                    // Call the registered elicitation handler
775                    match handler.handle_elicitation(handler_request).await {
776                        Ok(elicit_response) => {
777                            // Convert handler response back to protocol type
778                            let proto_result = elicit_response.into_protocol();
779                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
780                                Error::protocol(format!(
781                                    "Failed to serialize elicitation response: {}",
782                                    e
783                                ))
784                            })?;
785                            let response = JsonRpcResponse::success(result_value, request.id);
786                            self.send_response(response).await?;
787                        }
788                        Err(e) => {
789                            // Convert handler error to JSON-RPC error using centralized mapping
790                            let response =
791                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
792                            self.send_response(response).await?;
793                        }
794                    }
795                } else {
796                    // No handler configured - elicitation not supported
797                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
798                        code: -32601,
799                        message: "Elicitation not supported - no handler registered".to_string(),
800                        data: None,
801                    };
802                    let response = JsonRpcResponse::error_response(error, request.id);
803                    self.send_response(response).await?;
804                }
805            }
806            _ => {
807                // Unknown method
808                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
809                    code: -32601,
810                    message: format!("Method not found: {}", request.method),
811                    data: None,
812                };
813                let response = JsonRpcResponse::error_response(error, request.id);
814                self.send_response(response).await?;
815            }
816        }
817        Ok(())
818    }
819
820    /// Handle server-initiated notifications
821    ///
822    /// Routes notifications to appropriate handlers based on method name.
823    /// MCP defines several notification types that servers can send to clients:
824    ///
825    /// - `notifications/progress` - Progress updates for long-running operations
826    /// - `notifications/message` - Log messages from server
827    /// - `notifications/resources/updated` - Resource content changed
828    /// - `notifications/resources/list_changed` - Resource list changed
829    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
830        match notification.method.as_str() {
831            "notifications/progress" => {
832                // Route to progress handler
833                let handler_opt = self
834                    .inner
835                    .handlers
836                    .lock()
837                    .expect("handlers mutex poisoned")
838                    .get_progress_handler();
839
840                if let Some(handler) = handler_opt {
841                    // Parse progress notification
842                    let progress: crate::handlers::ProgressNotification = serde_json::from_value(
843                        notification.params.unwrap_or(serde_json::Value::Null),
844                    )
845                    .map_err(|e| {
846                        Error::protocol(format!("Invalid progress notification: {}", e))
847                    })?;
848
849                    // Call handler (errors are logged but don't fail the flow)
850                    if let Err(e) = handler.handle_progress(progress).await {
851                        tracing::error!("Progress handler error: {}", e);
852                    }
853                } else {
854                    tracing::debug!("Received progress notification but no handler registered");
855                }
856            }
857
858            "notifications/message" => {
859                // Route to log handler
860                let handler_opt = self
861                    .inner
862                    .handlers
863                    .lock()
864                    .expect("handlers mutex poisoned")
865                    .get_log_handler();
866
867                if let Some(handler) = handler_opt {
868                    // Parse log message
869                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
870                        notification.params.unwrap_or(serde_json::Value::Null),
871                    )
872                    .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
873
874                    // Call handler
875                    if let Err(e) = handler.handle_log(log).await {
876                        tracing::error!("Log handler error: {}", e);
877                    }
878                } else {
879                    tracing::debug!("Received log notification but no handler registered");
880                }
881            }
882
883            "notifications/resources/updated" => {
884                // Route to resource update handler
885                let handler_opt = self
886                    .inner
887                    .handlers
888                    .lock()
889                    .expect("handlers mutex poisoned")
890                    .get_resource_update_handler();
891
892                if let Some(handler) = handler_opt {
893                    // Parse resource update notification
894                    let update: crate::handlers::ResourceUpdatedNotification =
895                        serde_json::from_value(
896                            notification.params.unwrap_or(serde_json::Value::Null),
897                        )
898                        .map_err(|e| {
899                            Error::protocol(format!("Invalid resource update notification: {}", e))
900                        })?;
901
902                    // Call handler
903                    if let Err(e) = handler.handle_resource_update(update).await {
904                        tracing::error!("Resource update handler error: {}", e);
905                    }
906                } else {
907                    tracing::debug!(
908                        "Received resource update notification but no handler registered"
909                    );
910                }
911            }
912
913            "notifications/resources/list_changed" => {
914                // Route to resource list changed handler
915                let handler_opt = self
916                    .inner
917                    .handlers
918                    .lock()
919                    .expect("handlers mutex poisoned")
920                    .get_resource_list_changed_handler();
921
922                if let Some(handler) = handler_opt {
923                    if let Err(e) = handler.handle_resource_list_changed().await {
924                        tracing::error!("Resource list changed handler error: {}", e);
925                    }
926                } else {
927                    tracing::debug!(
928                        "Resource list changed notification received (no handler registered)"
929                    );
930                }
931            }
932
933            "notifications/prompts/list_changed" => {
934                // Route to prompt list changed handler
935                let handler_opt = self
936                    .inner
937                    .handlers
938                    .lock()
939                    .expect("handlers mutex poisoned")
940                    .get_prompt_list_changed_handler();
941
942                if let Some(handler) = handler_opt {
943                    if let Err(e) = handler.handle_prompt_list_changed().await {
944                        tracing::error!("Prompt list changed handler error: {}", e);
945                    }
946                } else {
947                    tracing::debug!(
948                        "Prompt list changed notification received (no handler registered)"
949                    );
950                }
951            }
952
953            "notifications/tools/list_changed" => {
954                // Route to tool list changed handler
955                let handler_opt = self
956                    .inner
957                    .handlers
958                    .lock()
959                    .expect("handlers mutex poisoned")
960                    .get_tool_list_changed_handler();
961
962                if let Some(handler) = handler_opt {
963                    if let Err(e) = handler.handle_tool_list_changed().await {
964                        tracing::error!("Tool list changed handler error: {}", e);
965                    }
966                } else {
967                    tracing::debug!(
968                        "Tool list changed notification received (no handler registered)"
969                    );
970                }
971            }
972
973            "notifications/cancelled" => {
974                // Route to cancellation handler
975                let handler_opt = self
976                    .inner
977                    .handlers
978                    .lock()
979                    .expect("handlers mutex poisoned")
980                    .get_cancellation_handler();
981
982                if let Some(handler) = handler_opt {
983                    // Parse cancellation notification
984                    let cancellation: crate::handlers::CancelledNotification =
985                        serde_json::from_value(
986                            notification.params.unwrap_or(serde_json::Value::Null),
987                        )
988                        .map_err(|e| {
989                            Error::protocol(format!("Invalid cancellation notification: {}", e))
990                        })?;
991
992                    // Call handler
993                    if let Err(e) = handler.handle_cancellation(cancellation).await {
994                        tracing::error!("Cancellation handler error: {}", e);
995                    }
996                } else {
997                    tracing::debug!("Cancellation notification received (no handler registered)");
998                }
999            }
1000
1001            _ => {
1002                // Unknown notification type
1003                tracing::debug!("Received unknown notification: {}", notification.method);
1004            }
1005        }
1006
1007        Ok(())
1008    }
1009
1010    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1011        tracing::info!(
1012            "📤 [send_response] Sending JSON-RPC response: id={:?}",
1013            response.id
1014        );
1015
1016        let payload = serde_json::to_vec(&response).map_err(|e| {
1017            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1018            Error::protocol(format!("Failed to serialize response: {}", e))
1019        })?;
1020
1021        tracing::debug!(
1022            "📤 [send_response] Response payload: {} bytes",
1023            payload.len()
1024        );
1025        tracing::debug!(
1026            "📤 [send_response] Response JSON: {}",
1027            String::from_utf8_lossy(&payload)
1028        );
1029
1030        let message = TransportMessage::new(
1031            turbomcp_protocol::MessageId::from("response".to_string()),
1032            payload.into(),
1033        );
1034
1035        self.inner
1036            .protocol
1037            .transport()
1038            .send(message)
1039            .await
1040            .map_err(|e| {
1041                tracing::error!("❌ [send_response] Transport send failed: {}", e);
1042                Error::transport(format!("Failed to send response: {}", e))
1043            })?;
1044
1045        tracing::info!(
1046            "✅ [send_response] Response sent successfully: id={:?}",
1047            response.id
1048        );
1049        Ok(())
1050    }
1051
1052    /// Initialize the connection with the MCP server
1053    ///
1054    /// Performs the initialization handshake with the server, negotiating capabilities
1055    /// and establishing the protocol version. This method must be called before
1056    /// any other operations can be performed.
1057    ///
1058    /// # Returns
1059    ///
1060    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns an error if:
1065    /// - The transport connection fails
1066    /// - The server rejects the initialization request
1067    /// - Protocol negotiation fails
1068    ///
1069    /// # Examples
1070    ///
1071    /// ```rust,no_run
1072    /// # use turbomcp_client::Client;
1073    /// # use turbomcp_transport::stdio::StdioTransport;
1074    /// # async fn example() -> turbomcp_protocol::Result<()> {
1075    /// let mut client = Client::new(StdioTransport::new());
1076    ///
1077    /// let result = client.initialize().await?;
1078    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    pub async fn initialize(&self) -> Result<InitializeResult> {
1083        // Auto-connect transport if not already connected
1084        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1085        let transport = self.inner.protocol.transport();
1086        let transport_state = transport.state().await;
1087        if !matches!(
1088            transport_state,
1089            turbomcp_transport::TransportState::Connected
1090        ) {
1091            tracing::debug!(
1092                "Auto-connecting transport (current state: {:?})",
1093                transport_state
1094            );
1095            transport
1096                .connect()
1097                .await
1098                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1099            tracing::info!("Transport connected successfully");
1100        }
1101
1102        // Build client capabilities based on registered handlers (automatic detection)
1103        let mut client_caps = ProtocolClientCapabilities::default();
1104
1105        // Detect sampling capability from handler
1106        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1107            client_caps.sampling = Some(sampling_caps);
1108        }
1109
1110        // Detect elicitation capability from handler
1111        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1112            client_caps.elicitation = Some(elicitation_caps);
1113        }
1114
1115        // Detect roots capability from handler
1116        if let Some(roots_caps) = self.get_roots_capabilities() {
1117            client_caps.roots = Some(roots_caps);
1118        }
1119
1120        // Send MCP initialization request
1121        let request = InitializeRequest {
1122            protocol_version: PROTOCOL_VERSION.to_string(),
1123            capabilities: client_caps,
1124            client_info: turbomcp_protocol::types::Implementation {
1125                name: "turbomcp-client".to_string(),
1126                version: env!("CARGO_PKG_VERSION").to_string(),
1127                title: Some("TurboMCP Client".to_string()),
1128            },
1129            _meta: None,
1130        };
1131
1132        let protocol_response: ProtocolInitializeResult = self
1133            .inner
1134            .protocol
1135            .request("initialize", Some(serde_json::to_value(request)?))
1136            .await?;
1137
1138        // AtomicBool: lock-free store with Ordering::Relaxed
1139        self.inner.initialized.store(true, Ordering::Relaxed);
1140
1141        // Send initialized notification
1142        self.inner
1143            .protocol
1144            .notify("notifications/initialized", None)
1145            .await?;
1146
1147        // Convert protocol response to client response type
1148        Ok(InitializeResult {
1149            server_info: protocol_response.server_info,
1150            server_capabilities: protocol_response.capabilities,
1151        })
1152    }
1153
1154    /// Execute a protocol method with plugin middleware
1155    ///
1156    /// This is a generic helper for wrapping protocol calls with plugin middleware.
1157    pub(crate) async fn execute_with_plugins<R>(
1158        &self,
1159        method_name: &str,
1160        params: Option<serde_json::Value>,
1161    ) -> Result<R>
1162    where
1163        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1164    {
1165        // Create JSON-RPC request for plugin context
1166        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1167            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1168            id: turbomcp_protocol::MessageId::Number(1),
1169            method: method_name.to_string(),
1170            params: params.clone(),
1171        };
1172
1173        // 1. Create request context for plugins
1174        let mut req_ctx =
1175            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1176
1177        // 2. Execute before_request plugin middleware
1178        if let Err(e) = self
1179            .inner
1180            .plugin_registry
1181            .lock()
1182            .await
1183            .execute_before_request(&mut req_ctx)
1184            .await
1185        {
1186            return Err(Error::bad_request(format!(
1187                "Plugin before_request failed: {}",
1188                e
1189            )));
1190        }
1191
1192        // 3. Execute the actual protocol call
1193        let start_time = std::time::Instant::now();
1194        let protocol_result: Result<R> = self
1195            .inner
1196            .protocol
1197            .request(method_name, req_ctx.params().cloned())
1198            .await;
1199        let duration = start_time.elapsed();
1200
1201        // 4. Prepare response context
1202        let mut resp_ctx = match protocol_result {
1203            Ok(ref response) => {
1204                let response_value = serde_json::to_value(response.clone())?;
1205                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1206            }
1207            Err(ref e) => {
1208                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1209            }
1210        };
1211
1212        // 5. Execute after_response plugin middleware
1213        if let Err(e) = self
1214            .inner
1215            .plugin_registry
1216            .lock()
1217            .await
1218            .execute_after_response(&mut resp_ctx)
1219            .await
1220        {
1221            return Err(Error::bad_request(format!(
1222                "Plugin after_response failed: {}",
1223                e
1224            )));
1225        }
1226
1227        // 6. Return the final result, checking for plugin modifications
1228        match protocol_result {
1229            Ok(ref response) => {
1230                // Check if plugins modified the response
1231                if let Some(modified_response) = resp_ctx.response {
1232                    // Try to deserialize the modified response
1233                    if let Ok(modified_result) =
1234                        serde_json::from_value::<R>(modified_response.clone())
1235                    {
1236                        return Ok(modified_result);
1237                    }
1238                }
1239
1240                // No plugin modifications, use original response
1241                Ok(response.clone())
1242            }
1243            Err(e) => {
1244                // Check if plugins provided an error recovery response
1245                if let Some(recovery_response) = resp_ctx.response {
1246                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1247                        Ok(recovery_result)
1248                    } else {
1249                        Err(e)
1250                    }
1251                } else {
1252                    Err(e)
1253                }
1254            }
1255        }
1256    }
1257
1258    /// Subscribe to resource change notifications
1259    ///
1260    /// Registers interest in receiving notifications when the specified
1261    /// resource changes. The server will send notifications when the
1262    /// resource is modified, created, or deleted.
1263    ///
1264    /// # Arguments
1265    ///
1266    /// * `uri` - The URI of the resource to monitor
1267    ///
1268    /// # Returns
1269    ///
1270    /// Returns `EmptyResult` on successful subscription.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if:
1275    /// - The client is not initialized
1276    /// - The URI is invalid or empty
1277    /// - The server doesn't support subscriptions
1278    /// - The request fails
1279    ///
1280    /// # Examples
1281    ///
1282    /// ```rust,no_run
1283    /// # use turbomcp_client::Client;
1284    /// # use turbomcp_transport::stdio::StdioTransport;
1285    /// # async fn example() -> turbomcp_protocol::Result<()> {
1286    /// let mut client = Client::new(StdioTransport::new());
1287    /// client.initialize().await?;
1288    ///
1289    /// // Subscribe to file changes
1290    /// client.subscribe("file:///watch/directory").await?;
1291    /// println!("Subscribed to resource changes");
1292    /// # Ok(())
1293    /// # }
1294    /// ```
1295    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1296        if !self.inner.initialized.load(Ordering::Relaxed) {
1297            return Err(Error::bad_request("Client not initialized"));
1298        }
1299
1300        if uri.is_empty() {
1301            return Err(Error::bad_request("Subscription URI cannot be empty"));
1302        }
1303
1304        // Send resources/subscribe request with plugin middleware
1305        let request = SubscribeRequest {
1306            uri: uri.to_string(),
1307        };
1308
1309        self.execute_with_plugins(
1310            "resources/subscribe",
1311            Some(serde_json::to_value(request).map_err(|e| {
1312                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1313            })?),
1314        )
1315        .await
1316    }
1317
1318    /// Unsubscribe from resource change notifications
1319    ///
1320    /// Cancels a previous subscription to resource changes. After unsubscribing,
1321    /// the client will no longer receive notifications for the specified resource.
1322    ///
1323    /// # Arguments
1324    ///
1325    /// * `uri` - The URI of the resource to stop monitoring
1326    ///
1327    /// # Returns
1328    ///
1329    /// Returns `EmptyResult` on successful unsubscription.
1330    ///
1331    /// # Errors
1332    ///
1333    /// Returns an error if:
1334    /// - The client is not initialized
1335    /// - The URI is invalid or empty
1336    /// - No active subscription exists for the URI
1337    /// - The request fails
1338    ///
1339    /// # Examples
1340    ///
1341    /// ```rust,no_run
1342    /// # use turbomcp_client::Client;
1343    /// # use turbomcp_transport::stdio::StdioTransport;
1344    /// # async fn example() -> turbomcp_protocol::Result<()> {
1345    /// let mut client = Client::new(StdioTransport::new());
1346    /// client.initialize().await?;
1347    ///
1348    /// // Unsubscribe from file changes
1349    /// client.unsubscribe("file:///watch/directory").await?;
1350    /// println!("Unsubscribed from resource changes");
1351    /// # Ok(())
1352    /// # }
1353    /// ```
1354    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1355        if !self.inner.initialized.load(Ordering::Relaxed) {
1356            return Err(Error::bad_request("Client not initialized"));
1357        }
1358
1359        if uri.is_empty() {
1360            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1361        }
1362
1363        // Send resources/unsubscribe request with plugin middleware
1364        let request = UnsubscribeRequest {
1365            uri: uri.to_string(),
1366        };
1367
1368        self.execute_with_plugins(
1369            "resources/unsubscribe",
1370            Some(serde_json::to_value(request).map_err(|e| {
1371                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1372            })?),
1373        )
1374        .await
1375    }
1376
1377    /// Get the client's capabilities configuration
1378    pub fn capabilities(&self) -> &ClientCapabilities {
1379        &self.inner.capabilities
1380    }
1381
1382    /// Initialize all registered plugins
1383    ///
1384    /// This should be called after registration but before using the client.
1385    pub async fn initialize_plugins(&self) -> Result<()> {
1386        // Set up client context for plugins with actual client capabilities
1387        let mut capabilities = std::collections::HashMap::new();
1388        capabilities.insert(
1389            "protocol_version".to_string(),
1390            serde_json::json!("2024-11-05"),
1391        );
1392        capabilities.insert(
1393            "mcp_version".to_string(),
1394            serde_json::json!(env!("CARGO_PKG_VERSION")),
1395        );
1396        capabilities.insert(
1397            "supports_notifications".to_string(),
1398            serde_json::json!(true),
1399        );
1400        capabilities.insert(
1401            "supports_sampling".to_string(),
1402            serde_json::json!(self.has_sampling_handler()),
1403        );
1404        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1405        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1406
1407        // Extract client configuration
1408        let mut config = std::collections::HashMap::new();
1409        config.insert(
1410            "client_name".to_string(),
1411            serde_json::json!("turbomcp-client"),
1412        );
1413        config.insert(
1414            "initialized".to_string(),
1415            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1416        );
1417        config.insert(
1418            "plugin_count".to_string(),
1419            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1420        );
1421
1422        let context = crate::plugins::PluginContext::new(
1423            "turbomcp-client".to_string(),
1424            env!("CARGO_PKG_VERSION").to_string(),
1425            capabilities,
1426            config,
1427            vec![], // Will be populated by the registry
1428        );
1429
1430        self.inner
1431            .plugin_registry
1432            .lock()
1433            .await
1434            .set_client_context(context);
1435
1436        // Note: Individual plugins are initialized automatically during registration
1437        // via PluginRegistry::register_plugin(). This method ensures the registry
1438        // has proper client context for any future plugin registrations.
1439        Ok(())
1440    }
1441
1442    /// Cleanup all registered plugins
1443    ///
1444    /// This should be called when the client is being shut down.
1445    pub async fn cleanup_plugins(&self) -> Result<()> {
1446        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1447        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1448        // references are dropped.
1449
1450        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1451        // pattern where plugins clean up their resources in their Drop implementation.
1452        // Replace the registry with a fresh one (mutex ensures safe access)
1453        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1454        Ok(())
1455    }
1456
1457    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1458    // are defined in their respective operation modules:
1459    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1460    // - handlers.rs: has_elicitation_handler, has_roots_handler
1461    //
1462    // Additional capability getters for elicitation and roots added below
1463    // since they're used during initialization
1464
1465    /// Get elicitation capabilities if handler is registered
1466    /// Automatically detects capability based on registered handler
1467    fn get_elicitation_capabilities(
1468        &self,
1469    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1470        if self.has_elicitation_handler() {
1471            // Currently returns default capabilities. In the future, schema_validation support
1472            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1473            // that handlers could implement. For now, handlers validate schemas themselves.
1474            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1475        } else {
1476            None
1477        }
1478    }
1479
1480    /// Get roots capabilities if handler is registered
1481    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1482        if self.has_roots_handler() {
1483            // Roots capabilities indicate whether list can change
1484            Some(turbomcp_protocol::types::RootsCapabilities {
1485                list_changed: Some(true), // Support dynamic roots by default
1486            })
1487        } else {
1488            None
1489        }
1490    }
1491}