turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use tokio::sync::Semaphore;
25
26use turbomcp_protocol::jsonrpc::*;
27use turbomcp_protocol::types::{
28    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
29    *,
30};
31use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
32use turbomcp_transport::{Transport, TransportMessage};
33
34use super::config::InitializeResult;
35use super::protocol::ProtocolClient;
36use crate::{
37    ClientCapabilities,
38    handlers::{HandlerError, HandlerRegistry},
39    sampling::SamplingHandler,
40};
41
42/// Inner client state with interior mutability
43///
44/// This structure contains the actual client state and is wrapped in Arc<...>
45/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
46pub(super) struct ClientInner<T: Transport + 'static> {
47    /// Protocol client for low-level communication
48    pub(super) protocol: ProtocolClient<T>,
49
50    /// Client capabilities (immutable after construction)
51    pub(super) capabilities: ClientCapabilities,
52
53    /// Initialization state (lock-free atomic boolean)
54    pub(super) initialized: AtomicBool,
55
56    /// Optional sampling handler (mutex for dynamic updates)
57    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
58
59    /// Handler registry for bidirectional communication (mutex for registration)
60    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
61
62    /// Plugin registry for middleware (tokio mutex - holds across await)
63    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
64
65    /// ✅ Semaphore for bounded concurrency of request/notification handlers
66    /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
67    pub(super) handler_semaphore: Arc<Semaphore>,
68}
69
70/// The core MCP client implementation
71///
72/// Client provides a comprehensive interface for communicating with MCP servers,
73/// supporting all protocol features including tools, prompts, resources, sampling,
74/// elicitation, and bidirectional communication patterns.
75///
76/// # Clone Pattern
77///
78/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
79/// All clones share the same underlying connection and state:
80///
81/// ```rust,no_run
82/// use turbomcp_client::Client;
83/// use turbomcp_transport::stdio::StdioTransport;
84///
85/// # async fn example() -> turbomcp_protocol::Result<()> {
86/// let client = Client::new(StdioTransport::new());
87/// client.initialize().await?;
88///
89/// // Cheap clone - shares same connection
90/// let client2 = client.clone();
91/// tokio::spawn(async move {
92///     client2.list_tools().await.ok();
93/// });
94/// # Ok(())
95/// # }
96/// ```
97///
98/// The client must be initialized before use by calling `initialize()` to perform
99/// the MCP handshake and capability negotiation.
100///
101/// # Features
102///
103/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
104/// - **Bidirectional Communication**: Server-initiated requests and client responses
105/// - **Plugin Middleware**: Extensible request/response processing
106/// - **Handler Registry**: Callbacks for server-initiated operations
107/// - **Connection Management**: Robust error handling and recovery
108/// - **Type Safety**: Compile-time guarantees for MCP message types
109/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
110///
111/// # Examples
112///
113/// ```rust,no_run
114/// use turbomcp_client::Client;
115/// use turbomcp_transport::stdio::StdioTransport;
116/// use std::collections::HashMap;
117///
118/// # async fn example() -> turbomcp_protocol::Result<()> {
119/// // Create and initialize client (no mut needed!)
120/// let client = Client::new(StdioTransport::new());
121/// let init_result = client.initialize().await?;
122/// println!("Connected to: {}", init_result.server_info.name);
123///
124/// // Use MCP operations
125/// let tools = client.list_tools().await?;
126/// let mut args = HashMap::new();
127/// args.insert("input".to_string(), serde_json::json!("test"));
128/// let result = client.call_tool("my_tool", Some(args)).await?;
129/// # Ok(())
130/// # }
131/// ```
132pub struct Client<T: Transport + 'static> {
133    pub(super) inner: Arc<ClientInner<T>>,
134}
135
136/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
137///
138/// Cloning a Client is cheap (just an Arc clone) and all clones share
139/// the same underlying connection and state.
140impl<T: Transport + 'static> Clone for Client<T> {
141    fn clone(&self) -> Self {
142        Self {
143            inner: Arc::clone(&self.inner),
144        }
145    }
146}
147
148impl<T: Transport + 'static> Drop for ClientInner<T> {
149    fn drop(&mut self) {
150        // Best-effort cleanup if shutdown() wasn't called
151        // This is a safety net, but applications SHOULD call shutdown() explicitly
152
153        // Single warning to catch attention
154        tracing::warn!(
155            "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
156        );
157
158        // Details at debug level to avoid noise in production
159        tracing::debug!("   📘 RECOMMENDATION: Call client.shutdown().await before dropping");
160        tracing::debug!(
161            "   🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
162        );
163        tracing::debug!("   💡 See client shutdown documentation for best practices");
164
165        // We can shutdown the dispatcher (it's synchronous)
166        self.protocol.dispatcher().shutdown();
167        tracing::debug!("   ✅ Message dispatcher stopped");
168
169        // ⚠️  LIMITATION: Cannot call transport.disconnect() from Drop because it's async
170        //    This means:
171        //    - WebSocket: Close frame not sent, reconnection tasks keep running
172        //    - HTTP: Connections may not close cleanly
173        //    - All transports: Background tasks may be orphaned
174        tracing::debug!("   ❌ Transport NOT disconnected (Drop cannot call async methods)");
175        tracing::debug!("   ⚠️  WebSocket reconnection and other background tasks may continue");
176    }
177}
178
179impl<T: Transport + 'static> Client<T> {
180    /// Create a new client with the specified transport
181    ///
182    /// Creates a new MCP client instance with default capabilities.
183    /// The client must be initialized before use by calling `initialize()`.
184    ///
185    /// # Arguments
186    ///
187    /// * `transport` - The transport implementation to use for communication
188    ///
189    /// # Examples
190    ///
191    /// ```rust,no_run
192    /// use turbomcp_client::Client;
193    /// use turbomcp_transport::stdio::StdioTransport;
194    ///
195    /// let transport = StdioTransport::new();
196    /// let client = Client::new(transport);
197    /// ```
198    pub fn new(transport: T) -> Self {
199        let capabilities = ClientCapabilities::default();
200        let client = Self {
201            inner: Arc::new(ClientInner {
202                protocol: ProtocolClient::new(transport),
203                capabilities: capabilities.clone(),
204                initialized: AtomicBool::new(false),
205                sampling_handler: Arc::new(StdMutex::new(None)),
206                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
207                plugin_registry: Arc::new(tokio::sync::Mutex::new(
208                    crate::plugins::PluginRegistry::new(),
209                )),
210                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
211            }),
212        };
213
214        // Register dispatcher handlers for bidirectional communication
215        client.register_dispatcher_handlers();
216
217        client
218    }
219
220    /// Create a new client with custom capabilities
221    ///
222    /// # Arguments
223    ///
224    /// * `transport` - The transport implementation to use
225    /// * `capabilities` - The client capabilities to negotiate
226    ///
227    /// # Examples
228    ///
229    /// ```rust,no_run
230    /// use turbomcp_client::{Client, ClientCapabilities};
231    /// use turbomcp_transport::stdio::StdioTransport;
232    ///
233    /// let capabilities = ClientCapabilities {
234    ///     tools: true,
235    ///     prompts: true,
236    ///     resources: false,
237    ///     sampling: false,
238    ///     max_concurrent_handlers: 100,
239    /// };
240    ///
241    /// let transport = StdioTransport::new();
242    /// let client = Client::with_capabilities(transport, capabilities);
243    /// ```
244    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
245        let client = Self {
246            inner: Arc::new(ClientInner {
247                protocol: ProtocolClient::new(transport),
248                capabilities: capabilities.clone(),
249                initialized: AtomicBool::new(false),
250                sampling_handler: Arc::new(StdMutex::new(None)),
251                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
252                plugin_registry: Arc::new(tokio::sync::Mutex::new(
253                    crate::plugins::PluginRegistry::new(),
254                )),
255                handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
256            }),
257        };
258
259        // Register dispatcher handlers for bidirectional communication
260        client.register_dispatcher_handlers();
261
262        client
263    }
264
265    /// Shutdown the client and clean up all resources
266    ///
267    /// This method performs a **graceful shutdown** of the MCP client by:
268    /// 1. Stopping the message dispatcher background task
269    /// 2. Disconnecting the transport (closes connection, stops background tasks)
270    ///
271    /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
272    ///
273    /// # Why This Method Exists
274    ///
275    /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
276    /// which is required for proper cleanup of WebSocket connections and background tasks.
277    /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
278    ///
279    /// # Best Practices
280    ///
281    /// - **Always call `shutdown()` before dropping** for clean resource cleanup
282    /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
283    /// - For tests: call in cleanup/teardown
284    /// - If forgotten, Drop will log warnings and do best-effort cleanup
285    ///
286    /// # Examples
287    ///
288    /// ```rust,no_run
289    /// # use turbomcp_client::Client;
290    /// # use turbomcp_transport::stdio::StdioTransport;
291    /// # async fn example() -> turbomcp_protocol::Result<()> {
292    /// let client = Client::new(StdioTransport::new());
293    /// client.initialize().await?;
294    ///
295    /// // Use client...
296    /// let _tools = client.list_tools().await?;
297    ///
298    /// // ✅ Clean shutdown
299    /// client.shutdown().await?;
300    /// # Ok(())
301    /// # }
302    /// ```
303    pub async fn shutdown(&self) -> Result<()> {
304        tracing::info!("🛑 Shutting down MCP client");
305
306        // 1. Shutdown message dispatcher
307        self.inner.protocol.dispatcher().shutdown();
308        tracing::debug!("✅ Message dispatcher stopped");
309
310        // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
311        match self.inner.protocol.transport().disconnect().await {
312            Ok(()) => {
313                tracing::info!("✅ Transport disconnected successfully");
314            }
315            Err(e) => {
316                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
317            }
318        }
319
320        tracing::info!("✅ MCP client shutdown complete");
321        Ok(())
322    }
323}
324
325// ============================================================================
326// HTTP-Specific Convenience Constructors (Feature-Gated)
327// ============================================================================
328
329#[cfg(feature = "http")]
330impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
331    /// Connect to an HTTP MCP server (convenience method)
332    ///
333    /// This is a convenient one-liner alternative to manual configuration.
334    /// Creates an HTTP client, connects, and initializes in one call.
335    ///
336    /// # Arguments
337    ///
338    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
339    ///
340    /// # Returns
341    ///
342    /// Returns an initialized `Client` ready to use.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if:
347    /// - The URL is invalid
348    /// - Connection to the server fails
349    /// - Initialization handshake fails
350    ///
351    /// # Examples
352    ///
353    /// ```rust,no_run
354    /// use turbomcp_client::Client;
355    ///
356    /// # async fn example() -> turbomcp_protocol::Result<()> {
357    /// // Convenient one-liner - balanced with server DX
358    /// let client = Client::connect_http("http://localhost:8080").await?;
359    ///
360    /// // Now use it directly
361    /// let tools = client.list_tools().await?;
362    /// # Ok(())
363    /// # }
364    /// ```
365    ///
366    /// Compare to verbose approach (10+ lines):
367    /// ```rust,no_run
368    /// use turbomcp_client::Client;
369    /// use turbomcp_transport::streamable_http_client::{
370    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
371    /// };
372    ///
373    /// # async fn example() -> turbomcp_protocol::Result<()> {
374    /// let config = StreamableHttpClientConfig {
375    ///     base_url: "http://localhost:8080".to_string(),
376    ///     ..Default::default()
377    /// };
378    /// let transport = StreamableHttpClientTransport::new(config);
379    /// let client = Client::new(transport);
380    /// client.initialize().await?;
381    /// # Ok(())
382    /// # }
383    /// ```
384    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
385        use turbomcp_transport::streamable_http_client::{
386            StreamableHttpClientConfig, StreamableHttpClientTransport,
387        };
388
389        let config = StreamableHttpClientConfig {
390            base_url: url.into(),
391            ..Default::default()
392        };
393
394        let transport = StreamableHttpClientTransport::new(config);
395        let client = Self::new(transport);
396
397        // Initialize connection immediately
398        client.initialize().await?;
399
400        Ok(client)
401    }
402
403    /// Connect to an HTTP MCP server with custom configuration
404    ///
405    /// Provides more control than `connect_http()` while still being ergonomic.
406    ///
407    /// # Arguments
408    ///
409    /// * `url` - The base URL of the MCP server
410    /// * `config_fn` - Function to customize the configuration
411    ///
412    /// # Examples
413    ///
414    /// ```rust,no_run
415    /// use turbomcp_client::Client;
416    /// use std::time::Duration;
417    ///
418    /// # async fn example() -> turbomcp_protocol::Result<()> {
419    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
420    ///     config.timeout = Duration::from_secs(60);
421    ///     config.endpoint_path = "/api/mcp".to_string();
422    /// }).await?;
423    /// # Ok(())
424    /// # }
425    /// ```
426    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
427    where
428        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
429    {
430        use turbomcp_transport::streamable_http_client::{
431            StreamableHttpClientConfig, StreamableHttpClientTransport,
432        };
433
434        let mut config = StreamableHttpClientConfig {
435            base_url: url.into(),
436            ..Default::default()
437        };
438
439        config_fn(&mut config);
440
441        let transport = StreamableHttpClientTransport::new(config);
442        let client = Self::new(transport);
443
444        client.initialize().await?;
445
446        Ok(client)
447    }
448}
449
450// ============================================================================
451// TCP-Specific Convenience Constructors (Feature-Gated)
452// ============================================================================
453
454#[cfg(feature = "tcp")]
455impl Client<turbomcp_transport::tcp::TcpTransport> {
456    /// Connect to a TCP MCP server (convenience method)
457    ///
458    /// Convenient one-liner for TCP connections - balanced DX.
459    ///
460    /// # Arguments
461    ///
462    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
463    ///
464    /// # Returns
465    ///
466    /// Returns an initialized `Client` ready to use.
467    ///
468    /// # Examples
469    ///
470    /// ```rust,no_run
471    /// # #[cfg(feature = "tcp")]
472    /// use turbomcp_client::Client;
473    ///
474    /// # async fn example() -> turbomcp_protocol::Result<()> {
475    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
476    /// let tools = client.list_tools().await?;
477    /// # Ok(())
478    /// # }
479    /// ```
480    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
481        use std::net::SocketAddr;
482        use turbomcp_transport::tcp::TcpTransport;
483
484        let server_addr: SocketAddr = addr
485            .as_ref()
486            .parse()
487            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;
488
489        // Client binds to 0.0.0.0:0 (any available port)
490        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
491            "[::]:0".parse().unwrap()
492        } else {
493            "0.0.0.0:0".parse().unwrap()
494        };
495
496        let transport = TcpTransport::new_client(bind_addr, server_addr);
497        let client = Self::new(transport);
498
499        client.initialize().await?;
500
501        Ok(client)
502    }
503}
504
505// ============================================================================
506// Unix Socket-Specific Convenience Constructors (Feature-Gated)
507// ============================================================================
508
509#[cfg(all(unix, feature = "unix"))]
510impl Client<turbomcp_transport::unix::UnixTransport> {
511    /// Connect to a Unix socket MCP server (convenience method)
512    ///
513    /// Convenient one-liner for Unix socket IPC - balanced DX.
514    ///
515    /// # Arguments
516    ///
517    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
518    ///
519    /// # Returns
520    ///
521    /// Returns an initialized `Client` ready to use.
522    ///
523    /// # Examples
524    ///
525    /// ```rust,no_run
526    /// # #[cfg(all(unix, feature = "unix"))]
527    /// use turbomcp_client::Client;
528    ///
529    /// # async fn example() -> turbomcp_protocol::Result<()> {
530    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
531    /// let tools = client.list_tools().await?;
532    /// # Ok(())
533    /// # }
534    /// ```
535    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
536        use turbomcp_transport::unix::UnixTransport;
537
538        let transport = UnixTransport::new_client(path.into());
539        let client = Self::new(transport);
540
541        client.initialize().await?;
542
543        Ok(client)
544    }
545}
546
547impl<T: Transport + 'static> Client<T> {
548    /// Register message handlers with the dispatcher
549    ///
550    /// This method sets up the callbacks that handle server-initiated requests
551    /// and notifications. The dispatcher's background task routes incoming
552    /// messages to these handlers.
553    ///
554    /// This is called automatically during Client construction (in `new()` and
555    /// `with_capabilities()`), so you don't need to call it manually.
556    ///
557    /// ## How It Works
558    ///
559    /// The handlers are synchronous closures that spawn async tasks to do the
560    /// actual work. This allows the dispatcher to continue routing messages
561    /// without blocking on handler execution.
562    fn register_dispatcher_handlers(&self) {
563        let dispatcher = self.inner.protocol.dispatcher();
564        let client_for_requests = self.clone();
565        let client_for_notifications = self.clone();
566
567        // Request handler (elicitation, sampling, etc.)
568        let semaphore = Arc::clone(&self.inner.handler_semaphore);
569        let request_handler = Arc::new(move |request: JsonRpcRequest| {
570            let client = client_for_requests.clone();
571            let method = request.method.clone();
572            let req_id = request.id.clone();
573            let semaphore = Arc::clone(&semaphore);
574
575            // ✅ Spawn async task with bounded concurrency
576            tokio::spawn(async move {
577                // Acquire permit (blocks if 100 requests already in flight)
578                let _permit = semaphore
579                    .acquire()
580                    .await
581                    .expect("Semaphore should not be closed");
582
583                tracing::debug!(
584                    "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
585                    method,
586                    req_id
587                );
588                if let Err(e) = client.handle_request(request).await {
589                    tracing::error!(
590                        "❌ [request_handler] Error handling server request '{}': {}",
591                        method,
592                        e
593                    );
594                    tracing::error!("   Request ID: {:?}", req_id);
595                    tracing::error!("   Error kind: {:?}", e.kind);
596                } else {
597                    tracing::debug!(
598                        "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
599                        method,
600                        req_id
601                    );
602                }
603                // Permit automatically released on drop ✅
604            });
605            Ok(())
606        });
607
608        // Notification handler
609        let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
610        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
611            let client = client_for_notifications.clone();
612            let semaphore = Arc::clone(&semaphore_notif);
613
614            // ✅ Spawn async task with bounded concurrency
615            tokio::spawn(async move {
616                // Acquire permit (blocks if 100 handlers already in flight)
617                let _permit = semaphore
618                    .acquire()
619                    .await
620                    .expect("Semaphore should not be closed");
621
622                if let Err(e) = client.handle_notification(notification).await {
623                    tracing::error!("Error handling server notification: {}", e);
624                }
625                // Permit automatically released on drop ✅
626            });
627            Ok(())
628        });
629
630        // Register handlers synchronously - no race condition!
631        // The set_* methods are now synchronous with std::sync::Mutex
632        dispatcher.set_request_handler(request_handler);
633        dispatcher.set_notification_handler(notification_handler);
634        tracing::debug!("Dispatcher handlers registered successfully");
635    }
636
637    /// Handle server-initiated requests (elicitation, sampling, roots)
638    ///
639    /// This method is called by the MessageDispatcher when it receives a request
640    /// from the server. It routes the request to the appropriate handler based on
641    /// the method name.
642    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
643        match request.method.as_str() {
644            "sampling/createMessage" => {
645                let handler_opt = self
646                    .inner
647                    .sampling_handler
648                    .lock()
649                    .expect("sampling_handler mutex poisoned")
650                    .clone();
651                if let Some(handler) = handler_opt {
652                    // Extract request ID for proper correlation
653                    let request_id = match &request.id {
654                        turbomcp_protocol::MessageId::String(s) => s.clone(),
655                        turbomcp_protocol::MessageId::Number(n) => n.to_string(),
656                        turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
657                    };
658
659                    let params: CreateMessageRequest =
660                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
661                            .map_err(|e| {
662                            Error::protocol(format!("Invalid createMessage params: {}", e))
663                        })?;
664
665                    match handler.handle_create_message(request_id, params).await {
666                        Ok(result) => {
667                            let result_value = serde_json::to_value(result).map_err(|e| {
668                                Error::protocol(format!("Failed to serialize response: {}", e))
669                            })?;
670                            let response = JsonRpcResponse::success(result_value, request.id);
671                            self.send_response(response).await?;
672                        }
673                        Err(e) => {
674                            tracing::warn!(
675                                "⚠️  [handle_request] Sampling handler returned error: {}",
676                                e
677                            );
678
679                            // Preserve error semantics by checking actual error type
680                            // This allows proper error code propagation for retry logic
681                            let (code, message) = if let Some(handler_err) =
682                                e.downcast_ref::<HandlerError>()
683                            {
684                                // HandlerError has explicit JSON-RPC code mapping
685                                let json_err = handler_err.into_jsonrpc_error();
686                                tracing::info!(
687                                    "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
688                                    json_err.code
689                                );
690                                (json_err.code, json_err.message)
691                            } else if let Some(proto_err) =
692                                e.downcast_ref::<turbomcp_protocol::Error>()
693                            {
694                                // Protocol errors have ErrorKind-based mapping
695                                tracing::info!(
696                                    "📋 [handle_request] Protocol error mapped to code: {}",
697                                    proto_err.jsonrpc_error_code()
698                                );
699                                (proto_err.jsonrpc_error_code(), proto_err.to_string())
700                            } else {
701                                // Generic errors default to Internal (-32603)
702                                // Log the error type for debugging (should rarely hit this path)
703                                tracing::warn!(
704                                    "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
705                                    std::any::type_name_of_val(&*e)
706                                );
707                                (-32603, format!("Sampling handler error: {}", e))
708                            };
709
710                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
711                                code,
712                                message,
713                                data: None,
714                            };
715                            let response =
716                                JsonRpcResponse::error_response(error, request.id.clone());
717
718                            tracing::info!(
719                                "🔄 [handle_request] Attempting to send error response for request: {:?}",
720                                request.id
721                            );
722                            self.send_response(response).await?;
723                            tracing::info!(
724                                "✅ [handle_request] Error response sent successfully for request: {:?}",
725                                request.id
726                            );
727                        }
728                    }
729                } else {
730                    // No handler configured
731                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
732                        code: -32601,
733                        message: "Sampling not supported".to_string(),
734                        data: None,
735                    };
736                    let response = JsonRpcResponse::error_response(error, request.id);
737                    self.send_response(response).await?;
738                }
739            }
740            "roots/list" => {
741                // Handle roots/list request from server
742                // Clone the handler Arc to avoid holding mutex across await
743                let handler_opt = self
744                    .inner
745                    .handlers
746                    .lock()
747                    .expect("handlers mutex poisoned")
748                    .roots
749                    .clone();
750
751                let roots_result = if let Some(handler) = handler_opt {
752                    handler.handle_roots_request().await
753                } else {
754                    // No handler - return empty list per MCP spec
755                    Ok(Vec::new())
756                };
757
758                match roots_result {
759                    Ok(roots) => {
760                        let result_value =
761                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
762                                roots,
763                                _meta: None,
764                            })
765                            .map_err(|e| {
766                                Error::protocol(format!(
767                                    "Failed to serialize roots response: {}",
768                                    e
769                                ))
770                            })?;
771                        let response = JsonRpcResponse::success(result_value, request.id);
772                        self.send_response(response).await?;
773                    }
774                    Err(e) => {
775                        // FIXED: Extract error code from HandlerError instead of hardcoding -32603
776                        // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
777                        let json_err = e.into_jsonrpc_error();
778                        let response = JsonRpcResponse::error_response(json_err, request.id);
779                        self.send_response(response).await?;
780                    }
781                }
782            }
783            "elicitation/create" => {
784                // Clone handler Arc before await to avoid holding mutex across await
785                let handler_opt = self
786                    .inner
787                    .handlers
788                    .lock()
789                    .expect("handlers mutex poisoned")
790                    .elicitation
791                    .clone();
792                if let Some(handler) = handler_opt {
793                    // Parse elicitation request params as MCP protocol type
794                    let proto_request: turbomcp_protocol::types::ElicitRequest =
795                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
796                            .map_err(|e| {
797                            Error::protocol(format!("Invalid elicitation params: {}", e))
798                        })?;
799
800                    // Wrap protocol request with ID for handler (preserves type safety!)
801                    let handler_request =
802                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
803
804                    // Call the registered elicitation handler
805                    match handler.handle_elicitation(handler_request).await {
806                        Ok(elicit_response) => {
807                            // Convert handler response back to protocol type
808                            let proto_result = elicit_response.into_protocol();
809                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
810                                Error::protocol(format!(
811                                    "Failed to serialize elicitation response: {}",
812                                    e
813                                ))
814                            })?;
815                            let response = JsonRpcResponse::success(result_value, request.id);
816                            self.send_response(response).await?;
817                        }
818                        Err(e) => {
819                            // Convert handler error to JSON-RPC error using centralized mapping
820                            let response =
821                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
822                            self.send_response(response).await?;
823                        }
824                    }
825                } else {
826                    // No handler configured - elicitation not supported
827                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
828                        code: -32601,
829                        message: "Elicitation not supported - no handler registered".to_string(),
830                        data: None,
831                    };
832                    let response = JsonRpcResponse::error_response(error, request.id);
833                    self.send_response(response).await?;
834                }
835            }
836            _ => {
837                // Unknown method
838                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
839                    code: -32601,
840                    message: format!("Method not found: {}", request.method),
841                    data: None,
842                };
843                let response = JsonRpcResponse::error_response(error, request.id);
844                self.send_response(response).await?;
845            }
846        }
847        Ok(())
848    }
849
850    /// Handle server-initiated notifications
851    ///
852    /// Routes notifications to appropriate handlers based on method name.
853    /// MCP defines several notification types that servers can send to clients:
854    ///
855    /// - `notifications/progress` - Progress updates for long-running operations
856    /// - `notifications/message` - Log messages from server
857    /// - `notifications/resources/updated` - Resource content changed
858    /// - `notifications/resources/list_changed` - Resource list changed
859    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
860        match notification.method.as_str() {
861            "notifications/progress" => {
862                // Progress tracking has been removed
863                tracing::debug!(
864                    "Received progress notification - progress tracking has been removed"
865                );
866            }
867
868            "notifications/message" => {
869                // Route to log handler
870                let handler_opt = self
871                    .inner
872                    .handlers
873                    .lock()
874                    .expect("handlers mutex poisoned")
875                    .get_log_handler();
876
877                if let Some(handler) = handler_opt {
878                    // Parse log message
879                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
880                        notification.params.unwrap_or(serde_json::Value::Null),
881                    )
882                    .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
883
884                    // Call handler
885                    if let Err(e) = handler.handle_log(log).await {
886                        tracing::error!("Log handler error: {}", e);
887                    }
888                } else {
889                    tracing::debug!("Received log notification but no handler registered");
890                }
891            }
892
893            "notifications/resources/updated" => {
894                // Route to resource update handler
895                let handler_opt = self
896                    .inner
897                    .handlers
898                    .lock()
899                    .expect("handlers mutex poisoned")
900                    .get_resource_update_handler();
901
902                if let Some(handler) = handler_opt {
903                    // Parse resource update notification
904                    let update: crate::handlers::ResourceUpdatedNotification =
905                        serde_json::from_value(
906                            notification.params.unwrap_or(serde_json::Value::Null),
907                        )
908                        .map_err(|e| {
909                            Error::protocol(format!("Invalid resource update notification: {}", e))
910                        })?;
911
912                    // Call handler
913                    if let Err(e) = handler.handle_resource_update(update).await {
914                        tracing::error!("Resource update handler error: {}", e);
915                    }
916                } else {
917                    tracing::debug!(
918                        "Received resource update notification but no handler registered"
919                    );
920                }
921            }
922
923            "notifications/resources/list_changed" => {
924                // Route to resource list changed handler
925                let handler_opt = self
926                    .inner
927                    .handlers
928                    .lock()
929                    .expect("handlers mutex poisoned")
930                    .get_resource_list_changed_handler();
931
932                if let Some(handler) = handler_opt {
933                    if let Err(e) = handler.handle_resource_list_changed().await {
934                        tracing::error!("Resource list changed handler error: {}", e);
935                    }
936                } else {
937                    tracing::debug!(
938                        "Resource list changed notification received (no handler registered)"
939                    );
940                }
941            }
942
943            "notifications/prompts/list_changed" => {
944                // Route to prompt list changed handler
945                let handler_opt = self
946                    .inner
947                    .handlers
948                    .lock()
949                    .expect("handlers mutex poisoned")
950                    .get_prompt_list_changed_handler();
951
952                if let Some(handler) = handler_opt {
953                    if let Err(e) = handler.handle_prompt_list_changed().await {
954                        tracing::error!("Prompt list changed handler error: {}", e);
955                    }
956                } else {
957                    tracing::debug!(
958                        "Prompt list changed notification received (no handler registered)"
959                    );
960                }
961            }
962
963            "notifications/tools/list_changed" => {
964                // Route to tool list changed handler
965                let handler_opt = self
966                    .inner
967                    .handlers
968                    .lock()
969                    .expect("handlers mutex poisoned")
970                    .get_tool_list_changed_handler();
971
972                if let Some(handler) = handler_opt {
973                    if let Err(e) = handler.handle_tool_list_changed().await {
974                        tracing::error!("Tool list changed handler error: {}", e);
975                    }
976                } else {
977                    tracing::debug!(
978                        "Tool list changed notification received (no handler registered)"
979                    );
980                }
981            }
982
983            "notifications/cancelled" => {
984                // Route to cancellation handler
985                let handler_opt = self
986                    .inner
987                    .handlers
988                    .lock()
989                    .expect("handlers mutex poisoned")
990                    .get_cancellation_handler();
991
992                if let Some(handler) = handler_opt {
993                    // Parse cancellation notification
994                    let cancellation: crate::handlers::CancelledNotification =
995                        serde_json::from_value(
996                            notification.params.unwrap_or(serde_json::Value::Null),
997                        )
998                        .map_err(|e| {
999                            Error::protocol(format!("Invalid cancellation notification: {}", e))
1000                        })?;
1001
1002                    // Call handler
1003                    if let Err(e) = handler.handle_cancellation(cancellation).await {
1004                        tracing::error!("Cancellation handler error: {}", e);
1005                    }
1006                } else {
1007                    tracing::debug!("Cancellation notification received (no handler registered)");
1008                }
1009            }
1010
1011            _ => {
1012                // Unknown notification type
1013                tracing::debug!("Received unknown notification: {}", notification.method);
1014            }
1015        }
1016
1017        Ok(())
1018    }
1019
1020    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1021        tracing::info!(
1022            "📤 [send_response] Sending JSON-RPC response: id={:?}",
1023            response.id
1024        );
1025
1026        let payload = serde_json::to_vec(&response).map_err(|e| {
1027            tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1028            Error::protocol(format!("Failed to serialize response: {}", e))
1029        })?;
1030
1031        tracing::debug!(
1032            "📤 [send_response] Response payload: {} bytes",
1033            payload.len()
1034        );
1035        tracing::debug!(
1036            "📤 [send_response] Response JSON: {}",
1037            String::from_utf8_lossy(&payload)
1038        );
1039
1040        let message = TransportMessage::new(
1041            turbomcp_protocol::MessageId::from("response".to_string()),
1042            payload.into(),
1043        );
1044
1045        self.inner
1046            .protocol
1047            .transport()
1048            .send(message)
1049            .await
1050            .map_err(|e| {
1051                tracing::error!("❌ [send_response] Transport send failed: {}", e);
1052                Error::transport(format!("Failed to send response: {}", e))
1053            })?;
1054
1055        tracing::info!(
1056            "✅ [send_response] Response sent successfully: id={:?}",
1057            response.id
1058        );
1059        Ok(())
1060    }
1061
1062    /// Initialize the connection with the MCP server
1063    ///
1064    /// Performs the initialization handshake with the server, negotiating capabilities
1065    /// and establishing the protocol version. This method must be called before
1066    /// any other operations can be performed.
1067    ///
1068    /// # Returns
1069    ///
1070    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1071    ///
1072    /// # Errors
1073    ///
1074    /// Returns an error if:
1075    /// - The transport connection fails
1076    /// - The server rejects the initialization request
1077    /// - Protocol negotiation fails
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```rust,no_run
1082    /// # use turbomcp_client::Client;
1083    /// # use turbomcp_transport::stdio::StdioTransport;
1084    /// # async fn example() -> turbomcp_protocol::Result<()> {
1085    /// let mut client = Client::new(StdioTransport::new());
1086    ///
1087    /// let result = client.initialize().await?;
1088    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1089    /// # Ok(())
1090    /// # }
1091    /// ```
1092    pub async fn initialize(&self) -> Result<InitializeResult> {
1093        // Auto-connect transport if not already connected
1094        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1095        let transport = self.inner.protocol.transport();
1096        let transport_state = transport.state().await;
1097        if !matches!(
1098            transport_state,
1099            turbomcp_transport::TransportState::Connected
1100        ) {
1101            tracing::debug!(
1102                "Auto-connecting transport (current state: {:?})",
1103                transport_state
1104            );
1105            transport
1106                .connect()
1107                .await
1108                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1109            tracing::info!("Transport connected successfully");
1110        }
1111
1112        // Build client capabilities based on registered handlers (automatic detection)
1113        let mut client_caps = ProtocolClientCapabilities::default();
1114
1115        // Detect sampling capability from handler
1116        if let Some(sampling_caps) = self.get_sampling_capabilities() {
1117            client_caps.sampling = Some(sampling_caps);
1118        }
1119
1120        // Detect elicitation capability from handler
1121        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1122            client_caps.elicitation = Some(elicitation_caps);
1123        }
1124
1125        // Detect roots capability from handler
1126        if let Some(roots_caps) = self.get_roots_capabilities() {
1127            client_caps.roots = Some(roots_caps);
1128        }
1129
1130        // Send MCP initialization request
1131        let request = InitializeRequest {
1132            protocol_version: PROTOCOL_VERSION.to_string(),
1133            capabilities: client_caps,
1134            client_info: turbomcp_protocol::types::Implementation {
1135                name: "turbomcp-client".to_string(),
1136                version: env!("CARGO_PKG_VERSION").to_string(),
1137                title: Some("TurboMCP Client".to_string()),
1138            },
1139            _meta: None,
1140        };
1141
1142        let protocol_response: ProtocolInitializeResult = self
1143            .inner
1144            .protocol
1145            .request("initialize", Some(serde_json::to_value(request)?))
1146            .await?;
1147
1148        // AtomicBool: lock-free store with Ordering::Relaxed
1149        self.inner.initialized.store(true, Ordering::Relaxed);
1150
1151        // Send initialized notification
1152        self.inner
1153            .protocol
1154            .notify("notifications/initialized", None)
1155            .await?;
1156
1157        // Convert protocol response to client response type
1158        Ok(InitializeResult {
1159            server_info: protocol_response.server_info,
1160            server_capabilities: protocol_response.capabilities,
1161        })
1162    }
1163
1164    /// Execute a protocol method with plugin middleware
1165    ///
1166    /// This is a generic helper for wrapping protocol calls with plugin middleware.
1167    pub(crate) async fn execute_with_plugins<R>(
1168        &self,
1169        method_name: &str,
1170        params: Option<serde_json::Value>,
1171    ) -> Result<R>
1172    where
1173        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1174    {
1175        // Create JSON-RPC request for plugin context
1176        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1177            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1178            id: turbomcp_protocol::MessageId::Number(1),
1179            method: method_name.to_string(),
1180            params: params.clone(),
1181        };
1182
1183        // 1. Create request context for plugins
1184        let mut req_ctx =
1185            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1186
1187        // 2. Execute before_request plugin middleware
1188        if let Err(e) = self
1189            .inner
1190            .plugin_registry
1191            .lock()
1192            .await
1193            .execute_before_request(&mut req_ctx)
1194            .await
1195        {
1196            return Err(Error::bad_request(format!(
1197                "Plugin before_request failed: {}",
1198                e
1199            )));
1200        }
1201
1202        // 3. Execute the actual protocol call
1203        let start_time = std::time::Instant::now();
1204        let protocol_result: Result<R> = self
1205            .inner
1206            .protocol
1207            .request(method_name, req_ctx.params().cloned())
1208            .await;
1209        let duration = start_time.elapsed();
1210
1211        // 4. Prepare response context
1212        let mut resp_ctx = match protocol_result {
1213            Ok(ref response) => {
1214                let response_value = serde_json::to_value(response.clone())?;
1215                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1216            }
1217            Err(ref e) => {
1218                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1219            }
1220        };
1221
1222        // 5. Execute after_response plugin middleware
1223        if let Err(e) = self
1224            .inner
1225            .plugin_registry
1226            .lock()
1227            .await
1228            .execute_after_response(&mut resp_ctx)
1229            .await
1230        {
1231            return Err(Error::bad_request(format!(
1232                "Plugin after_response failed: {}",
1233                e
1234            )));
1235        }
1236
1237        // 6. Return the final result, checking for plugin modifications
1238        match protocol_result {
1239            Ok(ref response) => {
1240                // Check if plugins modified the response
1241                if let Some(modified_response) = resp_ctx.response {
1242                    // Try to deserialize the modified response
1243                    if let Ok(modified_result) =
1244                        serde_json::from_value::<R>(modified_response.clone())
1245                    {
1246                        return Ok(modified_result);
1247                    }
1248                }
1249
1250                // No plugin modifications, use original response
1251                Ok(response.clone())
1252            }
1253            Err(e) => {
1254                // Check if plugins provided an error recovery response
1255                if let Some(recovery_response) = resp_ctx.response {
1256                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1257                        Ok(recovery_result)
1258                    } else {
1259                        Err(e)
1260                    }
1261                } else {
1262                    Err(e)
1263                }
1264            }
1265        }
1266    }
1267
1268    /// Subscribe to resource change notifications
1269    ///
1270    /// Registers interest in receiving notifications when the specified
1271    /// resource changes. The server will send notifications when the
1272    /// resource is modified, created, or deleted.
1273    ///
1274    /// # Arguments
1275    ///
1276    /// * `uri` - The URI of the resource to monitor
1277    ///
1278    /// # Returns
1279    ///
1280    /// Returns `EmptyResult` on successful subscription.
1281    ///
1282    /// # Errors
1283    ///
1284    /// Returns an error if:
1285    /// - The client is not initialized
1286    /// - The URI is invalid or empty
1287    /// - The server doesn't support subscriptions
1288    /// - The request fails
1289    ///
1290    /// # Examples
1291    ///
1292    /// ```rust,no_run
1293    /// # use turbomcp_client::Client;
1294    /// # use turbomcp_transport::stdio::StdioTransport;
1295    /// # async fn example() -> turbomcp_protocol::Result<()> {
1296    /// let mut client = Client::new(StdioTransport::new());
1297    /// client.initialize().await?;
1298    ///
1299    /// // Subscribe to file changes
1300    /// client.subscribe("file:///watch/directory").await?;
1301    /// println!("Subscribed to resource changes");
1302    /// # Ok(())
1303    /// # }
1304    /// ```
1305    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1306        if !self.inner.initialized.load(Ordering::Relaxed) {
1307            return Err(Error::bad_request("Client not initialized"));
1308        }
1309
1310        if uri.is_empty() {
1311            return Err(Error::bad_request("Subscription URI cannot be empty"));
1312        }
1313
1314        // Send resources/subscribe request with plugin middleware
1315        let request = SubscribeRequest {
1316            uri: uri.to_string(),
1317        };
1318
1319        self.execute_with_plugins(
1320            "resources/subscribe",
1321            Some(serde_json::to_value(request).map_err(|e| {
1322                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1323            })?),
1324        )
1325        .await
1326    }
1327
1328    /// Unsubscribe from resource change notifications
1329    ///
1330    /// Cancels a previous subscription to resource changes. After unsubscribing,
1331    /// the client will no longer receive notifications for the specified resource.
1332    ///
1333    /// # Arguments
1334    ///
1335    /// * `uri` - The URI of the resource to stop monitoring
1336    ///
1337    /// # Returns
1338    ///
1339    /// Returns `EmptyResult` on successful unsubscription.
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns an error if:
1344    /// - The client is not initialized
1345    /// - The URI is invalid or empty
1346    /// - No active subscription exists for the URI
1347    /// - The request fails
1348    ///
1349    /// # Examples
1350    ///
1351    /// ```rust,no_run
1352    /// # use turbomcp_client::Client;
1353    /// # use turbomcp_transport::stdio::StdioTransport;
1354    /// # async fn example() -> turbomcp_protocol::Result<()> {
1355    /// let mut client = Client::new(StdioTransport::new());
1356    /// client.initialize().await?;
1357    ///
1358    /// // Unsubscribe from file changes
1359    /// client.unsubscribe("file:///watch/directory").await?;
1360    /// println!("Unsubscribed from resource changes");
1361    /// # Ok(())
1362    /// # }
1363    /// ```
1364    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1365        if !self.inner.initialized.load(Ordering::Relaxed) {
1366            return Err(Error::bad_request("Client not initialized"));
1367        }
1368
1369        if uri.is_empty() {
1370            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1371        }
1372
1373        // Send resources/unsubscribe request with plugin middleware
1374        let request = UnsubscribeRequest {
1375            uri: uri.to_string(),
1376        };
1377
1378        self.execute_with_plugins(
1379            "resources/unsubscribe",
1380            Some(serde_json::to_value(request).map_err(|e| {
1381                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1382            })?),
1383        )
1384        .await
1385    }
1386
1387    /// Get the client's capabilities configuration
1388    #[must_use]
1389    pub fn capabilities(&self) -> &ClientCapabilities {
1390        &self.inner.capabilities
1391    }
1392
1393    /// Initialize all registered plugins
1394    ///
1395    /// This should be called after registration but before using the client.
1396    pub async fn initialize_plugins(&self) -> Result<()> {
1397        // Set up client context for plugins with actual client capabilities
1398        let mut capabilities = std::collections::HashMap::new();
1399        capabilities.insert(
1400            "protocol_version".to_string(),
1401            serde_json::json!("2024-11-05"),
1402        );
1403        capabilities.insert(
1404            "mcp_version".to_string(),
1405            serde_json::json!(env!("CARGO_PKG_VERSION")),
1406        );
1407        capabilities.insert(
1408            "supports_notifications".to_string(),
1409            serde_json::json!(true),
1410        );
1411        capabilities.insert(
1412            "supports_sampling".to_string(),
1413            serde_json::json!(self.has_sampling_handler()),
1414        );
1415        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1416        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1417
1418        // Extract client configuration
1419        let mut config = std::collections::HashMap::new();
1420        config.insert(
1421            "client_name".to_string(),
1422            serde_json::json!("turbomcp-client"),
1423        );
1424        config.insert(
1425            "initialized".to_string(),
1426            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1427        );
1428        config.insert(
1429            "plugin_count".to_string(),
1430            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1431        );
1432
1433        let context = crate::plugins::PluginContext::new(
1434            "turbomcp-client".to_string(),
1435            env!("CARGO_PKG_VERSION").to_string(),
1436            capabilities,
1437            config,
1438            vec![], // Will be populated by the registry
1439        );
1440
1441        self.inner
1442            .plugin_registry
1443            .lock()
1444            .await
1445            .set_client_context(context);
1446
1447        // Note: Individual plugins are initialized automatically during registration
1448        // via PluginRegistry::register_plugin(). This method ensures the registry
1449        // has proper client context for any future plugin registrations.
1450        Ok(())
1451    }
1452
1453    /// Cleanup all registered plugins
1454    ///
1455    /// This should be called when the client is being shut down.
1456    pub async fn cleanup_plugins(&self) -> Result<()> {
1457        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1458        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1459        // references are dropped.
1460
1461        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1462        // pattern where plugins clean up their resources in their Drop implementation.
1463        // Replace the registry with a fresh one (mutex ensures safe access)
1464        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1465        Ok(())
1466    }
1467
1468    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1469    // are defined in their respective operation modules:
1470    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1471    // - handlers.rs: has_elicitation_handler, has_roots_handler
1472    //
1473    // Additional capability getters for elicitation and roots added below
1474    // since they're used during initialization
1475
1476    /// Get elicitation capabilities if handler is registered
1477    /// Automatically detects capability based on registered handler
1478    fn get_elicitation_capabilities(
1479        &self,
1480    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1481        if self.has_elicitation_handler() {
1482            // Currently returns default capabilities. In the future, schema_validation support
1483            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1484            // that handlers could implement. For now, handlers validate schemas themselves.
1485            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1486        } else {
1487            None
1488        }
1489    }
1490
1491    /// Get roots capabilities if handler is registered
1492    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1493        if self.has_roots_handler() {
1494            // Roots capabilities indicate whether list can change
1495            Some(turbomcp_protocol::types::RootsCapabilities {
1496                list_changed: Some(true), // Support dynamic roots by default
1497            })
1498        } else {
1499            None
1500        }
1501    }
1502}