turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27    *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{
35    ClientCapabilities,
36    handlers::{HandlerError, HandlerRegistry},
37    sampling::SamplingHandler,
38};
39
40/// Inner client state with interior mutability
41///
42/// This structure contains the actual client state and is wrapped in Arc<...>
43/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
44pub(super) struct ClientInner<T: Transport + 'static> {
45    /// Protocol client for low-level communication
46    pub(super) protocol: ProtocolClient<T>,
47
48    /// Client capabilities (immutable after construction)
49    pub(super) capabilities: ClientCapabilities,
50
51    /// Initialization state (lock-free atomic boolean)
52    pub(super) initialized: AtomicBool,
53
54    /// Optional sampling handler (mutex for dynamic updates)
55    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
56
57    /// Handler registry for bidirectional communication (mutex for registration)
58    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
59
60    /// Plugin registry for middleware (tokio mutex - holds across await)
61    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
62}
63
64/// The core MCP client implementation
65///
66/// Client provides a comprehensive interface for communicating with MCP servers,
67/// supporting all protocol features including tools, prompts, resources, sampling,
68/// elicitation, and bidirectional communication patterns.
69///
70/// # Clone Pattern
71///
72/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
73/// All clones share the same underlying connection and state:
74///
75/// ```rust,no_run
76/// use turbomcp_client::Client;
77/// use turbomcp_transport::stdio::StdioTransport;
78///
79/// # async fn example() -> turbomcp_protocol::Result<()> {
80/// let client = Client::new(StdioTransport::new());
81/// client.initialize().await?;
82///
83/// // Cheap clone - shares same connection
84/// let client2 = client.clone();
85/// tokio::spawn(async move {
86///     client2.list_tools().await.ok();
87/// });
88/// # Ok(())
89/// # }
90/// ```
91///
92/// The client must be initialized before use by calling `initialize()` to perform
93/// the MCP handshake and capability negotiation.
94///
95/// # Features
96///
97/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
98/// - **Bidirectional Communication**: Server-initiated requests and client responses
99/// - **Plugin Middleware**: Extensible request/response processing
100/// - **Handler Registry**: Callbacks for server-initiated operations
101/// - **Connection Management**: Robust error handling and recovery
102/// - **Type Safety**: Compile-time guarantees for MCP message types
103/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
104///
105/// # Examples
106///
107/// ```rust,no_run
108/// use turbomcp_client::Client;
109/// use turbomcp_transport::stdio::StdioTransport;
110/// use std::collections::HashMap;
111///
112/// # async fn example() -> turbomcp_protocol::Result<()> {
113/// // Create and initialize client (no mut needed!)
114/// let client = Client::new(StdioTransport::new());
115/// let init_result = client.initialize().await?;
116/// println!("Connected to: {}", init_result.server_info.name);
117///
118/// // Use MCP operations
119/// let tools = client.list_tools().await?;
120/// let mut args = HashMap::new();
121/// args.insert("input".to_string(), serde_json::json!("test"));
122/// let result = client.call_tool("my_tool", Some(args)).await?;
123/// # Ok(())
124/// # }
125/// ```
126pub struct Client<T: Transport + 'static> {
127    pub(super) inner: Arc<ClientInner<T>>,
128}
129
130/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
131///
132/// Cloning a Client is cheap (just an Arc clone) and all clones share
133/// the same underlying connection and state.
134impl<T: Transport + 'static> Clone for Client<T> {
135    fn clone(&self) -> Self {
136        Self {
137            inner: Arc::clone(&self.inner),
138        }
139    }
140}
141
142impl<T: Transport + 'static> Drop for ClientInner<T> {
143    fn drop(&mut self) {
144        // Best-effort cleanup if shutdown() wasn't called
145        // This is a safety net, but applications SHOULD call shutdown() explicitly
146
147        // Single warning to catch attention
148        tracing::warn!(
149            "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
150        );
151
152        // Details at debug level to avoid noise in production
153        tracing::debug!("   📘 RECOMMENDATION: Call client.shutdown().await before dropping");
154        tracing::debug!(
155            "   🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
156        );
157        tracing::debug!("   💡 See client shutdown documentation for best practices");
158
159        // We can shutdown the dispatcher (it's synchronous)
160        self.protocol.dispatcher().shutdown();
161        tracing::debug!("   ✅ Message dispatcher stopped");
162
163        // ⚠️  LIMITATION: Cannot call transport.disconnect() from Drop because it's async
164        //    This means:
165        //    - WebSocket: Close frame not sent, reconnection tasks keep running
166        //    - HTTP: Connections may not close cleanly
167        //    - All transports: Background tasks may be orphaned
168        tracing::debug!("   ❌ Transport NOT disconnected (Drop cannot call async methods)");
169        tracing::debug!("   ⚠️  WebSocket reconnection and other background tasks may continue");
170    }
171}
172
173impl<T: Transport + 'static> Client<T> {
174    /// Create a new client with the specified transport
175    ///
176    /// Creates a new MCP client instance with default capabilities.
177    /// The client must be initialized before use by calling `initialize()`.
178    ///
179    /// # Arguments
180    ///
181    /// * `transport` - The transport implementation to use for communication
182    ///
183    /// # Examples
184    ///
185    /// ```rust,no_run
186    /// use turbomcp_client::Client;
187    /// use turbomcp_transport::stdio::StdioTransport;
188    ///
189    /// let transport = StdioTransport::new();
190    /// let client = Client::new(transport);
191    /// ```
192    pub fn new(transport: T) -> Self {
193        let client = Self {
194            inner: Arc::new(ClientInner {
195                protocol: ProtocolClient::new(transport),
196                capabilities: ClientCapabilities::default(),
197                initialized: AtomicBool::new(false),
198                sampling_handler: Arc::new(StdMutex::new(None)),
199                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
200                plugin_registry: Arc::new(tokio::sync::Mutex::new(
201                    crate::plugins::PluginRegistry::new(),
202                )),
203            }),
204        };
205
206        // Register dispatcher handlers for bidirectional communication
207        client.register_dispatcher_handlers();
208
209        client
210    }
211
212    /// Create a new client with custom capabilities
213    ///
214    /// # Arguments
215    ///
216    /// * `transport` - The transport implementation to use
217    /// * `capabilities` - The client capabilities to negotiate
218    ///
219    /// # Examples
220    ///
221    /// ```rust,no_run
222    /// use turbomcp_client::{Client, ClientCapabilities};
223    /// use turbomcp_transport::stdio::StdioTransport;
224    ///
225    /// let capabilities = ClientCapabilities {
226    ///     tools: true,
227    ///     prompts: true,
228    ///     resources: false,
229    ///     sampling: false,
230    /// };
231    ///
232    /// let transport = StdioTransport::new();
233    /// let client = Client::with_capabilities(transport, capabilities);
234    /// ```
235    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
236        let client = Self {
237            inner: Arc::new(ClientInner {
238                protocol: ProtocolClient::new(transport),
239                capabilities,
240                initialized: AtomicBool::new(false),
241                sampling_handler: Arc::new(StdMutex::new(None)),
242                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
243                plugin_registry: Arc::new(tokio::sync::Mutex::new(
244                    crate::plugins::PluginRegistry::new(),
245                )),
246            }),
247        };
248
249        // Register dispatcher handlers for bidirectional communication
250        client.register_dispatcher_handlers();
251
252        client
253    }
254
255    /// Shutdown the client and clean up all resources
256    ///
257    /// This method performs a **graceful shutdown** of the MCP client by:
258    /// 1. Stopping the message dispatcher background task
259    /// 2. Disconnecting the transport (closes connection, stops background tasks)
260    ///
261    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
262    ///
263    /// # Why This Method Exists
264    ///
265    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
266    /// which is required for proper cleanup of WebSocket connections and background tasks.
267    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
268    ///
269    /// # Best Practices
270    ///
271    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
272    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
273    /// - For tests: call in cleanup/teardown
274    /// - If forgotten, Drop will log warnings and do best-effort cleanup
275    ///
276    /// # Examples
277    ///
278    /// ```rust,no_run
279    /// # use turbomcp_client::Client;
280    /// # use turbomcp_transport::stdio::StdioTransport;
281    /// # async fn example() -> turbomcp_protocol::Result<()> {
282    /// let client = Client::new(StdioTransport::new());
283    /// client.initialize().await?;
284    ///
285    /// // Use client...
286    /// let _tools = client.list_tools().await?;
287    ///
288    /// // ✅ Clean shutdown
289    /// client.shutdown().await?;
290    /// # Ok(())
291    /// # }
292    /// ```
293    pub async fn shutdown(&self) -> Result<()> {
294        tracing::info!("🛑 Shutting down MCP client");
295
296        // 1. Shutdown message dispatcher
297        self.inner.protocol.dispatcher().shutdown();
298        tracing::debug!("✅ Message dispatcher stopped");
299
300        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
301        match self.inner.protocol.transport().disconnect().await {
302            Ok(()) => {
303                tracing::info!("✅ Transport disconnected successfully");
304            }
305            Err(e) => {
306                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
307            }
308        }
309
310        tracing::info!("✅ MCP client shutdown complete");
311        Ok(())
312    }
313}
314
315// ============================================================================
316// HTTP-Specific Convenience Constructors (Feature-Gated)
317// ============================================================================
318
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server and initialize in a single call.
    ///
    /// Builds a default HTTP transport configuration with `url` as the base
    /// URL, creates the client, and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // Beautiful one-liner - balanced with server DX
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Identical to the configurable variant with nothing customized.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server, adjusting the configuration first.
    ///
    /// Starts from the default configuration with `url` as the base URL,
    /// hands it to `config_fn` for changes, then creates the client and runs
    /// `initialize()`.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `initialize()`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let base_url = url.into();
        let mut config = StreamableHttpClientConfig {
            base_url,
            ..Default::default()
        };

        // Let the caller override any defaults before the transport is built.
        config_fn(&mut config);

        let client = Self::new(StreamableHttpClientTransport::new(config));

        // Perform the handshake right away so the caller gets a ready client.
        client.initialize().await?;

        Ok(client)
    }
}
439
440// ============================================================================
441// TCP-Specific Convenience Constructors (Feature-Gated)
442// ============================================================================
443
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server and initialize in a single call.
    ///
    /// `addr` may be a literal socket address or a `host:port` pair. Literal
    /// addresses are parsed directly. Anything else is resolved with the
    /// standard library resolver on the blocking thread pool, and the first
    /// resolved address is used.
    ///
    /// The client side binds to the unspecified address of the same IP family
    /// as the server, with port 0 so the OS picks a free port.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or "localhost:8765")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns a bad-request error if `addr` is neither a valid socket address
    /// nor a resolvable `host:port`, and propagates any error returned by
    /// `initialize()`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
        use turbomcp_transport::tcp::TcpTransport;

        let target = addr.as_ref();

        let server_addr: SocketAddr = match target.parse() {
            // Fast path: a literal "ip:port" needs no resolution.
            Ok(literal) => literal,
            Err(_) => {
                // `SocketAddr::from_str` rejects hostnames such as
                // "localhost:8765". Resolve those instead, off the async
                // worker threads because the std resolver blocks.
                let host_port = target.to_owned();
                let lookup = tokio::task::spawn_blocking(move || {
                    host_port
                        .to_socket_addrs()
                        .map(|mut candidates| candidates.next())
                })
                .await;

                match lookup {
                    Ok(Ok(Some(first))) => first,
                    Ok(Ok(None)) => {
                        return Err(Error::bad_request(format!(
                            "Invalid address: '{}' did not resolve to any socket address",
                            target
                        )));
                    }
                    Ok(Err(e)) => {
                        return Err(Error::bad_request(format!("Invalid address: {}", e)));
                    }
                    Err(e) => {
                        return Err(Error::bad_request(format!(
                            "Invalid address: resolution task failed: {}",
                            e
                        )));
                    }
                }
            }
        };

        // Bind to the wildcard address of the matching family on an
        // OS-assigned port ([::]:0 or 0.0.0.0:0). Built from constants, so
        // there is no fallible parse.
        let bind_ip = if server_addr.is_ipv6() {
            IpAddr::V6(Ipv6Addr::UNSPECIFIED)
        } else {
            IpAddr::V4(Ipv4Addr::UNSPECIFIED)
        };
        let bind_addr = SocketAddr::new(bind_ip, 0);

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
494
495// ============================================================================
496// Unix Socket-Specific Convenience Constructors (Feature-Gated)
497// ============================================================================
498
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to an MCP server over a Unix domain socket and initialize in
    /// a single call.
    ///
    /// Creates a client-side Unix transport for `path`, wraps it in a
    /// `Client`, and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `initialize()`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let socket_path: std::path::PathBuf = path.into();
        let client = Self::new(UnixTransport::new_client(socket_path));

        // Perform the handshake right away so the caller gets a ready client.
        client.initialize().await?;

        Ok(client)
    }
}
536
537impl<T: Transport + 'static> Client<T> {
538    /// Register message handlers with the dispatcher
539    ///
540    /// This method sets up the callbacks that handle server-initiated requests
541    /// and notifications. The dispatcher's background task routes incoming
542    /// messages to these handlers.
543    ///
544    /// This is called automatically during Client construction (in `new()` and
545    /// `with_capabilities()`), so you don't need to call it manually.
546    ///
547    /// ## How It Works
548    ///
549    /// The handlers are synchronous closures that spawn async tasks to do the
550    /// actual work. This allows the dispatcher to continue routing messages
551    /// without blocking on handler execution.
552    fn register_dispatcher_handlers(&self) {
553        let dispatcher = self.inner.protocol.dispatcher();
554        let client_for_requests = self.clone();
555        let client_for_notifications = self.clone();
556
557        // Request handler (elicitation, sampling, etc.)
558        let request_handler = Arc::new(move |request: JsonRpcRequest| {
559            let client = client_for_requests.clone();
560            let method = request.method.clone();
561            let req_id = request.id.clone();
562            // Spawn async task to handle the request
563            tokio::spawn(async move {
564                tracing::debug!(
565                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
566                    method,
567                    req_id
568                );
569                if let Err(e) = client.handle_request(request).await {
570                    tracing::error!(
571                        "❌ [request_handler] Error handling server request '{}': {}",
572                        method,
573                        e
574                    );
575                    tracing::error!("   Request ID: {:?}", req_id);
576                    tracing::error!("   Error kind: {:?}", e.kind);
577                } else {
578                    tracing::debug!(
579                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
580                        method,
581                        req_id
582                    );
583                }
584            });
585            Ok(())
586        });
587
588        // Notification handler
589        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
590            let client = client_for_notifications.clone();
591            // Spawn async task to handle the notification
592            tokio::spawn(async move {
593                if let Err(e) = client.handle_notification(notification).await {
594                    tracing::error!("Error handling server notification: {}", e);
595                }
596            });
597            Ok(())
598        });
599
600        // Register handlers synchronously - no race condition!
601        // The set_* methods are now synchronous with std::sync::Mutex
602        dispatcher.set_request_handler(request_handler);
603        dispatcher.set_notification_handler(notification_handler);
604        tracing::debug!("Dispatcher handlers registered successfully");
605    }
606
607    /// Handle server-initiated requests (elicitation, sampling, roots)
608    ///
609    /// This method is called by the MessageDispatcher when it receives a request
610    /// from the server. It routes the request to the appropriate handler based on
611    /// the method name.
612    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
613        match request.method.as_str() {
614            "sampling/createMessage" => {
615                let handler_opt = self
616                    .inner
617                    .sampling_handler
618                    .lock()
619                    .expect("sampling_handler mutex poisoned")
620                    .clone();
621                if let Some(handler) = handler_opt {
622                    // Extract request ID for proper correlation
623                    let request_id = match &request.id {
624                        turbomcp_protocol::MessageId::String(s) => s.clone(),
625                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
626                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
627                    };
628
629                    let params: CreateMessageRequest =
630                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
631                            .map_err(|e| {
632                            Error::protocol(format!("Invalid createMessage params: {}", e))
633                        })?;
634
635                    match handler.handle_create_message(request_id, params).await {
636                        Ok(result) => {
637                            let result_value = serde_json::to_value(result).map_err(|e| {
638                                Error::protocol(format!("Failed to serialize response: {}", e))
639                            })?;
640                            let response = JsonRpcResponse::success(result_value, request.id);
641                            self.send_response(response).await?;
642                        }
643                        Err(e) => {
644                            tracing::warn!(
645                                "⚠️  [handle_request] Sampling handler returned error: {}",
646                                e
647                            );
648
649                            // Preserve error semantics by checking actual error type
650                            // This allows proper error code propagation for retry logic
651                            let (code, message) = if let Some(handler_err) =
652                                e.downcast_ref::<HandlerError>()
653                            {
654                                // HandlerError has explicit JSON-RPC code mapping
655                                let json_err = handler_err.into_jsonrpc_error();
656                                tracing::info!(
657                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
658                                    json_err.code
659                                );
660                                (json_err.code, json_err.message)
661                            } else if let Some(proto_err) =
662                                e.downcast_ref::<turbomcp_protocol::Error>()
663                            {
664                                // Protocol errors have ErrorKind-based mapping
665                                tracing::info!(
666                                    "📋 [handle_request] Protocol error mapped to code: {}",
667                                    proto_err.jsonrpc_error_code()
668                                );
669                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
670                            } else {
671                                // Generic errors default to Internal (-32603)
672                                // Log the error type for debugging (should rarely hit this path)
673                                tracing::warn!(
674                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
675                                    std::any::type_name_of_val(&*e)
676                                );
677                                (-32603, format!("Sampling handler error: {}", e))
678                            };
679
680                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
681                                code,
682                                message,
683                                data: None,
684                            };
685                            let response =
686                                JsonRpcResponse::error_response(error, request.id.clone());
687
688                            tracing::info!(
689                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
690                                request.id
691                            );
692                            self.send_response(response).await?;
693                            tracing::info!(
694                                "✅ [handle_request] Error response sent successfully for request: {:?}",
695                                request.id
696                            );
697                        }
698                    }
699                } else {
700                    // No handler configured
701                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
702                        code: -32601,
703                        message: "Sampling not supported".to_string(),
704                        data: None,
705                    };
706                    let response = JsonRpcResponse::error_response(error, request.id);
707                    self.send_response(response).await?;
708                }
709            }
710            "roots/list" => {
711                // Handle roots/list request from server
712                // Clone the handler Arc to avoid holding mutex across await
713                let handler_opt = self
714                    .inner
715                    .handlers
716                    .lock()
717                    .expect("handlers mutex poisoned")
718                    .roots
719                    .clone();
720
721                let roots_result = if let Some(handler) = handler_opt {
722                    handler.handle_roots_request().await
723                } else {
724                    // No handler - return empty list per MCP spec
725                    Ok(Vec::new())
726                };
727
728                match roots_result {
729                    Ok(roots) => {
730                        let result_value =
731                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
732                                roots,
733                                _meta: None,
734                            })
735                            .map_err(|e| {
736                                Error::protocol(format!(
737                                    "Failed to serialize roots response: {}",
738                                    e
739                                ))
740                            })?;
741                        let response = JsonRpcResponse::success(result_value, request.id);
742                        self.send_response(response).await?;
743                    }
744                    Err(e) => {
745                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
746                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
747                        let json_err = e.into_jsonrpc_error();
748                        let response = JsonRpcResponse::error_response(json_err, request.id);
749                        self.send_response(response).await?;
750                    }
751                }
752            }
753            "elicitation/create" => {
754                // Clone handler Arc before await to avoid holding mutex across await
755                let handler_opt = self
756                    .inner
757                    .handlers
758                    .lock()
759                    .expect("handlers mutex poisoned")
760                    .elicitation
761                    .clone();
762                if let Some(handler) = handler_opt {
763                    // Parse elicitation request params as MCP protocol type
764                    let proto_request: turbomcp_protocol::types::ElicitRequest =
765                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
766                            .map_err(|e| {
767                            Error::protocol(format!("Invalid elicitation params: {}", e))
768                        })?;
769
770                    // Wrap protocol request with ID for handler (preserves type safety!)
771                    let handler_request =
772                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
773
774                    // Call the registered elicitation handler
775                    match handler.handle_elicitation(handler_request).await {
776                        Ok(elicit_response) => {
777                            // Convert handler response back to protocol type
778                            let proto_result = elicit_response.into_protocol();
779                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
780                                Error::protocol(format!(
781                                    "Failed to serialize elicitation response: {}",
782                                    e
783                                ))
784                            })?;
785                            let response = JsonRpcResponse::success(result_value, request.id);
786                            self.send_response(response).await?;
787                        }
788                        Err(e) => {
789                            // Convert handler error to JSON-RPC error using centralized mapping
790                            let response =
791                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
792                            self.send_response(response).await?;
793                        }
794                    }
795                } else {
796                    // No handler configured - elicitation not supported
797                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
798                        code: -32601,
799                        message: "Elicitation not supported - no handler registered".to_string(),
800                        data: None,
801                    };
802                    let response = JsonRpcResponse::error_response(error, request.id);
803                    self.send_response(response).await?;
804                }
805            }
806            _ => {
807                // Unknown method
808                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
809                    code: -32601,
810                    message: format!("Method not found: {}", request.method),
811                    data: None,
812                };
813                let response = JsonRpcResponse::error_response(error, request.id);
814                self.send_response(response).await?;
815            }
816        }
817        Ok(())
818    }
819
820    /// Handle server-initiated notifications
821    ///
822    /// Routes notifications to appropriate handlers based on method name.
823    /// MCP defines several notification types that servers can send to clients:
824    ///
825    /// - `notifications/progress` - Progress updates for long-running operations
826    /// - `notifications/message` - Log messages from server
827    /// - `notifications/resources/updated` - Resource content changed
828    /// - `notifications/resources/list_changed` - Resource list changed
829    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
830        match notification.method.as_str() {
831            "notifications/progress" => {
832                // Progress tracking has been removed
833                tracing::debug!(
834                    "Received progress notification - progress tracking has been removed"
835                );
836            }
837
838            "notifications/message" => {
839                // Route to log handler
840                let handler_opt = self
841                    .inner
842                    .handlers
843                    .lock()
844                    .expect("handlers mutex poisoned")
845                    .get_log_handler();
846
847                if let Some(handler) = handler_opt {
848                    // Parse log message
849                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
850                        notification.params.unwrap_or(serde_json::Value::Null),
851                    )
852                    .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
853
854                    // Call handler
855                    if let Err(e) = handler.handle_log(log).await {
856                        tracing::error!("Log handler error: {}", e);
857                    }
858                } else {
859                    tracing::debug!("Received log notification but no handler registered");
860                }
861            }
862
863            "notifications/resources/updated" => {
864                // Route to resource update handler
865                let handler_opt = self
866                    .inner
867                    .handlers
868                    .lock()
869                    .expect("handlers mutex poisoned")
870                    .get_resource_update_handler();
871
872                if let Some(handler) = handler_opt {
873                    // Parse resource update notification
874                    let update: crate::handlers::ResourceUpdatedNotification =
875                        serde_json::from_value(
876                            notification.params.unwrap_or(serde_json::Value::Null),
877                        )
878                        .map_err(|e| {
879                            Error::protocol(format!("Invalid resource update notification: {}", e))
880                        })?;
881
882                    // Call handler
883                    if let Err(e) = handler.handle_resource_update(update).await {
884                        tracing::error!("Resource update handler error: {}", e);
885                    }
886                } else {
887                    tracing::debug!(
888                        "Received resource update notification but no handler registered"
889                    );
890                }
891            }
892
893            "notifications/resources/list_changed" => {
894                // Route to resource list changed handler
895                let handler_opt = self
896                    .inner
897                    .handlers
898                    .lock()
899                    .expect("handlers mutex poisoned")
900                    .get_resource_list_changed_handler();
901
902                if let Some(handler) = handler_opt {
903                    if let Err(e) = handler.handle_resource_list_changed().await {
904                        tracing::error!("Resource list changed handler error: {}", e);
905                    }
906                } else {
907                    tracing::debug!(
908                        "Resource list changed notification received (no handler registered)"
909                    );
910                }
911            }
912
913            "notifications/prompts/list_changed" => {
914                // Route to prompt list changed handler
915                let handler_opt = self
916                    .inner
917                    .handlers
918                    .lock()
919                    .expect("handlers mutex poisoned")
920                    .get_prompt_list_changed_handler();
921
922                if let Some(handler) = handler_opt {
923                    if let Err(e) = handler.handle_prompt_list_changed().await {
924                        tracing::error!("Prompt list changed handler error: {}", e);
925                    }
926                } else {
927                    tracing::debug!(
928                        "Prompt list changed notification received (no handler registered)"
929                    );
930                }
931            }
932
933            "notifications/tools/list_changed" => {
934                // Route to tool list changed handler
935                let handler_opt = self
936                    .inner
937                    .handlers
938                    .lock()
939                    .expect("handlers mutex poisoned")
940                    .get_tool_list_changed_handler();
941
942                if let Some(handler) = handler_opt {
943                    if let Err(e) = handler.handle_tool_list_changed().await {
944                        tracing::error!("Tool list changed handler error: {}", e);
945                    }
946                } else {
947                    tracing::debug!(
948                        "Tool list changed notification received (no handler registered)"
949                    );
950                }
951            }
952
953            "notifications/cancelled" => {
954                // Route to cancellation handler
955                let handler_opt = self
956                    .inner
957                    .handlers
958                    .lock()
959                    .expect("handlers mutex poisoned")
960                    .get_cancellation_handler();
961
962                if let Some(handler) = handler_opt {
963                    // Parse cancellation notification
964                    let cancellation: crate::handlers::CancelledNotification =
965                        serde_json::from_value(
966                            notification.params.unwrap_or(serde_json::Value::Null),
967                        )
968                        .map_err(|e| {
969                            Error::protocol(format!("Invalid cancellation notification: {}", e))
970                        })?;
971
972                    // Call handler
973                    if let Err(e) = handler.handle_cancellation(cancellation).await {
974                        tracing::error!("Cancellation handler error: {}", e);
975                    }
976                } else {
977                    tracing::debug!("Cancellation notification received (no handler registered)");
978                }
979            }
980
981            _ => {
982                // Unknown notification type
983                tracing::debug!("Received unknown notification: {}", notification.method);
984            }
985        }
986
987        Ok(())
988    }
989
990    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
991        tracing::info!(
992            "📤 [send_response] Sending JSON-RPC response: id={:?}",
993            response.id
994        );
995
996        let payload = serde_json::to_vec(&response).map_err(|e| {
997            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
998            Error::protocol(format!("Failed to serialize response: {}", e))
999        })?;
1000
1001        tracing::debug!(
1002            "📤 [send_response] Response payload: {} bytes",
1003            payload.len()
1004        );
1005        tracing::debug!(
1006            "📤 [send_response] Response JSON: {}",
1007            String::from_utf8_lossy(&payload)
1008        );
1009
1010        let message = TransportMessage::new(
1011            turbomcp_protocol::MessageId::from("response".to_string()),
1012            payload.into(),
1013        );
1014
1015        self.inner
1016            .protocol
1017            .transport()
1018            .send(message)
1019            .await
1020            .map_err(|e| {
1021                tracing::error!("❌ [send_response] Transport send failed: {}", e);
1022                Error::transport(format!("Failed to send response: {}", e))
1023            })?;
1024
1025        tracing::info!(
1026            "✅ [send_response] Response sent successfully: id={:?}",
1027            response.id
1028        );
1029        Ok(())
1030    }
1031
1032    /// Initialize the connection with the MCP server
1033    ///
1034    /// Performs the initialization handshake with the server, negotiating capabilities
1035    /// and establishing the protocol version. This method must be called before
1036    /// any other operations can be performed.
1037    ///
1038    /// # Returns
1039    ///
1040    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1041    ///
1042    /// # Errors
1043    ///
1044    /// Returns an error if:
1045    /// - The transport connection fails
1046    /// - The server rejects the initialization request
1047    /// - Protocol negotiation fails
1048    ///
1049    /// # Examples
1050    ///
1051    /// ```rust,no_run
1052    /// # use turbomcp_client::Client;
1053    /// # use turbomcp_transport::stdio::StdioTransport;
1054    /// # async fn example() -> turbomcp_protocol::Result<()> {
1055    /// let mut client = Client::new(StdioTransport::new());
1056    ///
1057    /// let result = client.initialize().await?;
1058    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1059    /// # Ok(())
1060    /// # }
1061    /// ```
1062    pub async fn initialize(&self) -> Result<InitializeResult> {
1063        // Auto-connect transport if not already connected
1064        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1065        let transport = self.inner.protocol.transport();
1066        let transport_state = transport.state().await;
1067        if !matches!(
1068            transport_state,
1069            turbomcp_transport::TransportState::Connected
1070        ) {
1071            tracing::debug!(
1072                "Auto-connecting transport (current state: {:?})",
1073                transport_state
1074            );
1075            transport
1076                .connect()
1077                .await
1078                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1079            tracing::info!("Transport connected successfully");
1080        }
1081
1082        // Build client capabilities based on registered handlers (automatic detection)
1083        let mut client_caps = ProtocolClientCapabilities::default();
1084
1085        // Detect sampling capability from handler
1086        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1087            client_caps.sampling = Some(sampling_caps);
1088        }
1089
1090        // Detect elicitation capability from handler
1091        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1092            client_caps.elicitation = Some(elicitation_caps);
1093        }
1094
1095        // Detect roots capability from handler
1096        if let Some(roots_caps) = self.get_roots_capabilities() {
1097            client_caps.roots = Some(roots_caps);
1098        }
1099
1100        // Send MCP initialization request
1101        let request = InitializeRequest {
1102            protocol_version: PROTOCOL_VERSION.to_string(),
1103            capabilities: client_caps,
1104            client_info: turbomcp_protocol::types::Implementation {
1105                name: "turbomcp-client".to_string(),
1106                version: env!("CARGO_PKG_VERSION").to_string(),
1107                title: Some("TurboMCP Client".to_string()),
1108            },
1109            _meta: None,
1110        };
1111
1112        let protocol_response: ProtocolInitializeResult = self
1113            .inner
1114            .protocol
1115            .request("initialize", Some(serde_json::to_value(request)?))
1116            .await?;
1117
1118        // AtomicBool: lock-free store with Ordering::Relaxed
1119        self.inner.initialized.store(true, Ordering::Relaxed);
1120
1121        // Send initialized notification
1122        self.inner
1123            .protocol
1124            .notify("notifications/initialized", None)
1125            .await?;
1126
1127        // Convert protocol response to client response type
1128        Ok(InitializeResult {
1129            server_info: protocol_response.server_info,
1130            server_capabilities: protocol_response.capabilities,
1131        })
1132    }
1133
1134    /// Execute a protocol method with plugin middleware
1135    ///
1136    /// This is a generic helper for wrapping protocol calls with plugin middleware.
1137    pub(crate) async fn execute_with_plugins<R>(
1138        &self,
1139        method_name: &str,
1140        params: Option<serde_json::Value>,
1141    ) -> Result<R>
1142    where
1143        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1144    {
1145        // Create JSON-RPC request for plugin context
1146        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1147            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1148            id: turbomcp_protocol::MessageId::Number(1),
1149            method: method_name.to_string(),
1150            params: params.clone(),
1151        };
1152
1153        // 1. Create request context for plugins
1154        let mut req_ctx =
1155            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1156
1157        // 2. Execute before_request plugin middleware
1158        if let Err(e) = self
1159            .inner
1160            .plugin_registry
1161            .lock()
1162            .await
1163            .execute_before_request(&mut req_ctx)
1164            .await
1165        {
1166            return Err(Error::bad_request(format!(
1167                "Plugin before_request failed: {}",
1168                e
1169            )));
1170        }
1171
1172        // 3. Execute the actual protocol call
1173        let start_time = std::time::Instant::now();
1174        let protocol_result: Result<R> = self
1175            .inner
1176            .protocol
1177            .request(method_name, req_ctx.params().cloned())
1178            .await;
1179        let duration = start_time.elapsed();
1180
1181        // 4. Prepare response context
1182        let mut resp_ctx = match protocol_result {
1183            Ok(ref response) => {
1184                let response_value = serde_json::to_value(response.clone())?;
1185                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1186            }
1187            Err(ref e) => {
1188                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1189            }
1190        };
1191
1192        // 5. Execute after_response plugin middleware
1193        if let Err(e) = self
1194            .inner
1195            .plugin_registry
1196            .lock()
1197            .await
1198            .execute_after_response(&mut resp_ctx)
1199            .await
1200        {
1201            return Err(Error::bad_request(format!(
1202                "Plugin after_response failed: {}",
1203                e
1204            )));
1205        }
1206
1207        // 6. Return the final result, checking for plugin modifications
1208        match protocol_result {
1209            Ok(ref response) => {
1210                // Check if plugins modified the response
1211                if let Some(modified_response) = resp_ctx.response {
1212                    // Try to deserialize the modified response
1213                    if let Ok(modified_result) =
1214                        serde_json::from_value::<R>(modified_response.clone())
1215                    {
1216                        return Ok(modified_result);
1217                    }
1218                }
1219
1220                // No plugin modifications, use original response
1221                Ok(response.clone())
1222            }
1223            Err(e) => {
1224                // Check if plugins provided an error recovery response
1225                if let Some(recovery_response) = resp_ctx.response {
1226                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1227                        Ok(recovery_result)
1228                    } else {
1229                        Err(e)
1230                    }
1231                } else {
1232                    Err(e)
1233                }
1234            }
1235        }
1236    }
1237
1238    /// Subscribe to resource change notifications
1239    ///
1240    /// Registers interest in receiving notifications when the specified
1241    /// resource changes. The server will send notifications when the
1242    /// resource is modified, created, or deleted.
1243    ///
1244    /// # Arguments
1245    ///
1246    /// * `uri` - The URI of the resource to monitor
1247    ///
1248    /// # Returns
1249    ///
1250    /// Returns `EmptyResult` on successful subscription.
1251    ///
1252    /// # Errors
1253    ///
1254    /// Returns an error if:
1255    /// - The client is not initialized
1256    /// - The URI is invalid or empty
1257    /// - The server doesn't support subscriptions
1258    /// - The request fails
1259    ///
1260    /// # Examples
1261    ///
1262    /// ```rust,no_run
1263    /// # use turbomcp_client::Client;
1264    /// # use turbomcp_transport::stdio::StdioTransport;
1265    /// # async fn example() -> turbomcp_protocol::Result<()> {
1266    /// let mut client = Client::new(StdioTransport::new());
1267    /// client.initialize().await?;
1268    ///
1269    /// // Subscribe to file changes
1270    /// client.subscribe("file:///watch/directory").await?;
1271    /// println!("Subscribed to resource changes");
1272    /// # Ok(())
1273    /// # }
1274    /// ```
1275    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1276        if !self.inner.initialized.load(Ordering::Relaxed) {
1277            return Err(Error::bad_request("Client not initialized"));
1278        }
1279
1280        if uri.is_empty() {
1281            return Err(Error::bad_request("Subscription URI cannot be empty"));
1282        }
1283
1284        // Send resources/subscribe request with plugin middleware
1285        let request = SubscribeRequest {
1286            uri: uri.to_string(),
1287        };
1288
1289        self.execute_with_plugins(
1290            "resources/subscribe",
1291            Some(serde_json::to_value(request).map_err(|e| {
1292                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1293            })?),
1294        )
1295        .await
1296    }
1297
1298    /// Unsubscribe from resource change notifications
1299    ///
1300    /// Cancels a previous subscription to resource changes. After unsubscribing,
1301    /// the client will no longer receive notifications for the specified resource.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// * `uri` - The URI of the resource to stop monitoring
1306    ///
1307    /// # Returns
1308    ///
1309    /// Returns `EmptyResult` on successful unsubscription.
1310    ///
1311    /// # Errors
1312    ///
1313    /// Returns an error if:
1314    /// - The client is not initialized
1315    /// - The URI is invalid or empty
1316    /// - No active subscription exists for the URI
1317    /// - The request fails
1318    ///
1319    /// # Examples
1320    ///
1321    /// ```rust,no_run
1322    /// # use turbomcp_client::Client;
1323    /// # use turbomcp_transport::stdio::StdioTransport;
1324    /// # async fn example() -> turbomcp_protocol::Result<()> {
1325    /// let mut client = Client::new(StdioTransport::new());
1326    /// client.initialize().await?;
1327    ///
1328    /// // Unsubscribe from file changes
1329    /// client.unsubscribe("file:///watch/directory").await?;
1330    /// println!("Unsubscribed from resource changes");
1331    /// # Ok(())
1332    /// # }
1333    /// ```
1334    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1335        if !self.inner.initialized.load(Ordering::Relaxed) {
1336            return Err(Error::bad_request("Client not initialized"));
1337        }
1338
1339        if uri.is_empty() {
1340            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1341        }
1342
1343        // Send resources/unsubscribe request with plugin middleware
1344        let request = UnsubscribeRequest {
1345            uri: uri.to_string(),
1346        };
1347
1348        self.execute_with_plugins(
1349            "resources/unsubscribe",
1350            Some(serde_json::to_value(request).map_err(|e| {
1351                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1352            })?),
1353        )
1354        .await
1355    }
1356
1357    /// Get the client's capabilities configuration
1358    #[must_use]
1359    pub fn capabilities(&self) -> &ClientCapabilities {
1360        &self.inner.capabilities
1361    }
1362
1363    /// Initialize all registered plugins
1364    ///
1365    /// This should be called after registration but before using the client.
1366    pub async fn initialize_plugins(&self) -> Result<()> {
1367        // Set up client context for plugins with actual client capabilities
1368        let mut capabilities = std::collections::HashMap::new();
1369        capabilities.insert(
1370            "protocol_version".to_string(),
1371            serde_json::json!("2024-11-05"),
1372        );
1373        capabilities.insert(
1374            "mcp_version".to_string(),
1375            serde_json::json!(env!("CARGO_PKG_VERSION")),
1376        );
1377        capabilities.insert(
1378            "supports_notifications".to_string(),
1379            serde_json::json!(true),
1380        );
1381        capabilities.insert(
1382            "supports_sampling".to_string(),
1383            serde_json::json!(self.has_sampling_handler()),
1384        );
1385        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1386        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1387
1388        // Extract client configuration
1389        let mut config = std::collections::HashMap::new();
1390        config.insert(
1391            "client_name".to_string(),
1392            serde_json::json!("turbomcp-client"),
1393        );
1394        config.insert(
1395            "initialized".to_string(),
1396            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1397        );
1398        config.insert(
1399            "plugin_count".to_string(),
1400            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1401        );
1402
1403        let context = crate::plugins::PluginContext::new(
1404            "turbomcp-client".to_string(),
1405            env!("CARGO_PKG_VERSION").to_string(),
1406            capabilities,
1407            config,
1408            vec![], // Will be populated by the registry
1409        );
1410
1411        self.inner
1412            .plugin_registry
1413            .lock()
1414            .await
1415            .set_client_context(context);
1416
1417        // Note: Individual plugins are initialized automatically during registration
1418        // via PluginRegistry::register_plugin(). This method ensures the registry
1419        // has proper client context for any future plugin registrations.
1420        Ok(())
1421    }
1422
1423    /// Cleanup all registered plugins
1424    ///
1425    /// This should be called when the client is being shut down.
1426    pub async fn cleanup_plugins(&self) -> Result<()> {
1427        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1428        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1429        // references are dropped.
1430
1431        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1432        // pattern where plugins clean up their resources in their Drop implementation.
1433        // Replace the registry with a fresh one (mutex ensures safe access)
1434        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1435        Ok(())
1436    }
1437
1438    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1439    // are defined in their respective operation modules:
1440    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1441    // - handlers.rs: has_elicitation_handler, has_roots_handler
1442    //
1443    // Additional capability getters for elicitation and roots added below
1444    // since they're used during initialization
1445
1446    /// Get elicitation capabilities if handler is registered
1447    /// Automatically detects capability based on registered handler
1448    fn get_elicitation_capabilities(
1449        &self,
1450    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1451        if self.has_elicitation_handler() {
1452            // Currently returns default capabilities. In the future, schema_validation support
1453            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1454            // that handlers could implement. For now, handlers validate schemas themselves.
1455            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1456        } else {
1457            None
1458        }
1459    }
1460
1461    /// Get roots capabilities if handler is registered
1462    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1463        if self.has_roots_handler() {
1464            // Roots capabilities indicate whether list can change
1465            Some(turbomcp_protocol::types::RootsCapabilities {
1466                list_changed: Some(true), // Support dynamic roots by default
1467            })
1468        } else {
1469            None
1470        }
1471    }
1472}