turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27 *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{ClientCapabilities, handlers::HandlerRegistry, sampling::SamplingHandler};
35
/// Shared state behind every [`Client`] handle.
///
/// `Client<T>` keeps this struct inside an `Arc`, so cloning a client only
/// clones the `Arc` and all clones observe the same fields. Mutation goes
/// through the interior-mutability primitives on the individual fields.
pub(super) struct ClientInner<T: Transport> {
    /// Protocol client used for low-level request/notification traffic.
    pub(super) protocol: ProtocolClient<T>,

    /// Client capabilities supplied at construction time.
    pub(super) capabilities: ClientCapabilities,

    /// Set once the initialization handshake has succeeded (atomic, no lock).
    pub(super) initialized: AtomicBool,

    /// Optional sampling handler; the mutex allows it to be swapped at runtime.
    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,

    /// Registry of handlers for server-initiated requests (locked for registration
    /// and for cloning a handler out before it is awaited).
    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,

    /// Plugin middleware registry. A tokio mutex is used because the guard is
    /// held across `.await` points while plugin hooks run.
    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
}
59
/// The core MCP client implementation
///
/// `Client` is the entry point for talking to an MCP server: it performs the
/// initialization handshake, issues requests (tools, prompts, resources, ...),
/// and answers server-initiated requests such as sampling and elicitation.
///
/// # Clone Pattern
///
/// `Client<T>` is a thin wrapper around an `Arc` (the same pattern used by
/// reqwest and the AWS SDK). Cloning is cheap, and all clones share the same
/// underlying connection and state:
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// let client = Client::new(StdioTransport::new());
/// client.initialize().await?;
///
/// // Cheap clone - shares same connection
/// let client2 = client.clone();
/// tokio::spawn(async move {
///     client2.list_tools().await.ok();
/// });
/// # Ok(())
/// # }
/// ```
///
/// The client must be initialized before use by calling `initialize()` to perform
/// the MCP handshake and capability negotiation.
///
/// # Features
///
/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
/// - **Bidirectional Communication**: Server-initiated requests and client responses
/// - **Plugin Middleware**: Extensible request/response processing
/// - **Handler Registry**: Callbacks for server-initiated operations
/// - **Connection Management**: Robust error handling and recovery
/// - **Type Safety**: Compile-time guarantees for MCP message types
/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
///
/// # Examples
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
/// use std::collections::HashMap;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// // Create and initialize client (no mut needed!)
/// let client = Client::new(StdioTransport::new());
/// let init_result = client.initialize().await?;
/// println!("Connected to: {}", init_result.server_info.name);
///
/// // Use MCP operations
/// let tools = client.list_tools().await?;
/// let mut args = HashMap::new();
/// args.insert("input".to_string(), serde_json::json!("test"));
/// let result = client.call_tool("my_tool", Some(args)).await?;
/// # Ok(())
/// # }
/// ```
pub struct Client<T: Transport> {
    /// Shared state; every clone of the client points at the same `ClientInner`.
    pub(super) inner: Arc<ClientInner<T>>,
}
125
126/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
127///
128/// Cloning a Client is cheap (just an Arc clone) and all clones share
129/// the same underlying connection and state.
130impl<T: Transport> Clone for Client<T> {
131 fn clone(&self) -> Self {
132 Self {
133 inner: Arc::clone(&self.inner),
134 }
135 }
136}
137
138impl<T: Transport> Client<T> {
139 /// Create a new client with the specified transport
140 ///
141 /// Creates a new MCP client instance with default capabilities.
142 /// The client must be initialized before use by calling `initialize()`.
143 ///
144 /// # Arguments
145 ///
146 /// * `transport` - The transport implementation to use for communication
147 ///
148 /// # Examples
149 ///
150 /// ```rust,no_run
151 /// use turbomcp_client::Client;
152 /// use turbomcp_transport::stdio::StdioTransport;
153 ///
154 /// let transport = StdioTransport::new();
155 /// let client = Client::new(transport);
156 /// ```
157 pub fn new(transport: T) -> Self {
158 Self {
159 inner: Arc::new(ClientInner {
160 protocol: ProtocolClient::new(transport),
161 capabilities: ClientCapabilities::default(),
162 initialized: AtomicBool::new(false),
163 sampling_handler: Arc::new(StdMutex::new(None)),
164 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
165 plugin_registry: Arc::new(tokio::sync::Mutex::new(
166 crate::plugins::PluginRegistry::new(),
167 )),
168 }),
169 }
170 }
171
172 /// Create a new client with custom capabilities
173 ///
174 /// # Arguments
175 ///
176 /// * `transport` - The transport implementation to use
177 /// * `capabilities` - The client capabilities to negotiate
178 ///
179 /// # Examples
180 ///
181 /// ```rust,no_run
182 /// use turbomcp_client::{Client, ClientCapabilities};
183 /// use turbomcp_transport::stdio::StdioTransport;
184 ///
185 /// let capabilities = ClientCapabilities {
186 /// tools: true,
187 /// prompts: true,
188 /// resources: false,
189 /// sampling: false,
190 /// };
191 ///
192 /// let transport = StdioTransport::new();
193 /// let client = Client::with_capabilities(transport, capabilities);
194 /// ```
195 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
196 Self {
197 inner: Arc::new(ClientInner {
198 protocol: ProtocolClient::new(transport),
199 capabilities,
200 initialized: AtomicBool::new(false),
201 sampling_handler: Arc::new(StdMutex::new(None)),
202 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
203 plugin_registry: Arc::new(tokio::sync::Mutex::new(
204 crate::plugins::PluginRegistry::new(),
205 )),
206 }),
207 }
208 }
209}
210
211// ============================================================================
212// HTTP-Specific Convenience Constructors (Feature-Gated)
213// ============================================================================
214
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server (convenience method)
    ///
    /// Builds a streamable-HTTP transport for `url` with otherwise default
    /// configuration, wraps it in a client and runs `initialize()` before
    /// returning.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns whatever error `initialize()` reports (connection or handshake
    /// failure).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to the manual approach:
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Same flow as `connect_http_with`, just without touching the config.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server with custom configuration
    ///
    /// Starts from the default configuration with `base_url` set to `url`,
    /// lets `config_fn` adjust it, then connects and initializes.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut settings = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };
        config_fn(&mut settings);

        let connected = Self::new(StreamableHttpClientTransport::new(settings));

        // Perform the handshake right away so callers receive a usable client.
        connected.initialize().await?;

        Ok(connected)
    }
}
335
336// ============================================================================
337// TCP-Specific Convenience Constructors (Feature-Gated)
338// ============================================================================
339
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Resolves `addr`, creates a TCP transport bound to an ephemeral local
    /// port of the matching address family, and runs `initialize()`.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address, either an IP literal with port
    ///   (e.g., "127.0.0.1:8765") or a host name with port
    ///   (e.g., "localhost:8765"). When a host name resolves to several
    ///   addresses, the first one is used.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns a bad-request error when `addr` cannot be parsed or resolved,
    /// and propagates any error from `initialize()`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let addr = addr.as_ref();

        // Fast path: an IP literal needs no resolver. Anything else (for
        // example "localhost:8765") goes through async name resolution.
        let server_addr: SocketAddr = match addr.parse() {
            Ok(literal) => literal,
            Err(_) => tokio::net::lookup_host(addr)
                .await
                .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?
                .next()
                .ok_or_else(|| {
                    Error::bad_request(format!(
                        "Invalid address: {} did not resolve to any socket address",
                        addr
                    ))
                })?,
        };

        // Bind to the unspecified address with port 0 (any available port),
        // using the same address family as the server.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
