turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27    *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{ClientCapabilities, handlers::HandlerRegistry, sampling::SamplingHandler};
35
36/// Inner client state with interior mutability
37///
38/// This structure contains the actual client state and is wrapped in Arc<...>
39/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
40pub(super) struct ClientInner<T: Transport + 'static> {
41    /// Protocol client for low-level communication
42    pub(super) protocol: ProtocolClient<T>,
43
44    /// Client capabilities (immutable after construction)
45    pub(super) capabilities: ClientCapabilities,
46
47    /// Initialization state (lock-free atomic boolean)
48    pub(super) initialized: AtomicBool,
49
50    /// Optional sampling handler (mutex for dynamic updates)
51    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
52
53    /// Handler registry for bidirectional communication (mutex for registration)
54    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
55
56    /// Plugin registry for middleware (tokio mutex - holds across await)
57    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
58}
59
60/// The core MCP client implementation
61///
62/// Client provides a comprehensive interface for communicating with MCP servers,
63/// supporting all protocol features including tools, prompts, resources, sampling,
64/// elicitation, and bidirectional communication patterns.
65///
66/// # Clone Pattern
67///
68/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
69/// All clones share the same underlying connection and state:
70///
71/// ```rust,no_run
72/// use turbomcp_client::Client;
73/// use turbomcp_transport::stdio::StdioTransport;
74///
75/// # async fn example() -> turbomcp_protocol::Result<()> {
76/// let client = Client::new(StdioTransport::new());
77/// client.initialize().await?;
78///
79/// // Cheap clone - shares same connection
80/// let client2 = client.clone();
81/// tokio::spawn(async move {
82///     client2.list_tools().await.ok();
83/// });
84/// # Ok(())
85/// # }
86/// ```
87///
88/// The client must be initialized before use by calling `initialize()` to perform
89/// the MCP handshake and capability negotiation.
90///
91/// # Features
92///
93/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
94/// - **Bidirectional Communication**: Server-initiated requests and client responses
95/// - **Plugin Middleware**: Extensible request/response processing
96/// - **Handler Registry**: Callbacks for server-initiated operations
97/// - **Connection Management**: Robust error handling and recovery
98/// - **Type Safety**: Compile-time guarantees for MCP message types
99/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
100///
101/// # Examples
102///
103/// ```rust,no_run
104/// use turbomcp_client::Client;
105/// use turbomcp_transport::stdio::StdioTransport;
106/// use std::collections::HashMap;
107///
108/// # async fn example() -> turbomcp_protocol::Result<()> {
109/// // Create and initialize client (no mut needed!)
110/// let client = Client::new(StdioTransport::new());
111/// let init_result = client.initialize().await?;
112/// println!("Connected to: {}", init_result.server_info.name);
113///
114/// // Use MCP operations
115/// let tools = client.list_tools().await?;
116/// let mut args = HashMap::new();
117/// args.insert("input".to_string(), serde_json::json!("test"));
118/// let result = client.call_tool("my_tool", Some(args)).await?;
119/// # Ok(())
120/// # }
121/// ```
122pub struct Client<T: Transport + 'static> {
123    pub(super) inner: Arc<ClientInner<T>>,
124}
125
126/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
127///
128/// Cloning a Client is cheap (just an Arc clone) and all clones share
129/// the same underlying connection and state.
130impl<T: Transport + 'static> Clone for Client<T> {
131    fn clone(&self) -> Self {
132        Self {
133            inner: Arc::clone(&self.inner),
134        }
135    }
136}
137
138impl<T: Transport + 'static> Drop for ClientInner<T> {
139    fn drop(&mut self) {
140        // Shutdown the dispatcher's background task when the LAST Client reference is dropped
141        // This prevents the background task from running forever after all clients are dropped
142        tracing::debug!("Last Client reference dropped - shutting down message dispatcher");
143        self.protocol.dispatcher().shutdown();
144    }
145}
146
147impl<T: Transport + 'static> Client<T> {
148    /// Create a new client with the specified transport
149    ///
150    /// Creates a new MCP client instance with default capabilities.
151    /// The client must be initialized before use by calling `initialize()`.
152    ///
153    /// # Arguments
154    ///
155    /// * `transport` - The transport implementation to use for communication
156    ///
157    /// # Examples
158    ///
159    /// ```rust,no_run
160    /// use turbomcp_client::Client;
161    /// use turbomcp_transport::stdio::StdioTransport;
162    ///
163    /// let transport = StdioTransport::new();
164    /// let client = Client::new(transport);
165    /// ```
166    pub fn new(transport: T) -> Self {
167        let client = Self {
168            inner: Arc::new(ClientInner {
169                protocol: ProtocolClient::new(transport),
170                capabilities: ClientCapabilities::default(),
171                initialized: AtomicBool::new(false),
172                sampling_handler: Arc::new(StdMutex::new(None)),
173                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
174                plugin_registry: Arc::new(tokio::sync::Mutex::new(
175                    crate::plugins::PluginRegistry::new(),
176                )),
177            }),
178        };
179
180        // Register dispatcher handlers for bidirectional communication
181        client.register_dispatcher_handlers();
182
183        client
184    }
185
186    /// Create a new client with custom capabilities
187    ///
188    /// # Arguments
189    ///
190    /// * `transport` - The transport implementation to use
191    /// * `capabilities` - The client capabilities to negotiate
192    ///
193    /// # Examples
194    ///
195    /// ```rust,no_run
196    /// use turbomcp_client::{Client, ClientCapabilities};
197    /// use turbomcp_transport::stdio::StdioTransport;
198    ///
199    /// let capabilities = ClientCapabilities {
200    ///     tools: true,
201    ///     prompts: true,
202    ///     resources: false,
203    ///     sampling: false,
204    /// };
205    ///
206    /// let transport = StdioTransport::new();
207    /// let client = Client::with_capabilities(transport, capabilities);
208    /// ```
209    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
210        let client = Self {
211            inner: Arc::new(ClientInner {
212                protocol: ProtocolClient::new(transport),
213                capabilities,
214                initialized: AtomicBool::new(false),
215                sampling_handler: Arc::new(StdMutex::new(None)),
216                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
217                plugin_registry: Arc::new(tokio::sync::Mutex::new(
218                    crate::plugins::PluginRegistry::new(),
219                )),
220            }),
221        };
222
223        // Register dispatcher handlers for bidirectional communication
224        client.register_dispatcher_handlers();
225
226        client
227    }
228}
229
230// ============================================================================
231// HTTP-Specific Convenience Constructors (Feature-Gated)
232// ============================================================================
233
234#[cfg(feature = "http")]
235impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
236    /// Connect to an HTTP MCP server (convenience method)
237    ///
238    /// This is a beautiful one-liner alternative to manual configuration.
239    /// Creates an HTTP client, connects, and initializes in one call.
240    ///
241    /// # Arguments
242    ///
243    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
244    ///
245    /// # Returns
246    ///
247    /// Returns an initialized `Client` ready to use.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if:
252    /// - The URL is invalid
253    /// - Connection to the server fails
254    /// - Initialization handshake fails
255    ///
256    /// # Examples
257    ///
258    /// ```rust,no_run
259    /// use turbomcp_client::Client;
260    ///
261    /// # async fn example() -> turbomcp_protocol::Result<()> {
262    /// // Beautiful one-liner - balanced with server DX
263    /// let client = Client::connect_http("http://localhost:8080").await?;
264    ///
265    /// // Now use it directly
266    /// let tools = client.list_tools().await?;
267    /// # Ok(())
268    /// # }
269    /// ```
270    ///
271    /// Compare to verbose approach (10+ lines):
272    /// ```rust,no_run
273    /// use turbomcp_client::Client;
274    /// use turbomcp_transport::streamable_http_client::{
275    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
276    /// };
277    ///
278    /// # async fn example() -> turbomcp_protocol::Result<()> {
279    /// let config = StreamableHttpClientConfig {
280    ///     base_url: "http://localhost:8080".to_string(),
281    ///     ..Default::default()
282    /// };
283    /// let transport = StreamableHttpClientTransport::new(config);
284    /// let client = Client::new(transport);
285    /// client.initialize().await?;
286    /// # Ok(())
287    /// # }
288    /// ```
289    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
290        use turbomcp_transport::streamable_http_client::{
291            StreamableHttpClientConfig, StreamableHttpClientTransport,
292        };
293
294        let config = StreamableHttpClientConfig {
295            base_url: url.into(),
296            ..Default::default()
297        };
298
299        let transport = StreamableHttpClientTransport::new(config);
300        let client = Self::new(transport);
301
302        // Initialize connection immediately
303        client.initialize().await?;
304
305        Ok(client)
306    }
307
308    /// Connect to an HTTP MCP server with custom configuration
309    ///
310    /// Provides more control than `connect_http()` while still being ergonomic.
311    ///
312    /// # Arguments
313    ///
314    /// * `url` - The base URL of the MCP server
315    /// * `config_fn` - Function to customize the configuration
316    ///
317    /// # Examples
318    ///
319    /// ```rust,no_run
320    /// use turbomcp_client::Client;
321    /// use std::time::Duration;
322    ///
323    /// # async fn example() -> turbomcp_protocol::Result<()> {
324    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
325    ///     config.timeout = Duration::from_secs(60);
326    ///     config.endpoint_path = "/api/mcp".to_string();
327    /// }).await?;
328    /// # Ok(())
329    /// # }
330    /// ```
331    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
332    where
333        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
334    {
335        use turbomcp_transport::streamable_http_client::{
336            StreamableHttpClientConfig, StreamableHttpClientTransport,
337        };
338
339        let mut config = StreamableHttpClientConfig {
340            base_url: url.into(),
341            ..Default::default()
342        };
343
344        config_fn(&mut config);
345
346        let transport = StreamableHttpClientTransport::new(config);
347        let client = Self::new(transport);
348
349        client.initialize().await?;
350
351        Ok(client)
352    }
353}
354
355// ============================================================================
356// TCP-Specific Convenience Constructors (Feature-Gated)
357// ============================================================================
358
359#[cfg(feature = "tcp")]
360impl Client<turbomcp_transport::tcp::TcpTransport> {
361    /// Connect to a TCP MCP server (convenience method)
362    ///
363    /// Beautiful one-liner for TCP connections - balanced DX.
364    ///
365    /// # Arguments
366    ///
367    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
368    ///
369    /// # Returns
370    ///
371    /// Returns an initialized `Client` ready to use.
372    ///
373    /// # Examples
374    ///
375    /// ```rust,no_run
376    /// # #[cfg(feature = "tcp")]
377    /// use turbomcp_client::Client;
378    ///
379    /// # async fn example() -> turbomcp_protocol::Result<()> {
380    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
381    /// let tools = client.list_tools().await?;
382    /// # Ok(())
383    /// # }
384    /// ```
385    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
386        use std::net::SocketAddr;
387        use turbomcp_transport::tcp::TcpTransport;
388
389        let server_addr: SocketAddr = addr
390            .as_ref()
391            .parse()
392            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;
393
394        // Client binds to 0.0.0.0:0 (any available port)
395        let bind_addr: SocketAddr = if server_addr.is_ipv6() {
396            "[::]:0".parse().unwrap()
397        } else {
398            "0.0.0.0:0".parse().unwrap()
399        };
400
401        let transport = TcpTransport::new_client(bind_addr, server_addr);
402        let client = Self::new(transport);
403
404        client.initialize().await?;
405
406        Ok(client)
407    }
408}
409
410// ============================================================================
411// Unix Socket-Specific Convenience Constructors (Feature-Gated)
412// ============================================================================
413
414#[cfg(all(unix, feature = "unix"))]
415impl Client<turbomcp_transport::unix::UnixTransport> {
416    /// Connect to a Unix socket MCP server (convenience method)
417    ///
418    /// Beautiful one-liner for Unix socket IPC - balanced DX.
419    ///
420    /// # Arguments
421    ///
422    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
423    ///
424    /// # Returns
425    ///
426    /// Returns an initialized `Client` ready to use.
427    ///
428    /// # Examples
429    ///
430    /// ```rust,no_run
431    /// # #[cfg(all(unix, feature = "unix"))]
432    /// use turbomcp_client::Client;
433    ///
434    /// # async fn example() -> turbomcp_protocol::Result<()> {
435    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
436    /// let tools = client.list_tools().await?;
437    /// # Ok(())
438    /// # }
439    /// ```
440    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
441        use turbomcp_transport::unix::UnixTransport;
442
443        let transport = UnixTransport::new_client(path.into());
444        let client = Self::new(transport);
445
446        client.initialize().await?;
447
448        Ok(client)
449    }
450}
451
452impl<T: Transport + 'static> Client<T> {
453    /// Register message handlers with the dispatcher
454    ///
455    /// This method sets up the callbacks that handle server-initiated requests
456    /// and notifications. The dispatcher's background task routes incoming
457    /// messages to these handlers.
458    ///
459    /// This is called automatically during Client construction (in `new()` and
460    /// `with_capabilities()`), so you don't need to call it manually.
461    ///
462    /// ## How It Works
463    ///
464    /// The handlers are synchronous closures that spawn async tasks to do the
465    /// actual work. This allows the dispatcher to continue routing messages
466    /// without blocking on handler execution.
467    fn register_dispatcher_handlers(&self) {
468        let dispatcher = self.inner.protocol.dispatcher();
469        let client_for_requests = self.clone();
470        let client_for_notifications = self.clone();
471
472        // Request handler (elicitation, sampling, etc.)
473        let request_handler = Arc::new(move |request: JsonRpcRequest| {
474            let client = client_for_requests.clone();
475            // Spawn async task to handle the request
476            tokio::spawn(async move {
477                if let Err(e) = client.handle_request(request).await {
478                    tracing::error!("Error handling server request: {}", e);
479                }
480            });
481            Ok(())
482        });
483
484        // Notification handler
485        let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
486            let client = client_for_notifications.clone();
487            // Spawn async task to handle the notification
488            tokio::spawn(async move {
489                if let Err(e) = client.handle_notification(notification).await {
490                    tracing::error!("Error handling server notification: {}", e);
491                }
492            });
493            Ok(())
494        });
495
496        // Register handlers synchronously - no race condition!
497        // The set_* methods are now synchronous with std::sync::Mutex
498        dispatcher.set_request_handler(request_handler);
499        dispatcher.set_notification_handler(notification_handler);
500        tracing::debug!("Dispatcher handlers registered successfully");
501    }
502
503    /// Handle server-initiated requests (elicitation, sampling, roots)
504    ///
505    /// This method is called by the MessageDispatcher when it receives a request
506    /// from the server. It routes the request to the appropriate handler based on
507    /// the method name.
508    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
509        match request.method.as_str() {
510            "sampling/createMessage" => {
511                let handler_opt = self
512                    .inner
513                    .sampling_handler
514                    .lock()
515                    .expect("sampling_handler mutex poisoned")
516                    .clone();
517                if let Some(handler) = handler_opt {
518                    let params: CreateMessageRequest =
519                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
520                            .map_err(|e| {
521                            Error::protocol(format!("Invalid createMessage params: {}", e))
522                        })?;
523
524                    match handler.handle_create_message(params).await {
525                        Ok(result) => {
526                            let result_value = serde_json::to_value(result).map_err(|e| {
527                                Error::protocol(format!("Failed to serialize response: {}", e))
528                            })?;
529                            let response = JsonRpcResponse::success(result_value, request.id);
530                            self.send_response(response).await?;
531                        }
532                        Err(e) => {
533                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
534                                code: -32603,
535                                message: format!("Sampling handler error: {}", e),
536                                data: None,
537                            };
538                            let response = JsonRpcResponse::error_response(error, request.id);
539                            self.send_response(response).await?;
540                        }
541                    }
542                } else {
543                    // No handler configured
544                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
545                        code: -32601,
546                        message: "Sampling not supported".to_string(),
547                        data: None,
548                    };
549                    let response = JsonRpcResponse::error_response(error, request.id);
550                    self.send_response(response).await?;
551                }
552            }
553            "roots/list" => {
554                // Handle roots/list request from server
555                // Clone the handler Arc to avoid holding mutex across await
556                let handler_opt = self
557                    .inner
558                    .handlers
559                    .lock()
560                    .expect("handlers mutex poisoned")
561                    .roots
562                    .clone();
563
564                let roots_result = if let Some(handler) = handler_opt {
565                    handler.handle_roots_request().await
566                } else {
567                    // No handler - return empty list per MCP spec
568                    Ok(Vec::new())
569                };
570
571                match roots_result {
572                    Ok(roots) => {
573                        let result_value =
574                            serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
575                                roots,
576                                _meta: None,
577                            })
578                            .map_err(|e| {
579                                Error::protocol(format!(
580                                    "Failed to serialize roots response: {}",
581                                    e
582                                ))
583                            })?;
584                        let response = JsonRpcResponse::success(result_value, request.id);
585                        self.send_response(response).await?;
586                    }
587                    Err(e) => {
588                        let error = turbomcp_protocol::jsonrpc::JsonRpcError {
589                            code: -32603,
590                            message: format!("Roots handler error: {}", e),
591                            data: None,
592                        };
593                        let response = JsonRpcResponse::error_response(error, request.id);
594                        self.send_response(response).await?;
595                    }
596                }
597            }
598            "elicitation/create" => {
599                // Clone handler Arc before await to avoid holding mutex across await
600                let handler_opt = self
601                    .inner
602                    .handlers
603                    .lock()
604                    .expect("handlers mutex poisoned")
605                    .elicitation
606                    .clone();
607                if let Some(handler) = handler_opt {
608                    // Parse elicitation request params as MCP protocol type
609                    let proto_request: turbomcp_protocol::types::ElicitRequest =
610                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
611                            .map_err(|e| {
612                            Error::protocol(format!("Invalid elicitation params: {}", e))
613                        })?;
614
615                    // Wrap protocol request with ID for handler (preserves type safety!)
616                    let handler_request =
617                        crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
618
619                    // Call the registered elicitation handler
620                    match handler.handle_elicitation(handler_request).await {
621                        Ok(elicit_response) => {
622                            // Convert handler response back to protocol type
623                            let proto_result = elicit_response.into_protocol();
624                            let result_value = serde_json::to_value(proto_result).map_err(|e| {
625                                Error::protocol(format!(
626                                    "Failed to serialize elicitation response: {}",
627                                    e
628                                ))
629                            })?;
630                            let response = JsonRpcResponse::success(result_value, request.id);
631                            self.send_response(response).await?;
632                        }
633                        Err(e) => {
634                            // Convert handler error to JSON-RPC error using centralized mapping
635                            let response =
636                                JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
637                            self.send_response(response).await?;
638                        }
639                    }
640                } else {
641                    // No handler configured - elicitation not supported
642                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
643                        code: -32601,
644                        message: "Elicitation not supported - no handler registered".to_string(),
645                        data: None,
646                    };
647                    let response = JsonRpcResponse::error_response(error, request.id);
648                    self.send_response(response).await?;
649                }
650            }
651            _ => {
652                // Unknown method
653                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
654                    code: -32601,
655                    message: format!("Method not found: {}", request.method),
656                    data: None,
657                };
658                let response = JsonRpcResponse::error_response(error, request.id);
659                self.send_response(response).await?;
660            }
661        }
662        Ok(())
663    }
664
665    /// Handle server-initiated notifications
666    ///
667    /// Routes notifications to appropriate handlers based on method name.
668    /// MCP defines several notification types that servers can send to clients:
669    ///
670    /// - `notifications/progress` - Progress updates for long-running operations
671    /// - `notifications/message` - Log messages from server
672    /// - `notifications/resources/updated` - Resource content changed
673    /// - `notifications/resources/list_changed` - Resource list changed
674    async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
675        match notification.method.as_str() {
676            "notifications/progress" => {
677                // Route to progress handler
678                let handler_opt = self
679                    .inner
680                    .handlers
681                    .lock()
682                    .expect("handlers mutex poisoned")
683                    .get_progress_handler();
684
685                if let Some(handler) = handler_opt {
686                    // Parse progress notification
687                    let progress: crate::handlers::ProgressNotification = serde_json::from_value(
688                        notification.params.unwrap_or(serde_json::Value::Null),
689                    )
690                    .map_err(|e| {
691                        Error::protocol(format!("Invalid progress notification: {}", e))
692                    })?;
693
694                    // Call handler (errors are logged but don't fail the flow)
695                    if let Err(e) = handler.handle_progress(progress).await {
696                        tracing::error!("Progress handler error: {}", e);
697                    }
698                } else {
699                    tracing::debug!("Received progress notification but no handler registered");
700                }
701            }
702
703            "notifications/message" => {
704                // Route to log handler
705                let handler_opt = self
706                    .inner
707                    .handlers
708                    .lock()
709                    .expect("handlers mutex poisoned")
710                    .get_log_handler();
711
712                if let Some(handler) = handler_opt {
713                    // Parse log message
714                    let log: crate::handlers::LoggingNotification = serde_json::from_value(
715                        notification.params.unwrap_or(serde_json::Value::Null),
716                    )
717                    .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
718
719                    // Call handler
720                    if let Err(e) = handler.handle_log(log).await {
721                        tracing::error!("Log handler error: {}", e);
722                    }
723                } else {
724                    tracing::debug!("Received log notification but no handler registered");
725                }
726            }
727
728            "notifications/resources/updated" => {
729                // Route to resource update handler
730                let handler_opt = self
731                    .inner
732                    .handlers
733                    .lock()
734                    .expect("handlers mutex poisoned")
735                    .get_resource_update_handler();
736
737                if let Some(handler) = handler_opt {
738                    // Parse resource update notification
739                    let update: crate::handlers::ResourceUpdatedNotification =
740                        serde_json::from_value(
741                            notification.params.unwrap_or(serde_json::Value::Null),
742                        )
743                        .map_err(|e| {
744                            Error::protocol(format!("Invalid resource update notification: {}", e))
745                        })?;
746
747                    // Call handler
748                    if let Err(e) = handler.handle_resource_update(update).await {
749                        tracing::error!("Resource update handler error: {}", e);
750                    }
751                } else {
752                    tracing::debug!(
753                        "Received resource update notification but no handler registered"
754                    );
755                }
756            }
757
758            "notifications/resources/list_changed" => {
759                // Route to resource list changed handler
760                let handler_opt = self
761                    .inner
762                    .handlers
763                    .lock()
764                    .expect("handlers mutex poisoned")
765                    .get_resource_list_changed_handler();
766
767                if let Some(handler) = handler_opt {
768                    if let Err(e) = handler.handle_resource_list_changed().await {
769                        tracing::error!("Resource list changed handler error: {}", e);
770                    }
771                } else {
772                    tracing::debug!(
773                        "Resource list changed notification received (no handler registered)"
774                    );
775                }
776            }
777
778            "notifications/prompts/list_changed" => {
779                // Route to prompt list changed handler
780                let handler_opt = self
781                    .inner
782                    .handlers
783                    .lock()
784                    .expect("handlers mutex poisoned")
785                    .get_prompt_list_changed_handler();
786
787                if let Some(handler) = handler_opt {
788                    if let Err(e) = handler.handle_prompt_list_changed().await {
789                        tracing::error!("Prompt list changed handler error: {}", e);
790                    }
791                } else {
792                    tracing::debug!(
793                        "Prompt list changed notification received (no handler registered)"
794                    );
795                }
796            }
797
798            "notifications/tools/list_changed" => {
799                // Route to tool list changed handler
800                let handler_opt = self
801                    .inner
802                    .handlers
803                    .lock()
804                    .expect("handlers mutex poisoned")
805                    .get_tool_list_changed_handler();
806
807                if let Some(handler) = handler_opt {
808                    if let Err(e) = handler.handle_tool_list_changed().await {
809                        tracing::error!("Tool list changed handler error: {}", e);
810                    }
811                } else {
812                    tracing::debug!(
813                        "Tool list changed notification received (no handler registered)"
814                    );
815                }
816            }
817
818            "notifications/cancelled" => {
819                // Route to cancellation handler
820                let handler_opt = self
821                    .inner
822                    .handlers
823                    .lock()
824                    .expect("handlers mutex poisoned")
825                    .get_cancellation_handler();
826
827                if let Some(handler) = handler_opt {
828                    // Parse cancellation notification
829                    let cancellation: crate::handlers::CancelledNotification =
830                        serde_json::from_value(
831                            notification.params.unwrap_or(serde_json::Value::Null),
832                        )
833                        .map_err(|e| {
834                            Error::protocol(format!("Invalid cancellation notification: {}", e))
835                        })?;
836
837                    // Call handler
838                    if let Err(e) = handler.handle_cancellation(cancellation).await {
839                        tracing::error!("Cancellation handler error: {}", e);
840                    }
841                } else {
842                    tracing::debug!("Cancellation notification received (no handler registered)");
843                }
844            }
845
846            _ => {
847                // Unknown notification type
848                tracing::debug!("Received unknown notification: {}", notification.method);
849            }
850        }
851
852        Ok(())
853    }
854
855    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
856        let payload = serde_json::to_vec(&response)
857            .map_err(|e| Error::protocol(format!("Failed to serialize response: {}", e)))?;
858
859        let message = TransportMessage::new(
860            turbomcp_protocol::MessageId::from("response".to_string()),
861            payload.into(),
862        );
863
864        self.inner
865            .protocol
866            .transport()
867            .send(message)
868            .await
869            .map_err(|e| Error::transport(format!("Failed to send response: {}", e)))?;
870
871        Ok(())
872    }
873
874    /// Initialize the connection with the MCP server
875    ///
876    /// Performs the initialization handshake with the server, negotiating capabilities
877    /// and establishing the protocol version. This method must be called before
878    /// any other operations can be performed.
879    ///
880    /// # Returns
881    ///
882    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if:
887    /// - The transport connection fails
888    /// - The server rejects the initialization request
889    /// - Protocol negotiation fails
890    ///
891    /// # Examples
892    ///
893    /// ```rust,no_run
894    /// # use turbomcp_client::Client;
895    /// # use turbomcp_transport::stdio::StdioTransport;
896    /// # async fn example() -> turbomcp_protocol::Result<()> {
897    /// let mut client = Client::new(StdioTransport::new());
898    ///
899    /// let result = client.initialize().await?;
900    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
901    /// # Ok(())
902    /// # }
903    /// ```
904    pub async fn initialize(&self) -> Result<InitializeResult> {
905        // Auto-connect transport if not already connected
906        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
907        let transport = self.inner.protocol.transport();
908        let transport_state = transport.state().await;
909        if !matches!(
910            transport_state,
911            turbomcp_transport::TransportState::Connected
912        ) {
913            tracing::debug!(
914                "Auto-connecting transport (current state: {:?})",
915                transport_state
916            );
917            transport
918                .connect()
919                .await
920                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
921            tracing::info!("Transport connected successfully");
922        }
923
924        // Build client capabilities based on registered handlers (automatic detection)
925        let mut client_caps = ProtocolClientCapabilities::default();
926
927        // Detect sampling capability from handler
928        if let Some(sampling_caps) = self.get_sampling_capabilities() {
929            client_caps.sampling = Some(sampling_caps);
930        }
931
932        // Detect elicitation capability from handler
933        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
934            client_caps.elicitation = Some(elicitation_caps);
935        }
936
937        // Detect roots capability from handler
938        if let Some(roots_caps) = self.get_roots_capabilities() {
939            client_caps.roots = Some(roots_caps);
940        }
941
942        // Send MCP initialization request
943        let request = InitializeRequest {
944            protocol_version: PROTOCOL_VERSION.to_string(),
945            capabilities: client_caps,
946            client_info: turbomcp_protocol::types::Implementation {
947                name: "turbomcp-client".to_string(),
948                version: env!("CARGO_PKG_VERSION").to_string(),
949                title: Some("TurboMCP Client".to_string()),
950            },
951            _meta: None,
952        };
953
954        let protocol_response: ProtocolInitializeResult = self
955            .inner
956            .protocol
957            .request("initialize", Some(serde_json::to_value(request)?))
958            .await?;
959
960        // AtomicBool: lock-free store with Ordering::Relaxed
961        self.inner.initialized.store(true, Ordering::Relaxed);
962
963        // Send initialized notification
964        self.inner
965            .protocol
966            .notify("notifications/initialized", None)
967            .await?;
968
969        // Convert protocol response to client response type
970        Ok(InitializeResult {
971            server_info: protocol_response.server_info,
972            server_capabilities: protocol_response.capabilities,
973        })
974    }
975
976    /// Execute a protocol method with plugin middleware
977    ///
978    /// This is a generic helper for wrapping protocol calls with plugin middleware.
979    pub(crate) async fn execute_with_plugins<R>(
980        &self,
981        method_name: &str,
982        params: Option<serde_json::Value>,
983    ) -> Result<R>
984    where
985        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
986    {
987        // Create JSON-RPC request for plugin context
988        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
989            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
990            id: turbomcp_protocol::MessageId::Number(1),
991            method: method_name.to_string(),
992            params: params.clone(),
993        };
994
995        // 1. Create request context for plugins
996        let mut req_ctx =
997            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
998
999        // 2. Execute before_request plugin middleware
1000        if let Err(e) = self
1001            .inner
1002            .plugin_registry
1003            .lock()
1004            .await
1005            .execute_before_request(&mut req_ctx)
1006            .await
1007        {
1008            return Err(Error::bad_request(format!(
1009                "Plugin before_request failed: {}",
1010                e
1011            )));
1012        }
1013
1014        // 3. Execute the actual protocol call
1015        let start_time = std::time::Instant::now();
1016        let protocol_result: Result<R> = self
1017            .inner
1018            .protocol
1019            .request(method_name, req_ctx.params().cloned())
1020            .await;
1021        let duration = start_time.elapsed();
1022
1023        // 4. Prepare response context
1024        let mut resp_ctx = match protocol_result {
1025            Ok(ref response) => {
1026                let response_value = serde_json::to_value(response.clone())?;
1027                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1028            }
1029            Err(ref e) => {
1030                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1031            }
1032        };
1033
1034        // 5. Execute after_response plugin middleware
1035        if let Err(e) = self
1036            .inner
1037            .plugin_registry
1038            .lock()
1039            .await
1040            .execute_after_response(&mut resp_ctx)
1041            .await
1042        {
1043            return Err(Error::bad_request(format!(
1044                "Plugin after_response failed: {}",
1045                e
1046            )));
1047        }
1048
1049        // 6. Return the final result, checking for plugin modifications
1050        match protocol_result {
1051            Ok(ref response) => {
1052                // Check if plugins modified the response
1053                if let Some(modified_response) = resp_ctx.response {
1054                    // Try to deserialize the modified response
1055                    if let Ok(modified_result) =
1056                        serde_json::from_value::<R>(modified_response.clone())
1057                    {
1058                        return Ok(modified_result);
1059                    }
1060                }
1061
1062                // No plugin modifications, use original response
1063                Ok(response.clone())
1064            }
1065            Err(e) => {
1066                // Check if plugins provided an error recovery response
1067                if let Some(recovery_response) = resp_ctx.response {
1068                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1069                        Ok(recovery_result)
1070                    } else {
1071                        Err(e)
1072                    }
1073                } else {
1074                    Err(e)
1075                }
1076            }
1077        }
1078    }
1079
1080    /// Subscribe to resource change notifications
1081    ///
1082    /// Registers interest in receiving notifications when the specified
1083    /// resource changes. The server will send notifications when the
1084    /// resource is modified, created, or deleted.
1085    ///
1086    /// # Arguments
1087    ///
1088    /// * `uri` - The URI of the resource to monitor
1089    ///
1090    /// # Returns
1091    ///
1092    /// Returns `EmptyResult` on successful subscription.
1093    ///
1094    /// # Errors
1095    ///
1096    /// Returns an error if:
1097    /// - The client is not initialized
1098    /// - The URI is invalid or empty
1099    /// - The server doesn't support subscriptions
1100    /// - The request fails
1101    ///
1102    /// # Examples
1103    ///
1104    /// ```rust,no_run
1105    /// # use turbomcp_client::Client;
1106    /// # use turbomcp_transport::stdio::StdioTransport;
1107    /// # async fn example() -> turbomcp_protocol::Result<()> {
1108    /// let mut client = Client::new(StdioTransport::new());
1109    /// client.initialize().await?;
1110    ///
1111    /// // Subscribe to file changes
1112    /// client.subscribe("file:///watch/directory").await?;
1113    /// println!("Subscribed to resource changes");
1114    /// # Ok(())
1115    /// # }
1116    /// ```
1117    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1118        if !self.inner.initialized.load(Ordering::Relaxed) {
1119            return Err(Error::bad_request("Client not initialized"));
1120        }
1121
1122        if uri.is_empty() {
1123            return Err(Error::bad_request("Subscription URI cannot be empty"));
1124        }
1125
1126        // Send resources/subscribe request with plugin middleware
1127        let request = SubscribeRequest {
1128            uri: uri.to_string(),
1129        };
1130
1131        self.execute_with_plugins(
1132            "resources/subscribe",
1133            Some(serde_json::to_value(request).map_err(|e| {
1134                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1135            })?),
1136        )
1137        .await
1138    }
1139
1140    /// Unsubscribe from resource change notifications
1141    ///
1142    /// Cancels a previous subscription to resource changes. After unsubscribing,
1143    /// the client will no longer receive notifications for the specified resource.
1144    ///
1145    /// # Arguments
1146    ///
1147    /// * `uri` - The URI of the resource to stop monitoring
1148    ///
1149    /// # Returns
1150    ///
1151    /// Returns `EmptyResult` on successful unsubscription.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if:
1156    /// - The client is not initialized
1157    /// - The URI is invalid or empty
1158    /// - No active subscription exists for the URI
1159    /// - The request fails
1160    ///
1161    /// # Examples
1162    ///
1163    /// ```rust,no_run
1164    /// # use turbomcp_client::Client;
1165    /// # use turbomcp_transport::stdio::StdioTransport;
1166    /// # async fn example() -> turbomcp_protocol::Result<()> {
1167    /// let mut client = Client::new(StdioTransport::new());
1168    /// client.initialize().await?;
1169    ///
1170    /// // Unsubscribe from file changes
1171    /// client.unsubscribe("file:///watch/directory").await?;
1172    /// println!("Unsubscribed from resource changes");
1173    /// # Ok(())
1174    /// # }
1175    /// ```
1176    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1177        if !self.inner.initialized.load(Ordering::Relaxed) {
1178            return Err(Error::bad_request("Client not initialized"));
1179        }
1180
1181        if uri.is_empty() {
1182            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1183        }
1184
1185        // Send resources/unsubscribe request with plugin middleware
1186        let request = UnsubscribeRequest {
1187            uri: uri.to_string(),
1188        };
1189
1190        self.execute_with_plugins(
1191            "resources/unsubscribe",
1192            Some(serde_json::to_value(request).map_err(|e| {
1193                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1194            })?),
1195        )
1196        .await
1197    }
1198
1199    /// Get the client's capabilities configuration
1200    pub fn capabilities(&self) -> &ClientCapabilities {
1201        &self.inner.capabilities
1202    }
1203
1204    /// Initialize all registered plugins
1205    ///
1206    /// This should be called after registration but before using the client.
1207    pub async fn initialize_plugins(&self) -> Result<()> {
1208        // Set up client context for plugins with actual client capabilities
1209        let mut capabilities = std::collections::HashMap::new();
1210        capabilities.insert(
1211            "protocol_version".to_string(),
1212            serde_json::json!("2024-11-05"),
1213        );
1214        capabilities.insert(
1215            "mcp_version".to_string(),
1216            serde_json::json!(env!("CARGO_PKG_VERSION")),
1217        );
1218        capabilities.insert(
1219            "supports_notifications".to_string(),
1220            serde_json::json!(true),
1221        );
1222        capabilities.insert(
1223            "supports_sampling".to_string(),
1224            serde_json::json!(self.has_sampling_handler()),
1225        );
1226        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1227        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1228
1229        // Extract client configuration
1230        let mut config = std::collections::HashMap::new();
1231        config.insert(
1232            "client_name".to_string(),
1233            serde_json::json!("turbomcp-client"),
1234        );
1235        config.insert(
1236            "initialized".to_string(),
1237            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1238        );
1239        config.insert(
1240            "plugin_count".to_string(),
1241            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1242        );
1243
1244        let context = crate::plugins::PluginContext::new(
1245            "turbomcp-client".to_string(),
1246            env!("CARGO_PKG_VERSION").to_string(),
1247            capabilities,
1248            config,
1249            vec![], // Will be populated by the registry
1250        );
1251
1252        self.inner
1253            .plugin_registry
1254            .lock()
1255            .await
1256            .set_client_context(context);
1257
1258        // Note: Individual plugins are initialized automatically during registration
1259        // via PluginRegistry::register_plugin(). This method ensures the registry
1260        // has proper client context for any future plugin registrations.
1261        Ok(())
1262    }
1263
1264    /// Cleanup all registered plugins
1265    ///
1266    /// This should be called when the client is being shut down.
1267    pub async fn cleanup_plugins(&self) -> Result<()> {
1268        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1269        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1270        // references are dropped.
1271
1272        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1273        // pattern where plugins clean up their resources in their Drop implementation.
1274        // Replace the registry with a fresh one (mutex ensures safe access)
1275        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1276        Ok(())
1277    }
1278
1279    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1280    // are defined in their respective operation modules:
1281    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1282    // - handlers.rs: has_elicitation_handler, has_roots_handler
1283    //
1284    // Additional capability getters for elicitation and roots added below
1285    // since they're used during initialization
1286
1287    /// Get elicitation capabilities if handler is registered
1288    /// Automatically detects capability based on registered handler
1289    fn get_elicitation_capabilities(
1290        &self,
1291    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1292        if self.has_elicitation_handler() {
1293            // Currently returns default capabilities. In the future, schema_validation support
1294            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1295            // that handlers could implement. For now, handlers validate schemas themselves.
1296            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1297        } else {
1298            None
1299        }
1300    }
1301
1302    /// Get roots capabilities if handler is registered
1303    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1304        if self.has_roots_handler() {
1305            // Roots capabilities indicate whether list can change
1306            Some(turbomcp_protocol::types::RootsCapabilities {
1307                list_changed: Some(true), // Support dynamic roots by default
1308            })
1309        } else {
1310            None
1311        }
1312    }
1313}