turbomcp_client/client/
core.rs

1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26    ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27    *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{ClientCapabilities, handlers::HandlerRegistry, sampling::SamplingHandler};
35
36/// Inner client state with interior mutability
37///
38/// This structure contains the actual client state and is wrapped in Arc<...>
39/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
40pub(super) struct ClientInner<T: Transport> {
41    /// Protocol client for low-level communication
42    pub(super) protocol: ProtocolClient<T>,
43
44    /// Client capabilities (immutable after construction)
45    pub(super) capabilities: ClientCapabilities,
46
47    /// Initialization state (lock-free atomic boolean)
48    pub(super) initialized: AtomicBool,
49
50    /// Optional sampling handler (mutex for dynamic updates)
51    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
52
53    /// Handler registry for bidirectional communication (mutex for registration)
54    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
55
56    /// Plugin registry for middleware (tokio mutex - holds across await)
57    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
58}
59
60/// The core MCP client implementation
61///
62/// Client provides a comprehensive interface for communicating with MCP servers,
63/// supporting all protocol features including tools, prompts, resources, sampling,
64/// elicitation, and bidirectional communication patterns.
65///
66/// # Clone Pattern
67///
68/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
69/// All clones share the same underlying connection and state:
70///
71/// ```rust,no_run
72/// use turbomcp_client::Client;
73/// use turbomcp_transport::stdio::StdioTransport;
74///
75/// # async fn example() -> turbomcp_protocol::Result<()> {
76/// let client = Client::new(StdioTransport::new());
77/// client.initialize().await?;
78///
79/// // Cheap clone - shares same connection
80/// let client2 = client.clone();
81/// tokio::spawn(async move {
82///     client2.list_tools().await.ok();
83/// });
84/// # Ok(())
85/// # }
86/// ```
87///
88/// The client must be initialized before use by calling `initialize()` to perform
89/// the MCP handshake and capability negotiation.
90///
91/// # Features
92///
93/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
94/// - **Bidirectional Communication**: Server-initiated requests and client responses
95/// - **Plugin Middleware**: Extensible request/response processing
96/// - **Handler Registry**: Callbacks for server-initiated operations
97/// - **Connection Management**: Robust error handling and recovery
98/// - **Type Safety**: Compile-time guarantees for MCP message types
99/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
100///
101/// # Examples
102///
103/// ```rust,no_run
104/// use turbomcp_client::Client;
105/// use turbomcp_transport::stdio::StdioTransport;
106/// use std::collections::HashMap;
107///
108/// # async fn example() -> turbomcp_protocol::Result<()> {
109/// // Create and initialize client (no mut needed!)
110/// let client = Client::new(StdioTransport::new());
111/// let init_result = client.initialize().await?;
112/// println!("Connected to: {}", init_result.server_info.name);
113///
114/// // Use MCP operations
115/// let tools = client.list_tools().await?;
116/// let mut args = HashMap::new();
117/// args.insert("input".to_string(), serde_json::json!("test"));
118/// let result = client.call_tool("my_tool", Some(args)).await?;
119/// # Ok(())
120/// # }
121/// ```
122pub struct Client<T: Transport> {
123    pub(super) inner: Arc<ClientInner<T>>,
124}
125
126/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
127///
128/// Cloning a Client is cheap (just an Arc clone) and all clones share
129/// the same underlying connection and state.
130impl<T: Transport> Clone for Client<T> {
131    fn clone(&self) -> Self {
132        Self {
133            inner: Arc::clone(&self.inner),
134        }
135    }
136}
137
138impl<T: Transport> Client<T> {
139    /// Create a new client with the specified transport
140    ///
141    /// Creates a new MCP client instance with default capabilities.
142    /// The client must be initialized before use by calling `initialize()`.
143    ///
144    /// # Arguments
145    ///
146    /// * `transport` - The transport implementation to use for communication
147    ///
148    /// # Examples
149    ///
150    /// ```rust,no_run
151    /// use turbomcp_client::Client;
152    /// use turbomcp_transport::stdio::StdioTransport;
153    ///
154    /// let transport = StdioTransport::new();
155    /// let client = Client::new(transport);
156    /// ```
157    pub fn new(transport: T) -> Self {
158        Self {
159            inner: Arc::new(ClientInner {
160                protocol: ProtocolClient::new(transport),
161                capabilities: ClientCapabilities::default(),
162                initialized: AtomicBool::new(false),
163                sampling_handler: Arc::new(StdMutex::new(None)),
164                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
165                plugin_registry: Arc::new(tokio::sync::Mutex::new(
166                    crate::plugins::PluginRegistry::new(),
167                )),
168            }),
169        }
170    }
171
172    /// Create a new client with custom capabilities
173    ///
174    /// # Arguments
175    ///
176    /// * `transport` - The transport implementation to use
177    /// * `capabilities` - The client capabilities to negotiate
178    ///
179    /// # Examples
180    ///
181    /// ```rust,no_run
182    /// use turbomcp_client::{Client, ClientCapabilities};
183    /// use turbomcp_transport::stdio::StdioTransport;
184    ///
185    /// let capabilities = ClientCapabilities {
186    ///     tools: true,
187    ///     prompts: true,
188    ///     resources: false,
189    ///     sampling: false,
190    /// };
191    ///
192    /// let transport = StdioTransport::new();
193    /// let client = Client::with_capabilities(transport, capabilities);
194    /// ```
195    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
196        Self {
197            inner: Arc::new(ClientInner {
198                protocol: ProtocolClient::new(transport),
199                capabilities,
200                initialized: AtomicBool::new(false),
201                sampling_handler: Arc::new(StdMutex::new(None)),
202                handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
203                plugin_registry: Arc::new(tokio::sync::Mutex::new(
204                    crate::plugins::PluginRegistry::new(),
205                )),
206            }),
207        }
208    }
209}
210
211// ============================================================================
212// HTTP-Specific Convenience Constructors (Feature-Gated)
213// ============================================================================
214
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server with the default configuration.
    ///
    /// Builds a streamable-HTTP transport for `url`, wraps it in a client and
    /// runs `initialize()`, all in one call.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// The equivalent manual setup:
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Same path as `connect_http_with`, leaving the default configuration untouched.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server, adjusting the configuration first.
    ///
    /// The configuration starts from the defaults with `base_url` set to
    /// `url`; `config_fn` may then change any field before the transport is
    /// created. The client is initialized before it is returned.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut http_config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };
        config_fn(&mut http_config);

        let client = Self::new(StreamableHttpClientTransport::new(http_config));
        client.initialize().await?;
        Ok(client)
    }
}
335
336// ============================================================================
337// TCP-Specific Convenience Constructors (Feature-Gated)
338// ============================================================================
339
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server and run the initialization handshake.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server socket address written as `IP:port`, for example
    ///   `"127.0.0.1:8765"` or `"[::1]:8765"`. The string is parsed as a
    ///   `std::net::SocketAddr`, so host names such as `"localhost:8765"`
    ///   are **not** resolved and are rejected as invalid.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if `addr` cannot be parsed as a socket address or if
    /// `initialize()` fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;

        // Bind the local end to the unspecified address of the same IP family
        // as the server, with port 0 so the OS picks any available port.
        let bind_addr = match server_addr {
            SocketAddr::V4(_) => SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)),
            SocketAddr::V6(_) => SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)),
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
390
391// ============================================================================
392// Unix Socket-Specific Convenience Constructors (Feature-Gated)
393// ============================================================================
394
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to an MCP server over a Unix domain socket.
    ///
    /// Creates a client-side Unix transport for `path`, wraps it in a client
    /// and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let socket_path: std::path::PathBuf = path.into();
        let client = Self::new(UnixTransport::new_client(socket_path));
        client.initialize().await?;
        Ok(client)
    }
}
432
433impl<T: Transport> Client<T> {
434    /// Process incoming messages from the server
435    ///
436    /// This method should be called in a loop to handle server-initiated requests
437    /// like sampling. It processes one message at a time.
438    ///
439    /// # Returns
440    ///
441    /// Returns `Ok(true)` if a message was processed, `Ok(false)` if no message was available.
442    ///
443    /// # Examples
444    ///
445    /// ```rust,no_run
446    /// # use turbomcp_client::Client;
447    /// # use turbomcp_transport::stdio::StdioTransport;
448    /// # async fn example() -> turbomcp_protocol::Result<()> {
449    /// let mut client = Client::new(StdioTransport::new());
450    ///
451    /// // Process messages in background
452    /// tokio::spawn(async move {
453    ///     loop {
454    ///         if let Err(e) = client.process_message().await {
455    ///             eprintln!("Error processing message: {}", e);
456    ///         }
457    ///     }
458    /// });
459    /// # Ok(())
460    /// # }
461    /// ```
462    pub async fn process_message(&self) -> Result<bool> {
463        // Try to receive a message without blocking
464        let message = match self.inner.protocol.transport().receive().await {
465            Ok(Some(msg)) => msg,
466            Ok(None) => return Ok(false),
467            Err(e) => {
468                return Err(Error::transport(format!(
469                    "Failed to receive message: {}",
470                    e
471                )));
472            }
473        };
474
475        // Parse as JSON-RPC message
476        let json_msg: JsonRpcMessage = serde_json::from_slice(&message.payload)
477            .map_err(|e| Error::protocol(format!("Invalid JSON-RPC message: {}", e)))?;
478
479        match json_msg {
480            JsonRpcMessage::Request(request) => {
481                self.handle_request(request).await?;
482                Ok(true)
483            }
484            JsonRpcMessage::Response(_) => {
485                // Responses are handled by the protocol client during request/response flow
486                Ok(true)
487            }
488            JsonRpcMessage::Notification(notification) => {
489                self.handle_notification(notification).await?;
490                Ok(true)
491            }
492            JsonRpcMessage::RequestBatch(_)
493            | JsonRpcMessage::ResponseBatch(_)
494            | JsonRpcMessage::MessageBatch(_) => {
495                // Batch operations not yet supported
496                Ok(true)
497            }
498        }
499    }
500
501    async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
502        match request.method.as_str() {
503            "sampling/createMessage" => {
504                let handler_opt = self
505                    .inner
506                    .sampling_handler
507                    .lock()
508                    .expect("sampling_handler mutex poisoned")
509                    .clone();
510                if let Some(handler) = handler_opt {
511                    let params: CreateMessageRequest =
512                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
513                            .map_err(|e| {
514                            Error::protocol(format!("Invalid createMessage params: {}", e))
515                        })?;
516
517                    match handler.handle_create_message(params).await {
518                        Ok(result) => {
519                            let result_value = serde_json::to_value(result).map_err(|e| {
520                                Error::protocol(format!("Failed to serialize response: {}", e))
521                            })?;
522                            let response = JsonRpcResponse::success(result_value, request.id);
523                            self.send_response(response).await?;
524                        }
525                        Err(e) => {
526                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
527                                code: -32603,
528                                message: format!("Sampling handler error: {}", e),
529                                data: None,
530                            };
531                            let response = JsonRpcResponse::error_response(error, request.id);
532                            self.send_response(response).await?;
533                        }
534                    }
535                } else {
536                    // No handler configured
537                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
538                        code: -32601,
539                        message: "Sampling not supported".to_string(),
540                        data: None,
541                    };
542                    let response = JsonRpcResponse::error_response(error, request.id);
543                    self.send_response(response).await?;
544                }
545            }
546            "elicitation/create" => {
547                // Clone handler Arc before await to avoid holding mutex across await
548                let handler_opt = self
549                    .inner
550                    .handlers
551                    .lock()
552                    .expect("handlers mutex poisoned")
553                    .elicitation
554                    .clone();
555                if let Some(handler) = handler_opt {
556                    // Parse elicitation request params
557                    let params: crate::handlers::ElicitationRequest =
558                        serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
559                            .map_err(|e| {
560                            Error::protocol(format!("Invalid elicitation params: {}", e))
561                        })?;
562
563                    // Call the registered elicitation handler
564                    match handler.handle_elicitation(params).await {
565                        Ok(elicit_response) => {
566                            let result_value =
567                                serde_json::to_value(elicit_response).map_err(|e| {
568                                    Error::protocol(format!(
569                                        "Failed to serialize elicitation response: {}",
570                                        e
571                                    ))
572                                })?;
573                            let response = JsonRpcResponse::success(result_value, request.id);
574                            self.send_response(response).await?;
575                        }
576                        Err(e) => {
577                            // Map handler errors to JSON-RPC errors
578                            let (code, message) = match e {
579                                crate::handlers::HandlerError::UserCancelled => {
580                                    (-32800, "User cancelled elicitation request".to_string())
581                                }
582                                crate::handlers::HandlerError::Timeout { timeout_seconds } => (
583                                    -32801,
584                                    format!(
585                                        "Elicitation request timed out after {} seconds",
586                                        timeout_seconds
587                                    ),
588                                ),
589                                crate::handlers::HandlerError::InvalidInput { details } => {
590                                    (-32602, format!("Invalid user input: {}", details))
591                                }
592                                _ => (-32603, format!("Elicitation handler error: {}", e)),
593                            };
594                            let error = turbomcp_protocol::jsonrpc::JsonRpcError {
595                                code,
596                                message,
597                                data: None,
598                            };
599                            let response = JsonRpcResponse::error_response(error, request.id);
600                            self.send_response(response).await?;
601                        }
602                    }
603                } else {
604                    // No handler configured - elicitation not supported
605                    let error = turbomcp_protocol::jsonrpc::JsonRpcError {
606                        code: -32601,
607                        message: "Elicitation not supported - no handler registered".to_string(),
608                        data: None,
609                    };
610                    let response = JsonRpcResponse::error_response(error, request.id);
611                    self.send_response(response).await?;
612                }
613            }
614            _ => {
615                // Unknown method
616                let error = turbomcp_protocol::jsonrpc::JsonRpcError {
617                    code: -32601,
618                    message: format!("Method not found: {}", request.method),
619                    data: None,
620                };
621                let response = JsonRpcResponse::error_response(error, request.id);
622                self.send_response(response).await?;
623            }
624        }
625        Ok(())
626    }
627
628    async fn handle_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
629        // Handle notifications if needed
630        // Currently MCP doesn't define client-side notifications
631        Ok(())
632    }
633
634    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
635        let payload = serde_json::to_vec(&response)
636            .map_err(|e| Error::protocol(format!("Failed to serialize response: {}", e)))?;
637
638        let message = TransportMessage::new(
639            turbomcp_protocol::MessageId::from("response".to_string()),
640            payload.into(),
641        );
642
643        self.inner
644            .protocol
645            .transport()
646            .send(message)
647            .await
648            .map_err(|e| Error::transport(format!("Failed to send response: {}", e)))?;
649
650        Ok(())
651    }
652
653    /// Initialize the connection with the MCP server
654    ///
655    /// Performs the initialization handshake with the server, negotiating capabilities
656    /// and establishing the protocol version. This method must be called before
657    /// any other operations can be performed.
658    ///
659    /// # Returns
660    ///
661    /// Returns an `InitializeResult` containing server information and negotiated capabilities.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if:
666    /// - The transport connection fails
667    /// - The server rejects the initialization request
668    /// - Protocol negotiation fails
669    ///
670    /// # Examples
671    ///
672    /// ```rust,no_run
673    /// # use turbomcp_client::Client;
674    /// # use turbomcp_transport::stdio::StdioTransport;
675    /// # async fn example() -> turbomcp_protocol::Result<()> {
676    /// let mut client = Client::new(StdioTransport::new());
677    ///
678    /// let result = client.initialize().await?;
679    /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub async fn initialize(&self) -> Result<InitializeResult> {
684        // Auto-connect transport if not already connected
685        // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
686        let transport = self.inner.protocol.transport();
687        let transport_state = transport.state().await;
688        if !matches!(
689            transport_state,
690            turbomcp_transport::TransportState::Connected
691        ) {
692            tracing::debug!(
693                "Auto-connecting transport (current state: {:?})",
694                transport_state
695            );
696            transport
697                .connect()
698                .await
699                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
700            tracing::info!("Transport connected successfully");
701        }
702
703        // Build client capabilities based on registered handlers (automatic detection)
704        let mut client_caps = ProtocolClientCapabilities::default();
705
706        // Detect sampling capability from handler
707        if let Some(sampling_caps) = self.get_sampling_capabilities() {
708            client_caps.sampling = Some(sampling_caps);
709        }
710
711        // Detect elicitation capability from handler
712        if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
713            client_caps.elicitation = Some(elicitation_caps);
714        }
715
716        // Detect roots capability from handler
717        if let Some(roots_caps) = self.get_roots_capabilities() {
718            client_caps.roots = Some(roots_caps);
719        }
720
721        // Send MCP initialization request
722        let request = InitializeRequest {
723            protocol_version: PROTOCOL_VERSION.to_string(),
724            capabilities: client_caps,
725            client_info: turbomcp_protocol::types::Implementation {
726                name: "turbomcp-client".to_string(),
727                version: env!("CARGO_PKG_VERSION").to_string(),
728                title: Some("TurboMCP Client".to_string()),
729            },
730            _meta: None,
731        };
732
733        let protocol_response: ProtocolInitializeResult = self
734            .inner
735            .protocol
736            .request("initialize", Some(serde_json::to_value(request)?))
737            .await?;
738
739        // AtomicBool: lock-free store with Ordering::Relaxed
740        self.inner.initialized.store(true, Ordering::Relaxed);
741
742        // Send initialized notification
743        self.inner
744            .protocol
745            .notify("notifications/initialized", None)
746            .await?;
747
748        // Convert protocol response to client response type
749        Ok(InitializeResult {
750            server_info: protocol_response.server_info,
751            server_capabilities: protocol_response.capabilities,
752        })
753    }
754
755    /// Execute a protocol method with plugin middleware
756    ///
757    /// This is a generic helper for wrapping protocol calls with plugin middleware.
758    pub(crate) async fn execute_with_plugins<R>(
759        &self,
760        method_name: &str,
761        params: Option<serde_json::Value>,
762    ) -> Result<R>
763    where
764        R: serde::de::DeserializeOwned + serde::Serialize + Clone,
765    {
766        // Create JSON-RPC request for plugin context
767        let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
768            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
769            id: turbomcp_protocol::MessageId::Number(1),
770            method: method_name.to_string(),
771            params: params.clone(),
772        };
773
774        // 1. Create request context for plugins
775        let mut req_ctx =
776            crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
777
778        // 2. Execute before_request plugin middleware
779        if let Err(e) = self
780            .inner
781            .plugin_registry
782            .lock()
783            .await
784            .execute_before_request(&mut req_ctx)
785            .await
786        {
787            return Err(Error::bad_request(format!(
788                "Plugin before_request failed: {}",
789                e
790            )));
791        }
792
793        // 3. Execute the actual protocol call
794        let start_time = std::time::Instant::now();
795        let protocol_result: Result<R> = self
796            .inner
797            .protocol
798            .request(method_name, req_ctx.params().cloned())
799            .await;
800        let duration = start_time.elapsed();
801
802        // 4. Prepare response context
803        let mut resp_ctx = match protocol_result {
804            Ok(ref response) => {
805                let response_value = serde_json::to_value(response.clone())?;
806                crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
807            }
808            Err(ref e) => {
809                crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
810            }
811        };
812
813        // 5. Execute after_response plugin middleware
814        if let Err(e) = self
815            .inner
816            .plugin_registry
817            .lock()
818            .await
819            .execute_after_response(&mut resp_ctx)
820            .await
821        {
822            return Err(Error::bad_request(format!(
823                "Plugin after_response failed: {}",
824                e
825            )));
826        }
827
828        // 6. Return the final result, checking for plugin modifications
829        match protocol_result {
830            Ok(ref response) => {
831                // Check if plugins modified the response
832                if let Some(modified_response) = resp_ctx.response {
833                    // Try to deserialize the modified response
834                    if let Ok(modified_result) =
835                        serde_json::from_value::<R>(modified_response.clone())
836                    {
837                        return Ok(modified_result);
838                    }
839                }
840
841                // No plugin modifications, use original response
842                Ok(response.clone())
843            }
844            Err(e) => {
845                // Check if plugins provided an error recovery response
846                if let Some(recovery_response) = resp_ctx.response {
847                    if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
848                        Ok(recovery_result)
849                    } else {
850                        Err(e)
851                    }
852                } else {
853                    Err(e)
854                }
855            }
856        }
857    }
858
859    /// Subscribe to resource change notifications
860    ///
861    /// Registers interest in receiving notifications when the specified
862    /// resource changes. The server will send notifications when the
863    /// resource is modified, created, or deleted.
864    ///
865    /// # Arguments
866    ///
867    /// * `uri` - The URI of the resource to monitor
868    ///
869    /// # Returns
870    ///
871    /// Returns `EmptyResult` on successful subscription.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if:
876    /// - The client is not initialized
877    /// - The URI is invalid or empty
878    /// - The server doesn't support subscriptions
879    /// - The request fails
880    ///
881    /// # Examples
882    ///
883    /// ```rust,no_run
884    /// # use turbomcp_client::Client;
885    /// # use turbomcp_transport::stdio::StdioTransport;
886    /// # async fn example() -> turbomcp_protocol::Result<()> {
887    /// let mut client = Client::new(StdioTransport::new());
888    /// client.initialize().await?;
889    ///
890    /// // Subscribe to file changes
891    /// client.subscribe("file:///watch/directory").await?;
892    /// println!("Subscribed to resource changes");
893    /// # Ok(())
894    /// # }
895    /// ```
896    pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
897        if !self.inner.initialized.load(Ordering::Relaxed) {
898            return Err(Error::bad_request("Client not initialized"));
899        }
900
901        if uri.is_empty() {
902            return Err(Error::bad_request("Subscription URI cannot be empty"));
903        }
904
905        // Send resources/subscribe request with plugin middleware
906        let request = SubscribeRequest {
907            uri: uri.to_string(),
908        };
909
910        self.execute_with_plugins(
911            "resources/subscribe",
912            Some(serde_json::to_value(request).map_err(|e| {
913                Error::protocol(format!("Failed to serialize subscribe request: {}", e))
914            })?),
915        )
916        .await
917    }
918
919    /// Unsubscribe from resource change notifications
920    ///
921    /// Cancels a previous subscription to resource changes. After unsubscribing,
922    /// the client will no longer receive notifications for the specified resource.
923    ///
924    /// # Arguments
925    ///
926    /// * `uri` - The URI of the resource to stop monitoring
927    ///
928    /// # Returns
929    ///
930    /// Returns `EmptyResult` on successful unsubscription.
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if:
935    /// - The client is not initialized
936    /// - The URI is invalid or empty
937    /// - No active subscription exists for the URI
938    /// - The request fails
939    ///
940    /// # Examples
941    ///
942    /// ```rust,no_run
943    /// # use turbomcp_client::Client;
944    /// # use turbomcp_transport::stdio::StdioTransport;
945    /// # async fn example() -> turbomcp_protocol::Result<()> {
946    /// let mut client = Client::new(StdioTransport::new());
947    /// client.initialize().await?;
948    ///
949    /// // Unsubscribe from file changes
950    /// client.unsubscribe("file:///watch/directory").await?;
951    /// println!("Unsubscribed from resource changes");
952    /// # Ok(())
953    /// # }
954    /// ```
955    pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
956        if !self.inner.initialized.load(Ordering::Relaxed) {
957            return Err(Error::bad_request("Client not initialized"));
958        }
959
960        if uri.is_empty() {
961            return Err(Error::bad_request("Unsubscription URI cannot be empty"));
962        }
963
964        // Send resources/unsubscribe request with plugin middleware
965        let request = UnsubscribeRequest {
966            uri: uri.to_string(),
967        };
968
969        self.execute_with_plugins(
970            "resources/unsubscribe",
971            Some(serde_json::to_value(request).map_err(|e| {
972                Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
973            })?),
974        )
975        .await
976    }
977
978    /// Get the client's capabilities configuration
979    pub fn capabilities(&self) -> &ClientCapabilities {
980        &self.inner.capabilities
981    }
982
983    /// Initialize all registered plugins
984    ///
985    /// This should be called after registration but before using the client.
986    pub async fn initialize_plugins(&self) -> Result<()> {
987        // Set up client context for plugins with actual client capabilities
988        let mut capabilities = std::collections::HashMap::new();
989        capabilities.insert(
990            "protocol_version".to_string(),
991            serde_json::json!("2024-11-05"),
992        );
993        capabilities.insert(
994            "mcp_version".to_string(),
995            serde_json::json!(env!("CARGO_PKG_VERSION")),
996        );
997        capabilities.insert(
998            "supports_notifications".to_string(),
999            serde_json::json!(true),
1000        );
1001        capabilities.insert(
1002            "supports_sampling".to_string(),
1003            serde_json::json!(self.has_sampling_handler()),
1004        );
1005        capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1006        capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1007
1008        // Extract client configuration
1009        let mut config = std::collections::HashMap::new();
1010        config.insert(
1011            "client_name".to_string(),
1012            serde_json::json!("turbomcp-client"),
1013        );
1014        config.insert(
1015            "initialized".to_string(),
1016            serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1017        );
1018        config.insert(
1019            "plugin_count".to_string(),
1020            serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1021        );
1022
1023        let context = crate::plugins::PluginContext::new(
1024            "turbomcp-client".to_string(),
1025            env!("CARGO_PKG_VERSION").to_string(),
1026            capabilities,
1027            config,
1028            vec![], // Will be populated by the registry
1029        );
1030
1031        self.inner
1032            .plugin_registry
1033            .lock()
1034            .await
1035            .set_client_context(context);
1036
1037        // Note: Individual plugins are initialized automatically during registration
1038        // via PluginRegistry::register_plugin(). This method ensures the registry
1039        // has proper client context for any future plugin registrations.
1040        Ok(())
1041    }
1042
1043    /// Cleanup all registered plugins
1044    ///
1045    /// This should be called when the client is being shut down.
1046    pub async fn cleanup_plugins(&self) -> Result<()> {
1047        // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1048        // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1049        // references are dropped.
1050
1051        // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1052        // pattern where plugins clean up their resources in their Drop implementation.
1053        // Replace the registry with a fresh one (mutex ensures safe access)
1054        *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1055        Ok(())
1056    }
1057
1058    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1059    // are defined in their respective operation modules:
1060    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1061    // - handlers.rs: has_elicitation_handler, has_roots_handler
1062    //
1063    // Additional capability getters for elicitation and roots added below
1064    // since they're used during initialization
1065
1066    /// Get elicitation capabilities if handler is registered
1067    /// Automatically detects capability based on registered handler
1068    fn get_elicitation_capabilities(
1069        &self,
1070    ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1071        if self.has_elicitation_handler() {
1072            // Currently returns default capabilities. In the future, schema_validation support
1073            // could be detected from handler traits by adding a HasSchemaValidation marker trait
1074            // that handlers could implement. For now, handlers validate schemas themselves.
1075            Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1076        } else {
1077            None
1078        }
1079    }
1080
1081    /// Get roots capabilities if handler is registered
1082    fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1083        if self.has_roots_handler() {
1084            // Roots capabilities indicate whether list can change
1085            Some(turbomcp_protocol::types::RootsCapabilities {
1086                list_changed: Some(true), // Support dynamic roots by default
1087            })
1088        } else {
1089            None
1090        }
1091    }
1092}