390
391// ============================================================================
392// Unix Socket-Specific Convenience Constructors (Feature-Gated)
393// ============================================================================
394
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// Creates a Unix-socket client transport for `path`, wraps it in a
    /// client and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `initialize()`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let socket_path: std::path::PathBuf = path.into();
        let connected = Self::new(UnixTransport::new_client(socket_path));

        // Handshake immediately so the returned client is ready for use.
        connected.initialize().await?;

        Ok(connected)
    }
}
432
433impl<T: Transport> Client<T> {
434 /// Process incoming messages from the server
435 ///
436 /// This method should be called in a loop to handle server-initiated requests
437 /// like sampling. It processes one message at a time.
438 ///
439 /// # Returns
440 ///
441 /// Returns `Ok(true)` if a message was processed, `Ok(false)` if no message was available.
442 ///
443 /// # Examples
444 ///
445 /// ```rust,no_run
446 /// # use turbomcp_client::Client;
447 /// # use turbomcp_transport::stdio::StdioTransport;
448 /// # async fn example() -> turbomcp_protocol::Result<()> {
449 /// let mut client = Client::new(StdioTransport::new());
450 ///
451 /// // Process messages in background
452 /// tokio::spawn(async move {
453 /// loop {
454 /// if let Err(e) = client.process_message().await {
455 /// eprintln!("Error processing message: {}", e);
456 /// }
457 /// }
458 /// });
459 /// # Ok(())
460 /// # }
461 /// ```
462 pub async fn process_message(&self) -> Result<bool> {
463 // Try to receive a message without blocking
464 let message = match self.inner.protocol.transport().receive().await {
465 Ok(Some(msg)) => msg,
466 Ok(None) => return Ok(false),
467 Err(e) => {
468 return Err(Error::transport(format!(
469 "Failed to receive message: {}",
470 e
471 )));
472 }
473 };
474
475 // Parse as JSON-RPC message
476 let json_msg: JsonRpcMessage = serde_json::from_slice(&message.payload)
477 .map_err(|e| Error::protocol(format!("Invalid JSON-RPC message: {}", e)))?;
478
479 match json_msg {
480 JsonRpcMessage::Request(request) => {
481 self.handle_request(request).await?;
482 Ok(true)
483 }
484 JsonRpcMessage::Response(_) => {
485 // Responses are handled by the protocol client during request/response flow
486 Ok(true)
487 }
488 JsonRpcMessage::Notification(notification) => {
489 self.handle_notification(notification).await?;
490 Ok(true)
491 }
492 JsonRpcMessage::RequestBatch(_)
493 | JsonRpcMessage::ResponseBatch(_)
494 | JsonRpcMessage::MessageBatch(_) => {
495 // Batch operations not yet supported
496 Ok(true)
497 }
498 }
499 }
500
501 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
502 match request.method.as_str() {
503 "sampling/createMessage" => {
504 let handler_opt = self
505 .inner
506 .sampling_handler
507 .lock()
508 .expect("sampling_handler mutex poisoned")
509 .clone();
510 if let Some(handler) = handler_opt {
511 let params: CreateMessageRequest =
512 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
513 .map_err(|e| {
514 Error::protocol(format!("Invalid createMessage params: {}", e))
515 })?;
516
517 match handler.handle_create_message(params).await {
518 Ok(result) => {
519 let result_value = serde_json::to_value(result).map_err(|e| {
520 Error::protocol(format!("Failed to serialize response: {}", e))
521 })?;
522 let response = JsonRpcResponse::success(result_value, request.id);
523 self.send_response(response).await?;
524 }
525 Err(e) => {
526 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
527 code: -32603,
528 message: format!("Sampling handler error: {}", e),
529 data: None,
530 };
531 let response = JsonRpcResponse::error_response(error, request.id);
532 self.send_response(response).await?;
533 }
534 }
535 } else {
536 // No handler configured
537 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
538 code: -32601,
539 message: "Sampling not supported".to_string(),
540 data: None,
541 };
542 let response = JsonRpcResponse::error_response(error, request.id);
543 self.send_response(response).await?;
544 }
545 }
546 "elicitation/create" => {
547 // Clone handler Arc before await to avoid holding mutex across await
548 let handler_opt = self
549 .inner
550 .handlers
551 .lock()
552 .expect("handlers mutex poisoned")
553 .elicitation
554 .clone();
555 if let Some(handler) = handler_opt {
556 // Parse elicitation request params
557 let params: crate::handlers::ElicitationRequest =
558 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
559 .map_err(|e| {
560 Error::protocol(format!("Invalid elicitation params: {}", e))
561 })?;
562
563 // Call the registered elicitation handler
564 match handler.handle_elicitation(params).await {
565 Ok(elicit_response) => {
566 let result_value =
567 serde_json::to_value(elicit_response).map_err(|e| {
568 Error::protocol(format!(
569 "Failed to serialize elicitation response: {}",
570 e
571 ))
572 })?;
573 let response = JsonRpcResponse::success(result_value, request.id);
574 self.send_response(response).await?;
575 }
576 Err(e) => {
577 // Map handler errors to JSON-RPC errors
578 let (code, message) = match e {
579 crate::handlers::HandlerError::UserCancelled => {
580 (-32800, "User cancelled elicitation request".to_string())
581 }
582 crate::handlers::HandlerError::Timeout { timeout_seconds } => (
583 -32801,
584 format!(
585 "Elicitation request timed out after {} seconds",
586 timeout_seconds
587 ),
588 ),
589 crate::handlers::HandlerError::InvalidInput { details } => {
590 (-32602, format!("Invalid user input: {}", details))
591 }
592 _ => (-32603, format!("Elicitation handler error: {}", e)),
593 };
594 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
595 code,
596 message,
597 data: None,
598 };
599 let response = JsonRpcResponse::error_response(error, request.id);
600 self.send_response(response).await?;
601 }
602 }
603 } else {
604 // No handler configured - elicitation not supported
605 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
606 code: -32601,
607 message: "Elicitation not supported - no handler registered".to_string(),
608 data: None,
609 };
610 let response = JsonRpcResponse::error_response(error, request.id);
611 self.send_response(response).await?;
612 }
613 }
614 _ => {
615 // Unknown method
616 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
617 code: -32601,
618 message: format!("Method not found: {}", request.method),
619 data: None,
620 };
621 let response = JsonRpcResponse::error_response(error, request.id);
622 self.send_response(response).await?;
623 }
624 }
625 Ok(())
626 }
627
/// Handle a notification received from the server.
///
/// Currently a no-op: the notification is accepted and discarded, and
/// `Ok(())` is always returned. No dispatch to handlers happens here.
async fn handle_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
    // Intentionally ignored for now; add dispatch here when notification
    // handling is needed.
    Ok(())
}
633
634 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
635 let payload = serde_json::to_vec(&response)
636 .map_err(|e| Error::protocol(format!("Failed to serialize response: {}", e)))?;
637
638 let message = TransportMessage::new(
639 turbomcp_protocol::MessageId::from("response".to_string()),
640 payload.into(),
641 );
642
643 self.inner
644 .protocol
645 .transport()
646 .send(message)
647 .await
648 .map_err(|e| Error::transport(format!("Failed to send response: {}", e)))?;
649
650 Ok(())
651 }
652
653 /// Initialize the connection with the MCP server
654 ///
655 /// Performs the initialization handshake with the server, negotiating capabilities
656 /// and establishing the protocol version. This method must be called before
657 /// any other operations can be performed.
658 ///
659 /// # Returns
660 ///
661 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
662 ///
663 /// # Errors
664 ///
665 /// Returns an error if:
666 /// - The transport connection fails
667 /// - The server rejects the initialization request
668 /// - Protocol negotiation fails
669 ///
670 /// # Examples
671 ///
672 /// ```rust,no_run
673 /// # use turbomcp_client::Client;
674 /// # use turbomcp_transport::stdio::StdioTransport;
675 /// # async fn example() -> turbomcp_protocol::Result<()> {
676 /// let mut client = Client::new(StdioTransport::new());
677 ///
678 /// let result = client.initialize().await?;
679 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
680 /// # Ok(())
681 /// # }
682 /// ```
683 pub async fn initialize(&self) -> Result<InitializeResult> {
684 // Auto-connect transport if not already connected
685 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
686 let transport = self.inner.protocol.transport();
687 let transport_state = transport.state().await;
688 if !matches!(
689 transport_state,
690 turbomcp_transport::TransportState::Connected
691 ) {
692 tracing::debug!(
693 "Auto-connecting transport (current state: {:?})",
694 transport_state
695 );
696 transport
697 .connect()
698 .await
699 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
700 tracing::info!("Transport connected successfully");
701 }
702
703 // Build client capabilities based on registered handlers (automatic detection)
704 let mut client_caps = ProtocolClientCapabilities::default();
705
706 // Detect sampling capability from handler
707 if let Some(sampling_caps) = self.get_sampling_capabilities() {
708 client_caps.sampling = Some(sampling_caps);
709 }
710
711 // Detect elicitation capability from handler
712 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
713 client_caps.elicitation = Some(elicitation_caps);
714 }
715
716 // Detect roots capability from handler
717 if let Some(roots_caps) = self.get_roots_capabilities() {
718 client_caps.roots = Some(roots_caps);
719 }
720
721 // Send MCP initialization request
722 let request = InitializeRequest {
723 protocol_version: PROTOCOL_VERSION.to_string(),
724 capabilities: client_caps,
725 client_info: turbomcp_protocol::types::Implementation {
726 name: "turbomcp-client".to_string(),
727 version: env!("CARGO_PKG_VERSION").to_string(),
728 title: Some("TurboMCP Client".to_string()),
729 },
730 _meta: None,
731 };
732
733 let protocol_response: ProtocolInitializeResult = self
734 .inner
735 .protocol
736 .request("initialize", Some(serde_json::to_value(request)?))
737 .await?;
738
739 // AtomicBool: lock-free store with Ordering::Relaxed
740 self.inner.initialized.store(true, Ordering::Relaxed);
741
742 // Send initialized notification
743 self.inner
744 .protocol
745 .notify("notifications/initialized", None)
746 .await?;
747
748 // Convert protocol response to client response type
749 Ok(InitializeResult {
750 server_info: protocol_response.server_info,
751 server_capabilities: protocol_response.capabilities,
752 })
753 }
754
755 /// Execute a protocol method with plugin middleware
756 ///
757 /// This is a generic helper for wrapping protocol calls with plugin middleware.
758 pub(crate) async fn execute_with_plugins<R>(
759 &self,
760 method_name: &str,
761 params: Option<serde_json::Value>,
762 ) -> Result<R>
763 where
764 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
765 {
766 // Create JSON-RPC request for plugin context
767 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
768 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
769 id: turbomcp_protocol::MessageId::Number(1),
770 method: method_name.to_string(),
771 params: params.clone(),
772 };
773
774 // 1. Create request context for plugins
775 let mut req_ctx =
776 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
777
778 // 2. Execute before_request plugin middleware
779 if let Err(e) = self
780 .inner
781 .plugin_registry
782 .lock()
783 .await
784 .execute_before_request(&mut req_ctx)
785 .await
786 {
787 return Err(Error::bad_request(format!(
788 "Plugin before_request failed: {}",
789 e
790 )));
791 }
792
793 // 3. Execute the actual protocol call
794 let start_time = std::time::Instant::now();
795 let protocol_result: Result<R> = self
796 .inner
797 .protocol
798 .request(method_name, req_ctx.params().cloned())
799 .await;
800 let duration = start_time.elapsed();
801
802 // 4. Prepare response context
803 let mut resp_ctx = match protocol_result {
804 Ok(ref response) => {
805 let response_value = serde_json::to_value(response.clone())?;
806 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
807 }
808 Err(ref e) => {
809 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
810 }
811 };
812
813 // 5. Execute after_response plugin middleware
814 if let Err(e) = self
815 .inner
816 .plugin_registry
817 .lock()
818 .await
819 .execute_after_response(&mut resp_ctx)
820 .await
821 {
822 return Err(Error::bad_request(format!(
823 "Plugin after_response failed: {}",
824 e
825 )));
826 }
827
828 // 6. Return the final result, checking for plugin modifications
829 match protocol_result {
830 Ok(ref response) => {
831 // Check if plugins modified the response
832 if let Some(modified_response) = resp_ctx.response {
833 // Try to deserialize the modified response
834 if let Ok(modified_result) =
835 serde_json::from_value::<R>(modified_response.clone())
836 {
837 return Ok(modified_result);
838 }
839 }
840
841 // No plugin modifications, use original response
842 Ok(response.clone())
843 }
844 Err(e) => {
845 // Check if plugins provided an error recovery response
846 if let Some(recovery_response) = resp_ctx.response {
847 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
848 Ok(recovery_result)
849 } else {
850 Err(e)
851 }
852 } else {
853 Err(e)
854 }
855 }
856 }
857 }
858
859 /// Subscribe to resource change notifications
860 ///
861 /// Registers interest in receiving notifications when the specified
862 /// resource changes. The server will send notifications when the
863 /// resource is modified, created, or deleted.
864 ///
865 /// # Arguments
866 ///
867 /// * `uri` - The URI of the resource to monitor
868 ///
869 /// # Returns
870 ///
871 /// Returns `EmptyResult` on successful subscription.
872 ///
873 /// # Errors
874 ///
875 /// Returns an error if:
876 /// - The client is not initialized
877 /// - The URI is invalid or empty
878 /// - The server doesn't support subscriptions
879 /// - The request fails
880 ///
881 /// # Examples
882 ///
883 /// ```rust,no_run
884 /// # use turbomcp_client::Client;
885 /// # use turbomcp_transport::stdio::StdioTransport;
886 /// # async fn example() -> turbomcp_protocol::Result<()> {
887 /// let mut client = Client::new(StdioTransport::new());
888 /// client.initialize().await?;
889 ///
890 /// // Subscribe to file changes
891 /// client.subscribe("file:///watch/directory").await?;
892 /// println!("Subscribed to resource changes");
893 /// # Ok(())
894 /// # }
895 /// ```
896 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
897 if !self.inner.initialized.load(Ordering::Relaxed) {
898 return Err(Error::bad_request("Client not initialized"));
899 }
900
901 if uri.is_empty() {
902 return Err(Error::bad_request("Subscription URI cannot be empty"));
903 }
904
905 // Send resources/subscribe request with plugin middleware
906 let request = SubscribeRequest {
907 uri: uri.to_string(),
908 };
909
910 self.execute_with_plugins(
911 "resources/subscribe",
912 Some(serde_json::to_value(request).map_err(|e| {
913 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
914 })?),
915 )
916 .await
917 }
918
919 /// Unsubscribe from resource change notifications
920 ///
921 /// Cancels a previous subscription to resource changes. After unsubscribing,
922 /// the client will no longer receive notifications for the specified resource.
923 ///
924 /// # Arguments
925 ///
926 /// * `uri` - The URI of the resource to stop monitoring
927 ///
928 /// # Returns
929 ///
930 /// Returns `EmptyResult` on successful unsubscription.
931 ///
932 /// # Errors
933 ///
934 /// Returns an error if:
935 /// - The client is not initialized
936 /// - The URI is invalid or empty
937 /// - No active subscription exists for the URI
938 /// - The request fails
939 ///
940 /// # Examples
941 ///
942 /// ```rust,no_run
943 /// # use turbomcp_client::Client;
944 /// # use turbomcp_transport::stdio::StdioTransport;
945 /// # async fn example() -> turbomcp_protocol::Result<()> {
946 /// let mut client = Client::new(StdioTransport::new());
947 /// client.initialize().await?;
948 ///
949 /// // Unsubscribe from file changes
950 /// client.unsubscribe("file:///watch/directory").await?;
951 /// println!("Unsubscribed from resource changes");
952 /// # Ok(())
953 /// # }
954 /// ```
955 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
956 if !self.inner.initialized.load(Ordering::Relaxed) {
957 return Err(Error::bad_request("Client not initialized"));
958 }
959
960 if uri.is_empty() {
961 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
962 }
963
964 // Send resources/unsubscribe request with plugin middleware
965 let request = UnsubscribeRequest {
966 uri: uri.to_string(),
967 };
968
969 self.execute_with_plugins(
970 "resources/unsubscribe",
971 Some(serde_json::to_value(request).map_err(|e| {
972 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
973 })?),
974 )
975 .await
976 }
977
978 /// Get the client's capabilities configuration
979 pub fn capabilities(&self) -> &ClientCapabilities {
980 &self.inner.capabilities
981 }
982
983 /// Initialize all registered plugins
984 ///
985 /// This should be called after registration but before using the client.
986 pub async fn initialize_plugins(&self) -> Result<()> {
987 // Set up client context for plugins with actual client capabilities
988 let mut capabilities = std::collections::HashMap::new();
989 capabilities.insert(
990 "protocol_version".to_string(),
991 serde_json::json!("2024-11-05"),
992 );
993 capabilities.insert(
994 "mcp_version".to_string(),
995 serde_json::json!(env!("CARGO_PKG_VERSION")),
996 );
997 capabilities.insert(
998 "supports_notifications".to_string(),
999 serde_json::json!(true),
1000 );
1001 capabilities.insert(
1002 "supports_sampling".to_string(),
1003 serde_json::json!(self.has_sampling_handler()),
1004 );
1005 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1006 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1007
1008 // Extract client configuration
1009 let mut config = std::collections::HashMap::new();
1010 config.insert(
1011 "client_name".to_string(),
1012 serde_json::json!("turbomcp-client"),
1013 );
1014 config.insert(
1015 "initialized".to_string(),
1016 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1017 );
1018 config.insert(
1019 "plugin_count".to_string(),
1020 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1021 );
1022
1023 let context = crate::plugins::PluginContext::new(
1024 "turbomcp-client".to_string(),
1025 env!("CARGO_PKG_VERSION").to_string(),
1026 capabilities,
1027 config,
1028 vec![], // Will be populated by the registry
1029 );
1030
1031 self.inner
1032 .plugin_registry
1033 .lock()
1034 .await
1035 .set_client_context(context);
1036
1037 // Note: Individual plugins are initialized automatically during registration
1038 // via PluginRegistry::register_plugin(). This method ensures the registry
1039 // has proper client context for any future plugin registrations.
1040 Ok(())
1041 }
1042
1043 /// Cleanup all registered plugins
1044 ///
1045 /// This should be called when the client is being shut down.
1046 pub async fn cleanup_plugins(&self) -> Result<()> {
1047 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1048 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1049 // references are dropped.
1050
1051 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1052 // pattern where plugins clean up their resources in their Drop implementation.
1053 // Replace the registry with a fresh one (mutex ensures safe access)
1054 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1055 Ok(())
1056 }
1057
1058 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1059 // are defined in their respective operation modules:
1060 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1061 // - handlers.rs: has_elicitation_handler, has_roots_handler
1062 //
1063 // Additional capability getters for elicitation and roots added below
1064 // since they're used during initialization
1065
1066 /// Get elicitation capabilities if handler is registered
1067 /// Automatically detects capability based on registered handler
1068 fn get_elicitation_capabilities(
1069 &self,
1070 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1071 if self.has_elicitation_handler() {
1072 // Currently returns default capabilities. In the future, schema_validation support
1073 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1074 // that handlers could implement. For now, handlers validate schemas themselves.
1075 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1076 } else {
1077 None
1078 }
1079 }
1080
1081 /// Get roots capabilities if handler is registered
1082 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1083 if self.has_roots_handler() {
1084 // Roots capabilities indicate whether list can change
1085 Some(turbomcp_protocol::types::RootsCapabilities {
1086 list_changed: Some(true), // Support dynamic roots by default
1087 })
1088 } else {
1089 None
1090 }
1091 }
1092}