turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32 *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportConfig, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40 ClientCapabilities,
41 handlers::{HandlerError, HandlerRegistry},
42 sampling::SamplingHandler,
43};
44
45/// Inner client state with interior mutability
46///
47/// This structure contains the actual client state and is wrapped in Arc<...>
48/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
49pub(super) struct ClientInner<T: Transport + 'static> {
50 /// Protocol client for low-level communication
51 pub(super) protocol: ProtocolClient<T>,
52
53 /// Client capabilities (immutable after construction)
54 pub(super) capabilities: ClientCapabilities,
55
56 /// Initialization state (lock-free atomic boolean)
57 pub(super) initialized: AtomicBool,
58
59 /// Tracks whether graceful shutdown has already been requested.
60 pub(super) shutdown_requested: AtomicBool,
61
62 /// Optional sampling handler (mutex for dynamic updates)
63 pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
64
65 /// Handler registry for bidirectional communication (mutex for registration)
66 pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
67
68 /// ✅ Semaphore for bounded concurrency of request/notification handlers
69 /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
70 pub(super) handler_semaphore: Arc<Semaphore>,
71}
72
73/// The core MCP client implementation
74///
75/// Client provides a comprehensive interface for communicating with MCP servers,
76/// supporting all protocol features including tools, prompts, resources, sampling,
77/// elicitation, and bidirectional communication patterns.
78///
79/// # Clone Pattern
80///
81/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
82/// All clones share the same underlying connection and state:
83///
84/// ```rust,no_run
85/// use turbomcp_client::Client;
86/// use turbomcp_transport::stdio::StdioTransport;
87///
88/// # async fn example() -> turbomcp_protocol::Result<()> {
89/// let client = Client::new(StdioTransport::new());
90/// client.initialize().await?;
91///
92/// // Cheap clone - shares same connection
93/// let client2 = client.clone();
94/// tokio::spawn(async move {
95/// client2.list_tools().await.ok();
96/// });
97/// # Ok(())
98/// # }
99/// ```
100///
101/// The client must be initialized before use by calling `initialize()` to perform
102/// the MCP handshake and capability negotiation.
103///
104/// # Features
105///
106/// - **Protocol Compliance**: Full current MCP specification support
107/// - **Bidirectional Communication**: Server-initiated requests and client responses
108/// - **Plugin Middleware**: Extensible request/response processing
109/// - **Handler Registry**: Callbacks for server-initiated operations
110/// - **Connection Management**: Robust error handling and recovery
111/// - **Type Safety**: Compile-time guarantees for MCP message types
112/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// use turbomcp_client::Client;
118/// use turbomcp_transport::stdio::StdioTransport;
119/// use std::collections::HashMap;
120///
121/// # async fn example() -> turbomcp_protocol::Result<()> {
122/// // Create and initialize client (no mut needed!)
123/// let client = Client::new(StdioTransport::new());
124/// let init_result = client.initialize().await?;
125/// println!("Connected to: {}", init_result.server_info.name);
126///
127/// // Use MCP operations
128/// let tools = client.list_tools().await?;
129/// let mut args = HashMap::new();
130/// args.insert("input".to_string(), serde_json::json!("test"));
131/// let result = client.call_tool("my_tool", Some(args), None).await?;
132/// # Ok(())
133/// # }
134/// ```
135pub struct Client<T: Transport + 'static> {
136 pub(super) inner: Arc<ClientInner<T>>,
137}
138
139/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
140///
141/// Cloning a Client is cheap (just an Arc clone) and all clones share
142/// the same underlying connection and state.
143impl<T: Transport + 'static> Clone for Client<T> {
144 fn clone(&self) -> Self {
145 Self {
146 inner: Arc::clone(&self.inner),
147 }
148 }
149}
150
151impl<T: Transport + 'static> Drop for ClientInner<T> {
152 fn drop(&mut self) {
153 // Best-effort cleanup if shutdown() wasn't called
154 // This is a safety net, but applications SHOULD call shutdown() explicitly
155
156 // Single clear warning
157 if !self.shutdown_requested.load(Ordering::Relaxed) {
158 tracing::warn!(
159 "MCP Client dropped without explicit shutdown(). \
160 Call client.shutdown().await before dropping for clean resource cleanup. \
161 Background tasks (WebSocket reconnection) may continue running."
162 );
163 }
164
165 // We can shutdown the dispatcher (it's synchronous)
166 self.protocol.dispatcher().shutdown();
167 }
168}
169
170impl<T: Transport + 'static> Client<T> {
171 /// Create a new client with the specified transport
172 ///
173 /// Creates a new MCP client instance with default capabilities.
174 /// The client must be initialized before use by calling `initialize()`.
175 ///
176 /// # Arguments
177 ///
178 /// * `transport` - The transport implementation to use for communication
179 ///
180 /// # Examples
181 ///
182 /// ```rust,no_run
183 /// use turbomcp_client::Client;
184 /// use turbomcp_transport::stdio::StdioTransport;
185 ///
186 /// let transport = StdioTransport::new();
187 /// let client = Client::new(transport);
188 /// ```
189 pub fn new(transport: T) -> Self {
190 Self::new_with_config(transport, TransportConfig::default())
191 }
192
193 /// Create a new client with an explicit transport configuration.
194 pub fn new_with_config(transport: T, config: TransportConfig) -> Self {
195 let capabilities = ClientCapabilities::default();
196 let client = Self {
197 inner: Arc::new(ClientInner {
198 protocol: ProtocolClient::with_config(transport, config),
199 capabilities: capabilities.clone(),
200 initialized: AtomicBool::new(false),
201 shutdown_requested: AtomicBool::new(false),
202 sampling_handler: Arc::new(Mutex::new(None)),
203 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
204 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
205 }),
206 };
207
208 // Register dispatcher handlers for bidirectional communication
209 client.register_dispatcher_handlers();
210
211 client
212 }
213
214 /// Create a new client with custom capabilities
215 ///
216 /// # Arguments
217 ///
218 /// * `transport` - The transport implementation to use
219 /// * `capabilities` - The client capabilities to negotiate
220 ///
221 /// # Examples
222 ///
223 /// ```rust,no_run
224 /// use turbomcp_client::{Client, ClientCapabilities};
225 /// use turbomcp_transport::stdio::StdioTransport;
226 ///
227 /// let capabilities = ClientCapabilities {
228 /// tools: true,
229 /// prompts: true,
230 /// resources: false,
231 /// sampling: false,
232 /// max_concurrent_handlers: 100,
233 /// };
234 ///
235 /// let transport = StdioTransport::new();
236 /// let client = Client::with_capabilities(transport, capabilities);
237 /// ```
238 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
239 Self::with_capabilities_and_config(transport, capabilities, TransportConfig::default())
240 }
241
242 /// Create a new client with custom capabilities and transport configuration.
243 pub fn with_capabilities_and_config(
244 transport: T,
245 capabilities: ClientCapabilities,
246 config: TransportConfig,
247 ) -> Self {
248 let client = Self {
249 inner: Arc::new(ClientInner {
250 protocol: ProtocolClient::with_config(transport, config),
251 capabilities: capabilities.clone(),
252 initialized: AtomicBool::new(false),
253 shutdown_requested: AtomicBool::new(false),
254 sampling_handler: Arc::new(Mutex::new(None)),
255 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
256 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
257 }),
258 };
259
260 // Register dispatcher handlers for bidirectional communication
261 client.register_dispatcher_handlers();
262
263 client
264 }
265
266 /// Shutdown the client and clean up all resources
267 ///
268 /// This method performs a **graceful shutdown** of the MCP client by:
269 /// 1. Stopping the message dispatcher background task
270 /// 2. Disconnecting the transport (closes connection, stops background tasks)
271 ///
272 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
273 ///
274 /// # Why This Method Exists
275 ///
276 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
277 /// which is required for proper cleanup of WebSocket connections and background tasks.
278 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
279 ///
280 /// # Best Practices
281 ///
282 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
283 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
284 /// - For tests: call in cleanup/teardown
285 /// - If forgotten, Drop will log warnings and do best-effort cleanup
286 ///
287 /// # Examples
288 ///
289 /// ```rust,no_run
290 /// # use turbomcp_client::Client;
291 /// # use turbomcp_transport::stdio::StdioTransport;
292 /// # async fn example() -> turbomcp_protocol::Result<()> {
293 /// let client = Client::new(StdioTransport::new());
294 /// client.initialize().await?;
295 ///
296 /// // Use client...
297 /// let _tools = client.list_tools().await?;
298 ///
299 /// // ✅ Clean shutdown
300 /// client.shutdown().await?;
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub async fn shutdown(&self) -> Result<()> {
305 self.inner.shutdown_requested.store(true, Ordering::Relaxed);
306 tracing::info!("🛑 Shutting down MCP client");
307
308 // 1. Shutdown message dispatcher
309 self.inner.protocol.dispatcher().shutdown();
310 tracing::debug!("✅ Message dispatcher stopped");
311
312 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313 match self.inner.protocol.transport().disconnect().await {
314 Ok(()) => {
315 tracing::info!("✅ Transport disconnected successfully");
316 }
317 Err(e) => {
318 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319 }
320 }
321
322 tracing::info!("✅ MCP client shutdown complete");
323 Ok(())
324 }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
331#[cfg(feature = "http")]
332impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
333 /// Connect to an HTTP MCP server (convenience method)
334 ///
335 /// This is a convenient one-liner alternative to manual configuration.
336 /// Creates an HTTP client, connects, and initializes in one call.
337 ///
338 /// # Arguments
339 ///
340 /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
341 ///
342 /// # Returns
343 ///
344 /// Returns an initialized `Client` ready to use.
345 ///
346 /// # Errors
347 ///
348 /// Returns an error if:
349 /// - The URL is invalid
350 /// - Connection to the server fails
351 /// - Initialization handshake fails
352 ///
353 /// # Examples
354 ///
355 /// ```rust,no_run
356 /// use turbomcp_client::Client;
357 ///
358 /// # async fn example() -> turbomcp_protocol::Result<()> {
359 /// // Convenient one-liner - balanced with server DX
360 /// let client = Client::connect_http("http://localhost:8080").await?;
361 ///
362 /// // Now use it directly
363 /// let tools = client.list_tools().await?;
364 /// # Ok(())
365 /// # }
366 /// ```
367 ///
368 /// Compare to verbose approach (10+ lines):
369 /// ```rust,no_run
370 /// use turbomcp_client::Client;
371 /// use turbomcp_transport::streamable_http_client::{
372 /// StreamableHttpClientConfig, StreamableHttpClientTransport
373 /// };
374 ///
375 /// # async fn example() -> turbomcp_protocol::Result<()> {
376 /// let config = StreamableHttpClientConfig {
377 /// base_url: "http://localhost:8080".to_string(),
378 /// ..Default::default()
379 /// };
380 /// let transport = StreamableHttpClientTransport::new(config);
381 /// let client = Client::new(transport);
382 /// client.initialize().await?;
383 /// # Ok(())
384 /// # }
385 /// ```
386 pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
387 use turbomcp_transport::streamable_http_client::{
388 StreamableHttpClientConfig, StreamableHttpClientTransport,
389 };
390
391 let config = StreamableHttpClientConfig {
392 base_url: url.into(),
393 ..Default::default()
394 };
395
396 let transport = StreamableHttpClientTransport::new(config);
397 let client = Self::new(transport);
398
399 // Initialize connection immediately
400 client.initialize().await?;
401
402 Ok(client)
403 }
404
405 /// Connect to an HTTP MCP server with custom configuration
406 ///
407 /// Provides more control than `connect_http()` while still being ergonomic.
408 ///
409 /// # Arguments
410 ///
411 /// * `url` - The base URL of the MCP server
412 /// * `config_fn` - Function to customize the configuration
413 ///
414 /// # Examples
415 ///
416 /// ```rust,no_run
417 /// use turbomcp_client::Client;
418 /// use std::time::Duration;
419 ///
420 /// # async fn example() -> turbomcp_protocol::Result<()> {
421 /// let client = Client::connect_http_with("http://localhost:8080", |config| {
422 /// config.timeout = Duration::from_secs(60);
423 /// config.endpoint_path = "/api/mcp".to_string();
424 /// }).await?;
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
429 where
430 F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
431 {
432 use turbomcp_transport::streamable_http_client::{
433 StreamableHttpClientConfig, StreamableHttpClientTransport,
434 };
435
436 let mut config = StreamableHttpClientConfig {
437 base_url: url.into(),
438 ..Default::default()
439 };
440
441 config_fn(&mut config);
442
443 let transport = StreamableHttpClientTransport::new(config);
444 let client = Self::new(transport);
445
446 client.initialize().await?;
447
448 Ok(client)
449 }
450}
451
452// ============================================================================
453// TCP-Specific Convenience Constructors (Feature-Gated)
454// ============================================================================
455
456#[cfg(feature = "tcp")]
457impl Client<turbomcp_transport::tcp::TcpTransport> {
458 /// Connect to a TCP MCP server (convenience method)
459 ///
460 /// Convenient one-liner for TCP connections - balanced DX.
461 ///
462 /// # Arguments
463 ///
464 /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
465 ///
466 /// # Returns
467 ///
468 /// Returns an initialized `Client` ready to use.
469 ///
470 /// # Examples
471 ///
472 /// ```rust,no_run
473 /// # #[cfg(feature = "tcp")]
474 /// use turbomcp_client::Client;
475 ///
476 /// # async fn example() -> turbomcp_protocol::Result<()> {
477 /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
478 /// let tools = client.list_tools().await?;
479 /// # Ok(())
480 /// # }
481 /// ```
482 pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
483 use std::net::SocketAddr;
484 use turbomcp_transport::tcp::TcpTransport;
485
486 let server_addr: SocketAddr = addr
487 .as_ref()
488 .parse()
489 .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;
490
491 // Client binds to 0.0.0.0:0 (any available port)
492 let bind_addr: SocketAddr = if server_addr.is_ipv6() {
493 "[::]:0".parse().expect("valid IPv6 any-port address")
494 } else {
495 "0.0.0.0:0".parse().expect("valid IPv4 any-port address")
496 };
497
498 let transport = TcpTransport::new_client(bind_addr, server_addr);
499 let client = Self::new(transport);
500
501 client.initialize().await?;
502
503 Ok(client)
504 }
505}
506
507// ============================================================================
508// Unix Socket-Specific Convenience Constructors (Feature-Gated)
509// ============================================================================
510
511#[cfg(all(unix, feature = "unix"))]
512impl Client<turbomcp_transport::unix::UnixTransport> {
513 /// Connect to a Unix socket MCP server (convenience method)
514 ///
515 /// Convenient one-liner for Unix socket IPC - balanced DX.
516 ///
517 /// # Arguments
518 ///
519 /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
520 ///
521 /// # Returns
522 ///
523 /// Returns an initialized `Client` ready to use.
524 ///
525 /// # Examples
526 ///
527 /// ```rust,no_run
528 /// # #[cfg(all(unix, feature = "unix"))]
529 /// use turbomcp_client::Client;
530 ///
531 /// # async fn example() -> turbomcp_protocol::Result<()> {
532 /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
533 /// let tools = client.list_tools().await?;
534 /// # Ok(())
535 /// # }
536 /// ```
537 pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
538 use turbomcp_transport::unix::UnixTransport;
539
540 let transport = UnixTransport::new_client(path.into());
541 let client = Self::new(transport);
542
543 client.initialize().await?;
544
545 Ok(client)
546 }
547}
548
549impl<T: Transport + 'static> Client<T> {
550 /// Register message handlers with the dispatcher
551 ///
552 /// This method sets up the callbacks that handle server-initiated requests
553 /// and notifications. The dispatcher's background task routes incoming
554 /// messages to these handlers.
555 ///
556 /// This is called automatically during Client construction (in `new()` and
557 /// `with_capabilities()`), so you don't need to call it manually.
558 ///
559 /// ## How It Works
560 ///
561 /// The handlers are synchronous closures that spawn async tasks to do the
562 /// actual work. This allows the dispatcher to continue routing messages
563 /// without blocking on handler execution.
564 fn register_dispatcher_handlers(&self) {
565 let dispatcher = self.inner.protocol.dispatcher();
566 let client_for_requests = self.clone();
567 let client_for_notifications = self.clone();
568
569 // Request handler (elicitation, sampling, etc.)
570 let semaphore = Arc::clone(&self.inner.handler_semaphore);
571 let request_handler = Arc::new(move |request: JsonRpcRequest| {
572 let client = client_for_requests.clone();
573 let method = request.method.clone();
574 let req_id = request.id.clone();
575 let semaphore = Arc::clone(&semaphore);
576
577 // ✅ Spawn async task with bounded concurrency
578 tokio::spawn(async move {
579 // Acquire permit (blocks if 100 requests already in flight)
580 let _permit = match semaphore.acquire().await {
581 Ok(permit) => permit,
582 Err(_) => {
583 tracing::warn!(
584 "Handler semaphore closed, dropping request: method={}",
585 method
586 );
587 return;
588 }
589 };
590
591 tracing::debug!(
592 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
593 method,
594 req_id
595 );
596 if let Err(e) = client.handle_request(request).await {
597 tracing::error!(
598 "❌ [request_handler] Error handling server request '{}': {}",
599 method,
600 e
601 );
602 tracing::error!(" Request ID: {:?}", req_id);
603 tracing::error!(" Error kind: {:?}", e.kind);
604 } else {
605 tracing::debug!(
606 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
607 method,
608 req_id
609 );
610 }
611 // Permit automatically released on drop ✅
612 });
613 Ok(())
614 });
615
616 // Notification handler
617 let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
618 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
619 let client = client_for_notifications.clone();
620 let semaphore = Arc::clone(&semaphore_notif);
621
622 // ✅ Spawn async task with bounded concurrency
623 tokio::spawn(async move {
624 // Acquire permit (blocks if 100 handlers already in flight)
625 let _permit = match semaphore.acquire().await {
626 Ok(permit) => permit,
627 Err(_) => {
628 tracing::warn!("Handler semaphore closed, dropping notification");
629 return;
630 }
631 };
632
633 if let Err(e) = client.handle_notification(notification).await {
634 tracing::error!("Error handling server notification: {}", e);
635 }
636 // Permit automatically released on drop ✅
637 });
638 Ok(())
639 });
640
641 // Register handlers synchronously - no race condition!
642 // The set_* methods are now synchronous with std::sync::Mutex
643 dispatcher.set_request_handler(request_handler);
644 dispatcher.set_notification_handler(notification_handler);
645 tracing::debug!("Dispatcher handlers registered successfully");
646 }
647
648 /// Handle server-initiated requests (elicitation, sampling, roots)
649 ///
650 /// This method is called by the MessageDispatcher when it receives a request
651 /// from the server. It routes the request to the appropriate handler based on
652 /// the method name.
653 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
654 match request.method.as_str() {
655 "sampling/createMessage" => {
656 let handler_opt = self.inner.sampling_handler.lock().clone();
657 if let Some(handler) = handler_opt {
658 // Extract request ID for proper correlation
659 let request_id = match &request.id {
660 turbomcp_protocol::MessageId::String(s) => s.clone(),
661 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
662 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
663 };
664
665 let params: CreateMessageRequest =
666 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
667 .map_err(|e| {
668 Error::internal(format!("Invalid createMessage params: {}", e))
669 })?;
670
671 match handler.handle_create_message(request_id, params).await {
672 Ok(result) => {
673 let result_value = serde_json::to_value(result).map_err(|e| {
674 Error::internal(format!("Failed to serialize response: {}", e))
675 })?;
676 let response = JsonRpcResponse::success(result_value, request.id);
677 self.send_response(response).await?;
678 }
679 Err(e) => {
680 tracing::warn!(
681 "⚠️ [handle_request] Sampling handler returned error: {}",
682 e
683 );
684
685 // Preserve error semantics by checking actual error type
686 // This allows proper error code propagation for retry logic
687 let (code, message) = if let Some(handler_err) =
688 e.downcast_ref::<HandlerError>()
689 {
690 // HandlerError has explicit JSON-RPC code mapping
691 let json_err = handler_err.into_jsonrpc_error();
692 tracing::info!(
693 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
694 json_err.code
695 );
696 (json_err.code, json_err.message)
697 } else if let Some(proto_err) =
698 e.downcast_ref::<turbomcp_protocol::Error>()
699 {
700 // Protocol errors have ErrorKind-based mapping
701 tracing::info!(
702 "📋 [handle_request] Protocol error mapped to code: {}",
703 proto_err.jsonrpc_error_code()
704 );
705 (proto_err.jsonrpc_error_code(), proto_err.to_string())
706 } else {
707 // Generic errors default to Internal (-32603)
708 // Log the error type for debugging (should rarely hit this path)
709 tracing::warn!(
710 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
711 std::any::type_name_of_val(&*e)
712 );
713 (-32603, format!("Sampling handler error: {}", e))
714 };
715
716 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
717 code,
718 message,
719 data: None,
720 };
721 let response =
722 JsonRpcResponse::error_response(error, request.id.clone());
723
724 tracing::info!(
725 "🔄 [handle_request] Attempting to send error response for request: {:?}",
726 request.id
727 );
728 self.send_response(response).await?;
729 tracing::info!(
730 "✅ [handle_request] Error response sent successfully for request: {:?}",
731 request.id
732 );
733 }
734 }
735 } else {
736 // No handler configured
737 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
738 code: -32601,
739 message: "Sampling not supported".to_string(),
740 data: None,
741 };
742 let response = JsonRpcResponse::error_response(error, request.id);
743 self.send_response(response).await?;
744 }
745 }
746 "roots/list" => {
747 // Handle roots/list request from server
748 // Clone the handler Arc to avoid holding mutex across await
749 let handler_opt = self.inner.handlers.lock().roots.clone();
750
751 let roots_result = if let Some(handler) = handler_opt {
752 handler.handle_roots_request().await
753 } else {
754 // No handler - return empty list per MCP spec
755 Ok(Vec::new())
756 };
757
758 match roots_result {
759 Ok(roots) => {
760 let result_value =
761 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
762 roots,
763 _meta: None,
764 })
765 .map_err(|e| {
766 Error::internal(format!(
767 "Failed to serialize roots response: {}",
768 e
769 ))
770 })?;
771 let response = JsonRpcResponse::success(result_value, request.id);
772 self.send_response(response).await?;
773 }
774 Err(e) => {
775 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
776 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
777 let json_err = e.into_jsonrpc_error();
778 let response = JsonRpcResponse::error_response(json_err, request.id);
779 self.send_response(response).await?;
780 }
781 }
782 }
783 "elicitation/create" => {
784 // Clone handler Arc before await to avoid holding mutex across await
785 let handler_opt = self.inner.handlers.lock().elicitation.clone();
786 if let Some(handler) = handler_opt {
787 // Parse elicitation request params as MCP protocol type
788 let proto_request: turbomcp_protocol::types::ElicitRequest =
789 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
790 .map_err(|e| {
791 Error::internal(format!("Invalid elicitation params: {}", e))
792 })?;
793
794 // Wrap protocol request with ID for handler (preserves type safety!)
795 let handler_request =
796 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
797
798 // Call the registered elicitation handler
799 match handler.handle_elicitation(handler_request).await {
800 Ok(elicit_response) => {
801 // Convert handler response back to protocol type
802 let proto_result = elicit_response.into_protocol();
803 let result_value = serde_json::to_value(proto_result).map_err(|e| {
804 Error::internal(format!(
805 "Failed to serialize elicitation response: {}",
806 e
807 ))
808 })?;
809 let response = JsonRpcResponse::success(result_value, request.id);
810 self.send_response(response).await?;
811 }
812 Err(e) => {
813 // Convert handler error to JSON-RPC error using centralized mapping
814 let response =
815 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
816 self.send_response(response).await?;
817 }
818 }
819 } else {
820 // No handler configured - elicitation not supported
821 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
822 code: -32601,
823 message: "Elicitation not supported - no handler registered".to_string(),
824 data: None,
825 };
826 let response = JsonRpcResponse::error_response(error, request.id);
827 self.send_response(response).await?;
828 }
829 }
830 _ => {
831 // Unknown method
832 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
833 code: -32601,
834 message: format!("Method not found: {}", request.method),
835 data: None,
836 };
837 let response = JsonRpcResponse::error_response(error, request.id);
838 self.send_response(response).await?;
839 }
840 }
841 Ok(())
842 }
843
844 /// Handle server-initiated notifications
845 ///
846 /// Routes notifications to appropriate handlers based on method name.
847 /// MCP defines several notification types that servers can send to clients:
848 ///
849 /// - `notifications/progress` - Progress updates for long-running operations
850 /// - `notifications/message` - Log messages from server
851 /// - `notifications/resources/updated` - Resource content changed
852 /// - `notifications/resources/list_changed` - Resource list changed
853 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
854 match notification.method.as_str() {
855 "notifications/progress" => {
856 // Route to progress handler
857 let handler_opt = self.inner.handlers.lock().get_progress_handler();
858
859 if let Some(handler) = handler_opt {
860 let progress: crate::handlers::ProgressNotification = serde_json::from_value(
861 notification.params.unwrap_or(serde_json::Value::Null),
862 )
863 .map_err(|e| {
864 Error::internal(format!("Invalid progress notification: {}", e))
865 })?;
866
867 if let Err(e) = handler.handle_progress(progress).await {
868 tracing::error!("Progress handler error: {}", e);
869 }
870 } else {
871 tracing::debug!("Progress notification received (no handler registered)");
872 }
873 }
874
875 "notifications/message" => {
876 // Route to log handler
877 let handler_opt = self.inner.handlers.lock().get_log_handler();
878
879 if let Some(handler) = handler_opt {
880 // Parse log message
881 let log: crate::handlers::LoggingNotification = serde_json::from_value(
882 notification.params.unwrap_or(serde_json::Value::Null),
883 )
884 .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
885
886 // Call handler
887 if let Err(e) = handler.handle_log(log).await {
888 tracing::error!("Log handler error: {}", e);
889 }
890 } else {
891 tracing::debug!("Received log notification but no handler registered");
892 }
893 }
894
895 "notifications/resources/updated" => {
896 // Route to resource update handler
897 let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
898
899 if let Some(handler) = handler_opt {
900 // Parse resource update notification
901 let update: crate::handlers::ResourceUpdatedNotification =
902 serde_json::from_value(
903 notification.params.unwrap_or(serde_json::Value::Null),
904 )
905 .map_err(|e| {
906 Error::internal(format!("Invalid resource update notification: {}", e))
907 })?;
908
909 // Call handler
910 if let Err(e) = handler.handle_resource_update(update).await {
911 tracing::error!("Resource update handler error: {}", e);
912 }
913 } else {
914 tracing::debug!(
915 "Received resource update notification but no handler registered"
916 );
917 }
918 }
919
920 "notifications/resources/list_changed" => {
921 // Route to resource list changed handler
922 let handler_opt = self
923 .inner
924 .handlers
925 .lock()
926 .get_resource_list_changed_handler();
927
928 if let Some(handler) = handler_opt {
929 if let Err(e) = handler.handle_resource_list_changed().await {
930 tracing::error!("Resource list changed handler error: {}", e);
931 }
932 } else {
933 tracing::debug!(
934 "Resource list changed notification received (no handler registered)"
935 );
936 }
937 }
938
939 "notifications/prompts/list_changed" => {
940 // Route to prompt list changed handler
941 let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
942
943 if let Some(handler) = handler_opt {
944 if let Err(e) = handler.handle_prompt_list_changed().await {
945 tracing::error!("Prompt list changed handler error: {}", e);
946 }
947 } else {
948 tracing::debug!(
949 "Prompt list changed notification received (no handler registered)"
950 );
951 }
952 }
953
954 "notifications/tools/list_changed" => {
955 // Route to tool list changed handler
956 let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
957
958 if let Some(handler) = handler_opt {
959 if let Err(e) = handler.handle_tool_list_changed().await {
960 tracing::error!("Tool list changed handler error: {}", e);
961 }
962 } else {
963 tracing::debug!(
964 "Tool list changed notification received (no handler registered)"
965 );
966 }
967 }
968
969 "notifications/cancelled" => {
970 // Route to cancellation handler
971 let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
972
973 if let Some(handler) = handler_opt {
974 // Parse cancellation notification
975 let cancellation: crate::handlers::CancelledNotification =
976 serde_json::from_value(
977 notification.params.unwrap_or(serde_json::Value::Null),
978 )
979 .map_err(|e| {
980 Error::internal(format!("Invalid cancellation notification: {}", e))
981 })?;
982
983 // Call handler
984 if let Err(e) = handler.handle_cancellation(cancellation).await {
985 tracing::error!("Cancellation handler error: {}", e);
986 }
987 } else {
988 tracing::debug!("Cancellation notification received (no handler registered)");
989 }
990 }
991
992 _ => {
993 // Unknown notification type
994 tracing::debug!("Received unknown notification: {}", notification.method);
995 }
996 }
997
998 Ok(())
999 }
1000
1001 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1002 tracing::info!(
1003 "📤 [send_response] Sending JSON-RPC response: id={:?}",
1004 response.id
1005 );
1006
1007 let payload = serde_json::to_vec(&response).map_err(|e| {
1008 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1009 Error::internal(format!("Failed to serialize response: {}", e))
1010 })?;
1011
1012 tracing::debug!(
1013 "📤 [send_response] Response payload: {} bytes",
1014 payload.len()
1015 );
1016 tracing::debug!(
1017 "📤 [send_response] Response JSON: {}",
1018 String::from_utf8_lossy(&payload)
1019 );
1020
1021 let message = TransportMessage::new(
1022 turbomcp_protocol::MessageId::from("response".to_string()),
1023 payload.into(),
1024 );
1025
1026 self.inner
1027 .protocol
1028 .transport()
1029 .send(message)
1030 .await
1031 .map_err(|e| {
1032 tracing::error!("❌ [send_response] Transport send failed: {}", e);
1033 Error::transport(format!("Failed to send response: {}", e))
1034 })?;
1035
1036 tracing::info!(
1037 "✅ [send_response] Response sent successfully: id={:?}",
1038 response.id
1039 );
1040 Ok(())
1041 }
1042
1043 /// Initialize the connection with the MCP server
1044 ///
1045 /// Performs the initialization handshake with the server, negotiating capabilities
1046 /// and establishing the protocol version. This method must be called before
1047 /// any other operations can be performed.
1048 ///
1049 /// # Returns
1050 ///
1051 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1052 ///
1053 /// # Errors
1054 ///
1055 /// Returns an error if:
1056 /// - The transport connection fails
1057 /// - The server rejects the initialization request
1058 /// - Protocol negotiation fails
1059 ///
1060 /// # Examples
1061 ///
1062 /// ```rust,no_run
1063 /// # use turbomcp_client::Client;
1064 /// # use turbomcp_transport::stdio::StdioTransport;
1065 /// # async fn example() -> turbomcp_protocol::Result<()> {
1066 /// let mut client = Client::new(StdioTransport::new());
1067 ///
1068 /// let result = client.initialize().await?;
1069 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1070 /// # Ok(())
1071 /// # }
1072 /// ```
1073 pub async fn initialize(&self) -> Result<InitializeResult> {
1074 // Auto-connect transport if not already connected
1075 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1076 let transport = self.inner.protocol.transport();
1077 let transport_state = transport.state().await;
1078 if !matches!(
1079 transport_state,
1080 turbomcp_transport::TransportState::Connected
1081 ) {
1082 tracing::debug!(
1083 "Auto-connecting transport (current state: {:?})",
1084 transport_state
1085 );
1086 transport
1087 .connect()
1088 .await
1089 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1090 tracing::info!("Transport connected successfully");
1091 }
1092
1093 // Build client capabilities based on registered handlers (automatic detection)
1094 let mut client_caps = ProtocolClientCapabilities::default();
1095
1096 // Detect sampling capability from handler
1097 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1098 client_caps.sampling = Some(sampling_caps);
1099 }
1100
1101 // Detect elicitation capability from handler
1102 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1103 client_caps.elicitation = Some(elicitation_caps);
1104 }
1105
1106 // Detect roots capability from handler
1107 if let Some(roots_caps) = self.get_roots_capabilities() {
1108 client_caps.roots = Some(roots_caps);
1109 }
1110
1111 // Send MCP initialization request
1112 let request = InitializeRequest {
1113 protocol_version: PROTOCOL_VERSION.to_string(),
1114 capabilities: client_caps,
1115 client_info: turbomcp_protocol::types::Implementation {
1116 name: "turbomcp-client".to_string(),
1117 version: env!("CARGO_PKG_VERSION").to_string(),
1118 title: Some("TurboMCP Client".to_string()),
1119 ..Default::default()
1120 },
1121 _meta: None,
1122 };
1123
1124 let protocol_response: ProtocolInitializeResult = self
1125 .inner
1126 .protocol
1127 .request("initialize", Some(serde_json::to_value(request)?))
1128 .await?;
1129
1130 // AtomicBool: lock-free store with Ordering::Relaxed
1131 self.inner.initialized.store(true, Ordering::Relaxed);
1132
1133 // Send initialized notification
1134 self.inner
1135 .protocol
1136 .notify("notifications/initialized", None)
1137 .await?;
1138
1139 // Convert protocol response to client response type
1140 Ok(InitializeResult {
1141 server_info: protocol_response.server_info,
1142 server_capabilities: protocol_response.capabilities,
1143 })
1144 }
1145
1146 /// Subscribe to resource change notifications
1147 ///
1148 /// Registers interest in receiving notifications when the specified
1149 /// resource changes. The server will send notifications when the
1150 /// resource is modified, created, or deleted.
1151 ///
1152 /// # Arguments
1153 ///
1154 /// * `uri` - The URI of the resource to monitor
1155 ///
1156 /// # Returns
1157 ///
1158 /// Returns `EmptyResult` on successful subscription.
1159 ///
1160 /// # Errors
1161 ///
1162 /// Returns an error if:
1163 /// - The client is not initialized
1164 /// - The URI is invalid or empty
1165 /// - The server doesn't support subscriptions
1166 /// - The request fails
1167 ///
1168 /// # Examples
1169 ///
1170 /// ```rust,no_run
1171 /// # use turbomcp_client::Client;
1172 /// # use turbomcp_transport::stdio::StdioTransport;
1173 /// # async fn example() -> turbomcp_protocol::Result<()> {
1174 /// let mut client = Client::new(StdioTransport::new());
1175 /// client.initialize().await?;
1176 ///
1177 /// // Subscribe to file changes
1178 /// client.subscribe("file:///watch/directory").await?;
1179 /// println!("Subscribed to resource changes");
1180 /// # Ok(())
1181 /// # }
1182 /// ```
1183 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1184 if !self.inner.initialized.load(Ordering::Relaxed) {
1185 return Err(Error::invalid_request("Client not initialized"));
1186 }
1187
1188 if uri.is_empty() {
1189 return Err(Error::invalid_request("Subscription URI cannot be empty"));
1190 }
1191
1192 // Send resources/subscribe request
1193 let request = SubscribeRequest { uri: uri.into() };
1194
1195 self.inner
1196 .protocol
1197 .request(
1198 "resources/subscribe",
1199 Some(serde_json::to_value(request).map_err(|e| {
1200 Error::internal(format!("Failed to serialize subscribe request: {}", e))
1201 })?),
1202 )
1203 .await
1204 }
1205
1206 /// Unsubscribe from resource change notifications
1207 ///
1208 /// Cancels a previous subscription to resource changes. After unsubscribing,
1209 /// the client will no longer receive notifications for the specified resource.
1210 ///
1211 /// # Arguments
1212 ///
1213 /// * `uri` - The URI of the resource to stop monitoring
1214 ///
1215 /// # Returns
1216 ///
1217 /// Returns `EmptyResult` on successful unsubscription.
1218 ///
1219 /// # Errors
1220 ///
1221 /// Returns an error if:
1222 /// - The client is not initialized
1223 /// - The URI is invalid or empty
1224 /// - No active subscription exists for the URI
1225 /// - The request fails
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```rust,no_run
1230 /// # use turbomcp_client::Client;
1231 /// # use turbomcp_transport::stdio::StdioTransport;
1232 /// # async fn example() -> turbomcp_protocol::Result<()> {
1233 /// let mut client = Client::new(StdioTransport::new());
1234 /// client.initialize().await?;
1235 ///
1236 /// // Unsubscribe from file changes
1237 /// client.unsubscribe("file:///watch/directory").await?;
1238 /// println!("Unsubscribed from resource changes");
1239 /// # Ok(())
1240 /// # }
1241 /// ```
1242 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1243 if !self.inner.initialized.load(Ordering::Relaxed) {
1244 return Err(Error::invalid_request("Client not initialized"));
1245 }
1246
1247 if uri.is_empty() {
1248 return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1249 }
1250
1251 // Send resources/unsubscribe request
1252 let request = UnsubscribeRequest { uri: uri.into() };
1253
1254 self.inner
1255 .protocol
1256 .request(
1257 "resources/unsubscribe",
1258 Some(serde_json::to_value(request).map_err(|e| {
1259 Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1260 })?),
1261 )
1262 .await
1263 }
1264
1265 /// Get the client's capabilities configuration
1266 #[must_use]
1267 pub fn capabilities(&self) -> &ClientCapabilities {
1268 &self.inner.capabilities
1269 }
1270
1271 // ============================================================================
1272 // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1273 // ============================================================================
1274
1275 /// Retrieve the status of a task (tasks/get)
1276 ///
1277 /// Polls the server for the current status of a specific task.
1278 ///
1279 /// # Arguments
1280 ///
1281 /// * `task_id` - The ID of the task to query
1282 ///
1283 /// # Returns
1284 ///
1285 /// Returns the current `Task` state including status, timestamps, and messages.
1286 #[cfg(feature = "experimental-tasks")]
1287 pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1288 let request = GetTaskRequest {
1289 task_id: task_id.to_string(),
1290 };
1291
1292 self.inner
1293 .protocol
1294 .request(
1295 "tasks/get",
1296 Some(serde_json::to_value(request).map_err(|e| {
1297 Error::internal(format!("Failed to serialize get_task request: {}", e))
1298 })?),
1299 )
1300 .await
1301 }
1302
1303 /// Cancel a running task (tasks/cancel)
1304 ///
1305 /// Attempts to cancel a task execution. This is a best-effort operation.
1306 ///
1307 /// # Arguments
1308 ///
1309 /// * `task_id` - The ID of the task to cancel
1310 ///
1311 /// # Returns
1312 ///
1313 /// Returns the updated `Task` state (typically with status "cancelled").
1314 #[cfg(feature = "experimental-tasks")]
1315 pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1316 let request = CancelTaskRequest {
1317 task_id: task_id.to_string(),
1318 };
1319
1320 self.inner
1321 .protocol
1322 .request(
1323 "tasks/cancel",
1324 Some(serde_json::to_value(request).map_err(|e| {
1325 Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1326 })?),
1327 )
1328 .await
1329 }
1330
1331 /// List all tasks (tasks/list)
1332 ///
1333 /// Retrieves a paginated list of tasks known to the server.
1334 ///
1335 /// # Arguments
1336 ///
1337 /// * `cursor` - Optional pagination cursor from a previous response
1338 /// * `limit` - Optional maximum number of tasks to return
1339 ///
1340 /// # Returns
1341 ///
1342 /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1343 #[cfg(feature = "experimental-tasks")]
1344 pub async fn list_tasks(
1345 &self,
1346 cursor: Option<String>,
1347 limit: Option<usize>,
1348 ) -> Result<ListTasksResult> {
1349 let request = ListTasksRequest { cursor, limit };
1350
1351 self.inner
1352 .protocol
1353 .request(
1354 "tasks/list",
1355 Some(serde_json::to_value(request).map_err(|e| {
1356 Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1357 })?),
1358 )
1359 .await
1360 }
1361
1362 /// Retrieve the result of a completed task (tasks/result)
1363 ///
1364 /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1365 /// then returns the operation result.
1366 ///
1367 /// # Arguments
1368 ///
1369 /// * `task_id` - The ID of the task to retrieve results for
1370 ///
1371 /// # Returns
1372 ///
1373 /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1374 #[cfg(feature = "experimental-tasks")]
1375 pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1376 let request = GetTaskPayloadRequest {
1377 task_id: task_id.to_string(),
1378 };
1379
1380 self.inner
1381 .protocol
1382 .request(
1383 "tasks/result",
1384 Some(serde_json::to_value(request).map_err(|e| {
1385 Error::internal(format!(
1386 "Failed to serialize get_task_result request: {}",
1387 e
1388 ))
1389 })?),
1390 )
1391 .await
1392 }
1393
1394 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1395 // are defined in their respective operation modules:
1396 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1397 // - handlers.rs: has_elicitation_handler, has_roots_handler
1398 //
1399 // Additional capability getters for elicitation and roots added below
1400 // since they're used during initialization
1401
1402 /// Get elicitation capabilities if handler is registered
1403 /// Automatically detects capability based on registered handler
1404 fn get_elicitation_capabilities(
1405 &self,
1406 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1407 if self.has_elicitation_handler() {
1408 // Currently returns default capabilities. In the future, schema_validation support
1409 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1410 // that handlers could implement. For now, handlers validate schemas themselves.
1411 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1412 } else {
1413 None
1414 }
1415 }
1416
1417 /// Get roots capabilities if handler is registered
1418 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1419 if self.has_roots_handler() {
1420 // Roots capabilities indicate whether list can change
1421 Some(turbomcp_protocol::types::RootsCapabilities {
1422 list_changed: Some(true), // Support dynamic roots by default
1423 })
1424 } else {
1425 None
1426 }
1427 }
1428}
1429
1430#[cfg(test)]
1431mod tests {
1432 use super::*;
1433 use std::future::Future;
1434 use std::pin::Pin;
1435 use turbomcp_transport::{
1436 TransportCapabilities, TransportConfig, TransportMessage, TransportMetrics,
1437 TransportResult, TransportState, TransportType,
1438 };
1439
1440 #[derive(Debug, Default)]
1441 struct NoopTransport {
1442 capabilities: TransportCapabilities,
1443 }
1444
1445 impl Transport for NoopTransport {
1446 fn transport_type(&self) -> TransportType {
1447 TransportType::Stdio
1448 }
1449
1450 fn capabilities(&self) -> &TransportCapabilities {
1451 &self.capabilities
1452 }
1453
1454 fn state(&self) -> Pin<Box<dyn Future<Output = TransportState> + Send + '_>> {
1455 Box::pin(async { TransportState::Disconnected })
1456 }
1457
1458 fn connect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1459 Box::pin(async { Ok(()) })
1460 }
1461
1462 fn disconnect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1463 Box::pin(async { Ok(()) })
1464 }
1465
1466 fn send(
1467 &self,
1468 _message: TransportMessage,
1469 ) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1470 Box::pin(async { Ok(()) })
1471 }
1472
1473 fn receive(
1474 &self,
1475 ) -> Pin<Box<dyn Future<Output = TransportResult<Option<TransportMessage>>> + Send + '_>>
1476 {
1477 Box::pin(async { Ok(None) })
1478 }
1479
1480 fn metrics(&self) -> Pin<Box<dyn Future<Output = TransportMetrics> + Send + '_>> {
1481 Box::pin(async { TransportMetrics::default() })
1482 }
1483
1484 fn configure(
1485 &self,
1486 _config: TransportConfig,
1487 ) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
1488 Box::pin(async { Ok(()) })
1489 }
1490 }
1491
1492 #[tokio::test]
1493 async fn test_with_capabilities_and_config_uses_handler_limit() {
1494 let capabilities = ClientCapabilities {
1495 max_concurrent_handlers: 7,
1496 ..Default::default()
1497 };
1498
1499 let client = Client::with_capabilities_and_config(
1500 NoopTransport::default(),
1501 capabilities,
1502 TransportConfig::default(),
1503 );
1504
1505 assert_eq!(client.inner.handler_semaphore.available_permits(), 7);
1506 }
1507
1508 #[tokio::test]
1509 async fn test_shutdown_sets_shutdown_flag() {
1510 let client = Client::new(NoopTransport::default());
1511 assert!(!client.inner.shutdown_requested.load(Ordering::Relaxed));
1512
1513 client.shutdown().await.expect("shutdown should succeed");
1514
1515 assert!(client.inner.shutdown_requested.load(Ordering::Relaxed));
1516 }
1517}