turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32 *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportConfig, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40 ClientCapabilities,
41 handlers::{HandlerError, HandlerRegistry},
42 sampling::SamplingHandler,
43};
44
45/// Inner client state with interior mutability
46///
47/// This structure contains the actual client state and is wrapped in Arc<...>
48/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
49pub(super) struct ClientInner<T: Transport + 'static> {
50 /// Protocol client for low-level communication
51 pub(super) protocol: ProtocolClient<T>,
52
53 /// Client capabilities (immutable after construction)
54 pub(super) capabilities: ClientCapabilities,
55
56 /// Initialization state (lock-free atomic boolean)
57 pub(super) initialized: AtomicBool,
58
59 /// Tracks whether graceful shutdown has already been requested.
60 pub(super) shutdown_requested: AtomicBool,
61
62 /// Optional sampling handler (mutex for dynamic updates)
63 pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
64
65 /// Handler registry for bidirectional communication (mutex for registration)
66 pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
67
68 /// ✅ Semaphore for bounded concurrency of request/notification handlers
69 /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
70 pub(super) handler_semaphore: Arc<Semaphore>,
71}
72
73/// The core MCP client implementation
74///
75/// Client provides a comprehensive interface for communicating with MCP servers,
76/// supporting all protocol features including tools, prompts, resources, sampling,
77/// elicitation, and bidirectional communication patterns.
78///
79/// # Clone Pattern
80///
81/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
82/// All clones share the same underlying connection and state:
83///
84/// ```rust,no_run
85/// use turbomcp_client::Client;
86/// use turbomcp_transport::stdio::StdioTransport;
87///
88/// # async fn example() -> turbomcp_protocol::Result<()> {
89/// let client = Client::new(StdioTransport::new());
90/// client.initialize().await?;
91///
92/// // Cheap clone - shares same connection
93/// let client2 = client.clone();
94/// tokio::spawn(async move {
95/// client2.list_tools().await.ok();
96/// });
97/// # Ok(())
98/// # }
99/// ```
100///
101/// The client must be initialized before use by calling `initialize()` to perform
102/// the MCP handshake and capability negotiation.
103///
104/// # Features
105///
106/// - **Protocol Compliance**: Full current MCP specification support
107/// - **Bidirectional Communication**: Server-initiated requests and client responses
108/// - **Plugin Middleware**: Extensible request/response processing
109/// - **Handler Registry**: Callbacks for server-initiated operations
110/// - **Connection Management**: Robust error handling and recovery
111/// - **Type Safety**: Compile-time guarantees for MCP message types
112/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// use turbomcp_client::Client;
118/// use turbomcp_transport::stdio::StdioTransport;
119/// use std::collections::HashMap;
120///
121/// # async fn example() -> turbomcp_protocol::Result<()> {
122/// // Create and initialize client (no mut needed!)
123/// let client = Client::new(StdioTransport::new());
124/// let init_result = client.initialize().await?;
125/// println!("Connected to: {}", init_result.server_info.name);
126///
127/// // Use MCP operations
128/// let tools = client.list_tools().await?;
129/// let mut args = HashMap::new();
130/// args.insert("input".to_string(), serde_json::json!("test"));
131/// let result = client.call_tool("my_tool", Some(args), None).await?;
132/// # Ok(())
133/// # }
134/// ```
135pub struct Client<T: Transport + 'static> {
136 pub(super) inner: Arc<ClientInner<T>>,
137}
138
139/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
140///
141/// Cloning a Client is cheap (just an Arc clone) and all clones share
142/// the same underlying connection and state.
143impl<T: Transport + 'static> Clone for Client<T> {
144 fn clone(&self) -> Self {
145 Self {
146 inner: Arc::clone(&self.inner),
147 }
148 }
149}
150
151impl<T: Transport + 'static> Drop for ClientInner<T> {
152 fn drop(&mut self) {
153 // Best-effort cleanup if shutdown() wasn't called
154 // This is a safety net, but applications SHOULD call shutdown() explicitly
155
156 // Single clear warning
157 if !self.shutdown_requested.load(Ordering::Relaxed) {
158 tracing::warn!(
159 "MCP Client dropped without explicit shutdown(). \
160 Call client.shutdown().await before dropping for clean resource cleanup. \
161 Background tasks (WebSocket reconnection) may continue running."
162 );
163 }
164
165 // We can shutdown the dispatcher (it's synchronous)
166 self.protocol.dispatcher().shutdown();
167 }
168}
169
170impl<T: Transport + 'static> Client<T> {
171 /// Create a new client with the specified transport
172 ///
173 /// Creates a new MCP client instance with default capabilities.
174 /// The client must be initialized before use by calling `initialize()`.
175 ///
176 /// # Arguments
177 ///
178 /// * `transport` - The transport implementation to use for communication
179 ///
180 /// # Examples
181 ///
182 /// ```rust,no_run
183 /// use turbomcp_client::Client;
184 /// use turbomcp_transport::stdio::StdioTransport;
185 ///
186 /// let transport = StdioTransport::new();
187 /// let client = Client::new(transport);
188 /// ```
189 pub fn new(transport: T) -> Self {
190 Self::new_with_config(transport, TransportConfig::default())
191 }
192
193 /// Create a new client with an explicit transport configuration.
194 pub fn new_with_config(transport: T, config: TransportConfig) -> Self {
195 let capabilities = ClientCapabilities::default();
196 let client = Self {
197 inner: Arc::new(ClientInner {
198 protocol: ProtocolClient::with_config(transport, config),
199 capabilities: capabilities.clone(),
200 initialized: AtomicBool::new(false),
201 shutdown_requested: AtomicBool::new(false),
202 sampling_handler: Arc::new(Mutex::new(None)),
203 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
204 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
205 }),
206 };
207
208 // Register dispatcher handlers for bidirectional communication
209 client.register_dispatcher_handlers();
210
211 client
212 }
213
214 /// Create a new client with custom capabilities
215 ///
216 /// # Arguments
217 ///
218 /// * `transport` - The transport implementation to use
219 /// * `capabilities` - The client capabilities to negotiate
220 ///
221 /// # Examples
222 ///
223 /// ```rust,no_run
224 /// use turbomcp_client::{Client, ClientCapabilities};
225 /// use turbomcp_transport::stdio::StdioTransport;
226 ///
227 /// let capabilities = ClientCapabilities {
228 /// tools: true,
229 /// prompts: true,
230 /// resources: false,
231 /// sampling: false,
232 /// max_concurrent_handlers: 100,
233 /// };
234 ///
235 /// let transport = StdioTransport::new();
236 /// let client = Client::with_capabilities(transport, capabilities);
237 /// ```
238 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
239 Self::with_capabilities_and_config(transport, capabilities, TransportConfig::default())
240 }
241
242 /// Create a new client with custom capabilities and transport configuration.
243 pub fn with_capabilities_and_config(
244 transport: T,
245 capabilities: ClientCapabilities,
246 config: TransportConfig,
247 ) -> Self {
248 let client = Self {
249 inner: Arc::new(ClientInner {
250 protocol: ProtocolClient::with_config(transport, config),
251 capabilities: capabilities.clone(),
252 initialized: AtomicBool::new(false),
253 shutdown_requested: AtomicBool::new(false),
254 sampling_handler: Arc::new(Mutex::new(None)),
255 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
256 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
257 }),
258 };
259
260 // Register dispatcher handlers for bidirectional communication
261 client.register_dispatcher_handlers();
262
263 client
264 }
265
266 /// Shutdown the client and clean up all resources
267 ///
268 /// This method performs a **graceful shutdown** of the MCP client by:
269 /// 1. Stopping the message dispatcher background task
270 /// 2. Disconnecting the transport (closes connection, stops background tasks)
271 ///
272 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
273 ///
274 /// # Why This Method Exists
275 ///
276 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
277 /// which is required for proper cleanup of WebSocket connections and background tasks.
278 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
279 ///
280 /// # Best Practices
281 ///
282 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
283 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
284 /// - For tests: call in cleanup/teardown
285 /// - If forgotten, Drop will log warnings and do best-effort cleanup
286 ///
287 /// # Examples
288 ///
289 /// ```rust,no_run
290 /// # use turbomcp_client::Client;
291 /// # use turbomcp_transport::stdio::StdioTransport;
292 /// # async fn example() -> turbomcp_protocol::Result<()> {
293 /// let client = Client::new(StdioTransport::new());
294 /// client.initialize().await?;
295 ///
296 /// // Use client...
297 /// let _tools = client.list_tools().await?;
298 ///
299 /// // ✅ Clean shutdown
300 /// client.shutdown().await?;
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub async fn shutdown(&self) -> Result<()> {
305 self.inner.shutdown_requested.store(true, Ordering::Relaxed);
306 tracing::info!("Shutting down MCP client");
307
308 // 1. Shutdown message dispatcher
309 self.inner.protocol.dispatcher().shutdown();
310 tracing::debug!("Message dispatcher stopped");
311
312 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313 match self.inner.protocol.transport().disconnect().await {
314 Ok(()) => {
315 tracing::info!("Transport disconnected successfully");
316 }
317 Err(e) => {
318 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319 }
320 }
321
322 tracing::info!("MCP client shutdown complete");
323 Ok(())
324 }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
331#[cfg(feature = "http")]
332impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
333 /// Connect to an HTTP MCP server (convenience method)
334 ///
335 /// This is a convenient one-liner alternative to manual configuration.
336 /// Creates an HTTP client, connects, and initializes in one call.
337 ///
338 /// # Arguments
339 ///
340 /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
341 ///
342 /// # Returns
343 ///
344 /// Returns an initialized `Client` ready to use.
345 ///
346 /// # Errors
347 ///
348 /// Returns an error if:
349 /// - The URL is invalid
350 /// - Connection to the server fails
351 /// - Initialization handshake fails
352 ///
353 /// # Examples
354 ///
355 /// ```rust,no_run
356 /// use turbomcp_client::Client;
357 ///
358 /// # async fn example() -> turbomcp_protocol::Result<()> {
359 /// // Convenient one-liner - balanced with server DX
360 /// let client = Client::connect_http("http://localhost:8080").await?;
361 ///
362 /// // Now use it directly
363 /// let tools = client.list_tools().await?;
364 /// # Ok(())
365 /// # }
366 /// ```
367 ///
368 /// Compare to verbose approach (10+ lines):
369 /// ```rust,no_run
370 /// use turbomcp_client::Client;
371 /// use turbomcp_transport::streamable_http_client::{
372 /// StreamableHttpClientConfig, StreamableHttpClientTransport
373 /// };
374 ///
375 /// # async fn example() -> turbomcp_protocol::Result<()> {
376 /// let config = StreamableHttpClientConfig {
377 /// base_url: "http://localhost:8080".to_string(),
378 /// ..Default::default()
379 /// };
380 /// let transport = StreamableHttpClientTransport::new(config)
381 /// .map_err(|e| turbomcp_protocol::Error::transport(e.to_string()))?;
382 /// let client = Client::new(transport);
383 /// client.initialize().await?;
384 /// # Ok(())
385 /// # }
386 /// ```
387 pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
388 use turbomcp_transport::streamable_http_client::{
389 StreamableHttpClientConfig, StreamableHttpClientTransport,
390 };
391
392 let config = StreamableHttpClientConfig {
393 base_url: url.into(),
394 ..Default::default()
395 };
396
397 let transport = StreamableHttpClientTransport::new(config).map_err(|e| {
398 turbomcp_protocol::Error::transport(format!("Failed to build HTTP transport: {e}"))
399 })?;
400 let client = Self::new(transport);
401
402 // Initialize connection immediately
403 client.initialize().await?;
404
405 Ok(client)
406 }
407
408 /// Connect to an HTTP MCP server with custom configuration
409 ///
410 /// Provides more control than `connect_http()` while still being ergonomic.
411 ///
412 /// # Arguments
413 ///
414 /// * `url` - The base URL of the MCP server
415 /// * `config_fn` - Function to customize the configuration
416 ///
417 /// # Examples
418 ///
419 /// ```rust,no_run
420 /// use turbomcp_client::Client;
421 /// use std::time::Duration;
422 ///
423 /// # async fn example() -> turbomcp_protocol::Result<()> {
424 /// let client = Client::connect_http_with("http://localhost:8080", |config| {
425 /// config.timeout = Duration::from_secs(60);
426 /// config.endpoint_path = "/api/mcp".to_string();
427 /// }).await?;
428 /// # Ok(())
429 /// # }
430 /// ```
431 pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
432 where
433 F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
434 {
435 use turbomcp_transport::streamable_http_client::{
436 StreamableHttpClientConfig, StreamableHttpClientTransport,
437 };
438
439 let mut config = StreamableHttpClientConfig {
440 base_url: url.into(),
441 ..Default::default()
442 };
443
444 config_fn(&mut config);
445
446 let transport = StreamableHttpClientTransport::new(config).map_err(|e| {
447 turbomcp_protocol::Error::transport(format!("Failed to build HTTP transport: {e}"))
448 })?;
449 let client = Self::new(transport);
450
451 client.initialize().await?;
452
453 Ok(client)
454 }
455}
456
457// ============================================================================
458// TCP-Specific Convenience Constructors (Feature-Gated)
459// ============================================================================
460
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Convenient one-liner for TCP connections - balanced DX.
    ///
    /// The local side binds to the unspecified address of the same IP family
    /// as the server (`0.0.0.0:0` for IPv4, `[::]:0` for IPv6), i.e. any
    /// interface and any available port.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address as a numeric `SocketAddr` (e.g. `"127.0.0.1:8765"`,
    ///   `"[::1]:8765"`). DNS hostnames like `"localhost:8765"` are NOT resolved here —
    ///   pre-resolve via `tokio::net::lookup_host` if needed.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an invalid-request error if `addr` is not a numeric socket
    /// address, or the error from `initialize()` if the handshake fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;

        // Bind to "any address, any port" in the server's IP family. Building
        // the address from constants avoids parsing literals and the
        // associated `expect` panic path.
        let unspecified_ip = match server_addr {
            SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
        };
        let bind_addr = SocketAddr::new(unspecified_ip, 0);

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        client.initialize().await?;

        Ok(client)
    }
}
513
514// ============================================================================
515// Unix Socket-Specific Convenience Constructors (Feature-Gated)
516// ============================================================================
517
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to an MCP server over a Unix domain socket (convenience method)
    ///
    /// Builds a Unix-socket client transport for `path`, wraps it in a
    /// `Client` and runs `initialize()` before returning.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Propagates the error from `initialize()` if the handshake fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let socket_path: std::path::PathBuf = path.into();
        let client = Self::new(UnixTransport::new_client(socket_path));

        client.initialize().await?;
        Ok(client)
    }
}
555
556impl<T: Transport + 'static> Client<T> {
557 /// Register message handlers with the dispatcher
558 ///
559 /// This method sets up the callbacks that handle server-initiated requests
560 /// and notifications. The dispatcher's background task routes incoming
561 /// messages to these handlers.
562 ///
563 /// This is called automatically during Client construction (in `new()` and
564 /// `with_capabilities()`), so you don't need to call it manually.
565 ///
566 /// ## How It Works
567 ///
568 /// The handlers are synchronous closures that spawn async tasks to do the
569 /// actual work. This allows the dispatcher to continue routing messages
570 /// without blocking on handler execution.
571 fn register_dispatcher_handlers(&self) {
572 let dispatcher = self.inner.protocol.dispatcher();
573 let client_for_requests = self.clone();
574 let client_for_notifications = self.clone();
575
576 // Request handler (elicitation, sampling, etc.)
577 let semaphore = Arc::clone(&self.inner.handler_semaphore);
578 let request_handler = Arc::new(move |request: JsonRpcRequest| {
579 let client = client_for_requests.clone();
580 let method = request.method.clone();
581 let req_id = request.id.clone();
582 let semaphore = Arc::clone(&semaphore);
583
584 // ✅ Spawn async task with bounded concurrency
585 tokio::spawn(async move {
586 // Acquire permit (blocks if 100 requests already in flight)
587 let _permit = match semaphore.acquire().await {
588 Ok(permit) => permit,
589 Err(_) => {
590 tracing::warn!(
591 "Handler semaphore closed, dropping request: method={}",
592 method
593 );
594 return;
595 }
596 };
597
598 tracing::debug!(
599 "[request_handler] Handling server-initiated request: method={}, id={:?}",
600 method,
601 req_id
602 );
603 if let Err(e) = client.handle_request(request).await {
604 tracing::error!(
605 "[request_handler] Error handling server request '{}': {}",
606 method,
607 e
608 );
609 tracing::error!(" Request ID: {:?}", req_id);
610 tracing::error!(" Error kind: {:?}", e.kind);
611 } else {
612 tracing::debug!(
613 "[request_handler] Successfully handled server request: method={}, id={:?}",
614 method,
615 req_id
616 );
617 }
618 });
619 Ok(())
620 });
621
622 // Notification handler
623 let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
624 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
625 let client = client_for_notifications.clone();
626 let semaphore = Arc::clone(&semaphore_notif);
627
628 // ✅ Spawn async task with bounded concurrency
629 tokio::spawn(async move {
630 // Acquire permit (blocks if 100 handlers already in flight)
631 let _permit = match semaphore.acquire().await {
632 Ok(permit) => permit,
633 Err(_) => {
634 tracing::warn!("Handler semaphore closed, dropping notification");
635 return;
636 }
637 };
638
639 if let Err(e) = client.handle_notification(notification).await {
640 tracing::error!("Error handling server notification: {}", e);
641 }
642 // Permit automatically released on drop ✅
643 });
644 Ok(())
645 });
646
647 // Register handlers synchronously - no race condition!
648 // The set_* methods are now synchronous with std::sync::Mutex
649 dispatcher.set_request_handler(request_handler);
650 dispatcher.set_notification_handler(notification_handler);
651 tracing::debug!("Dispatcher handlers registered successfully");
652 }
653
654 /// Handle server-initiated requests (elicitation, sampling, roots)
655 ///
656 /// This method is called by the MessageDispatcher when it receives a request
657 /// from the server. It routes the request to the appropriate handler based on
658 /// the method name.
659 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
660 match request.method.as_str() {
661 "sampling/createMessage" => {
662 let handler_opt = self.inner.sampling_handler.lock().clone();
663 if let Some(handler) = handler_opt {
664 // Extract request ID for proper correlation
665 let request_id = match &request.id {
666 turbomcp_protocol::MessageId::String(s) => s.clone(),
667 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
668 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
669 };
670
671 let params: CreateMessageRequest =
672 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
673 .map_err(|e| {
674 Error::internal(format!("Invalid createMessage params: {}", e))
675 })?;
676
677 match handler.handle_create_message(request_id, params).await {
678 Ok(result) => {
679 let result_value = serde_json::to_value(result).map_err(|e| {
680 Error::internal(format!("Failed to serialize response: {}", e))
681 })?;
682 let response = JsonRpcResponse::success(result_value, request.id);
683 self.send_response(response).await?;
684 }
685 Err(e) => {
686 tracing::warn!(
687 "[handle_request] Sampling handler returned error: {}",
688 e
689 );
690
691 // Preserve error semantics by checking actual error type
692 // This allows proper error code propagation for retry logic
693 let (code, message) = if let Some(handler_err) =
694 e.downcast_ref::<HandlerError>()
695 {
696 // HandlerError has explicit JSON-RPC code mapping
697 let json_err = handler_err.into_jsonrpc_error();
698 tracing::debug!(
699 "[handle_request] HandlerError mapped to JSON-RPC code: {}",
700 json_err.code
701 );
702 (json_err.code, json_err.message)
703 } else if let Some(proto_err) =
704 e.downcast_ref::<turbomcp_protocol::Error>()
705 {
706 // Protocol errors have ErrorKind-based mapping
707 tracing::debug!(
708 "[handle_request] Protocol error mapped to code: {}",
709 proto_err.jsonrpc_error_code()
710 );
711 (proto_err.jsonrpc_error_code(), proto_err.to_string())
712 } else {
713 // Generic errors default to Internal (-32603)
714 // Log the error type for debugging (should rarely hit this path)
715 tracing::warn!(
716 "[handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
717 std::any::type_name_of_val(&*e)
718 );
719 (-32603, format!("Sampling handler error: {}", e))
720 };
721
722 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
723 code,
724 message,
725 data: None,
726 };
727 let response =
728 JsonRpcResponse::error_response(error, request.id.clone());
729
730 tracing::debug!(
731 "[handle_request] Sending error response for request: {:?}",
732 request.id
733 );
734 self.send_response(response).await?;
735 }
736 }
737 } else {
738 // No handler configured
739 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
740 code: -32601,
741 message: "Sampling not supported".to_string(),
742 data: None,
743 };
744 let response = JsonRpcResponse::error_response(error, request.id);
745 self.send_response(response).await?;
746 }
747 }
748 "roots/list" => {
749 // Handle roots/list request from server
750 // Clone the handler Arc to avoid holding mutex across await
751 let handler_opt = self.inner.handlers.lock().roots.clone();
752
753 let roots_result = if let Some(handler) = handler_opt {
754 handler.handle_roots_request().await
755 } else {
756 // No handler - return empty list per MCP spec
757 Ok(Vec::new())
758 };
759
760 match roots_result {
761 Ok(roots) => {
762 let result_value =
763 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
764 roots,
765 _meta: None,
766 })
767 .map_err(|e| {
768 Error::internal(format!(
769 "Failed to serialize roots response: {}",
770 e
771 ))
772 })?;
773 let response = JsonRpcResponse::success(result_value, request.id);
774 self.send_response(response).await?;
775 }
776 Err(e) => {
777 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
778 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
779 let json_err = e.into_jsonrpc_error();
780 let response = JsonRpcResponse::error_response(json_err, request.id);
781 self.send_response(response).await?;
782 }
783 }
784 }
785 "elicitation/create" => {
786 // Clone handler Arc before await to avoid holding mutex across await
787 let handler_opt = self.inner.handlers.lock().elicitation.clone();
788 if let Some(handler) = handler_opt {
789 // Parse elicitation request params as MCP protocol type
790 let proto_params: turbomcp_protocol::types::ElicitRequestParams =
791 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
792 .map_err(|e| {
793 Error::internal(format!("Invalid elicitation params: {}", e))
794 })?;
795
796 // Wrap protocol params with ID for handler (preserves type safety!)
797 let handler_request =
798 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_params);
799
800 // Call the registered elicitation handler
801 match handler.handle_elicitation(handler_request).await {
802 Ok(elicit_response) => {
803 // Convert handler response back to protocol type
804 let proto_result = elicit_response.into_protocol();
805 let result_value = serde_json::to_value(proto_result).map_err(|e| {
806 Error::internal(format!(
807 "Failed to serialize elicitation response: {}",
808 e
809 ))
810 })?;
811 let response = JsonRpcResponse::success(result_value, request.id);
812 self.send_response(response).await?;
813 }
814 Err(e) => {
815 // Convert handler error to JSON-RPC error using centralized mapping
816 let response =
817 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
818 self.send_response(response).await?;
819 }
820 }
821 } else {
822 // No handler configured - elicitation not supported
823 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
824 code: -32601,
825 message: "Elicitation not supported - no handler registered".to_string(),
826 data: None,
827 };
828 let response = JsonRpcResponse::error_response(error, request.id);
829 self.send_response(response).await?;
830 }
831 }
832 _ => {
833 // Unknown method
834 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
835 code: -32601,
836 message: format!("Method not found: {}", request.method),
837 data: None,
838 };
839 let response = JsonRpcResponse::error_response(error, request.id);
840 self.send_response(response).await?;
841 }
842 }
843 Ok(())
844 }
845
846 /// Handle server-initiated notifications
847 ///
848 /// Routes notifications to appropriate handlers based on method name.
849 /// MCP defines several notification types that servers can send to clients:
850 ///
851 /// - `notifications/progress` - Progress updates for long-running operations
852 /// - `notifications/message` - Log messages from server
853 /// - `notifications/resources/updated` - Resource content changed
854 /// - `notifications/resources/list_changed` - Resource list changed
855 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
856 match notification.method.as_str() {
857 "notifications/progress" => {
858 // Route to progress handler
859 let handler_opt = self.inner.handlers.lock().get_progress_handler();
860
861 if let Some(handler) = handler_opt {
862 let progress: crate::handlers::ProgressNotification = serde_json::from_value(
863 notification.params.unwrap_or(serde_json::Value::Null),
864 )
865 .map_err(|e| {
866 Error::internal(format!("Invalid progress notification: {}", e))
867 })?;
868
869 if let Err(e) = handler.handle_progress(progress).await {
870 tracing::error!("Progress handler error: {}", e);
871 }
872 } else {
873 tracing::debug!("Progress notification received (no handler registered)");
874 }
875 }
876
877 "notifications/message" => {
878 // Route to log handler
879 let handler_opt = self.inner.handlers.lock().get_log_handler();
880
881 if let Some(handler) = handler_opt {
882 // Parse log message
883 let log: crate::handlers::LoggingNotification = serde_json::from_value(
884 notification.params.unwrap_or(serde_json::Value::Null),
885 )
886 .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
887
888 // Call handler
889 if let Err(e) = handler.handle_log(log).await {
890 tracing::error!("Log handler error: {}", e);
891 }
892 } else {
893 tracing::debug!("Received log notification but no handler registered");
894 }
895 }
896
897 "notifications/resources/updated" => {
898 // Route to resource update handler
899 let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
900
901 if let Some(handler) = handler_opt {
902 // Parse resource update notification
903 let update: crate::handlers::ResourceUpdatedNotification =
904 serde_json::from_value(
905 notification.params.unwrap_or(serde_json::Value::Null),
906 )
907 .map_err(|e| {
908 Error::internal(format!("Invalid resource update notification: {}", e))
909 })?;
910
911 // Call handler
912 if let Err(e) = handler.handle_resource_update(update).await {
913 tracing::error!("Resource update handler error: {}", e);
914 }
915 } else {
916 tracing::debug!(
917 "Received resource update notification but no handler registered"
918 );
919 }
920 }
921
922 "notifications/resources/list_changed" => {
923 // Route to resource list changed handler
924 let handler_opt = self
925 .inner
926 .handlers
927 .lock()
928 .get_resource_list_changed_handler();
929
930 if let Some(handler) = handler_opt {
931 if let Err(e) = handler.handle_resource_list_changed().await {
932 tracing::error!("Resource list changed handler error: {}", e);
933 }
934 } else {
935 tracing::debug!(
936 "Resource list changed notification received (no handler registered)"
937 );
938 }
939 }
940
941 "notifications/prompts/list_changed" => {
942 // Route to prompt list changed handler
943 let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
944
945 if let Some(handler) = handler_opt {
946 if let Err(e) = handler.handle_prompt_list_changed().await {
947 tracing::error!("Prompt list changed handler error: {}", e);
948 }
949 } else {
950 tracing::debug!(
951 "Prompt list changed notification received (no handler registered)"
952 );
953 }
954 }
955
956 "notifications/tools/list_changed" => {
957 // Route to tool list changed handler
958 let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
959
960 if let Some(handler) = handler_opt {
961 if let Err(e) = handler.handle_tool_list_changed().await {
962 tracing::error!("Tool list changed handler error: {}", e);
963 }
964 } else {
965 tracing::debug!(
966 "Tool list changed notification received (no handler registered)"
967 );
968 }
969 }
970
971 "notifications/cancelled" => {
972 // Route to cancellation handler
973 let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
974
975 if let Some(handler) = handler_opt {
976 // Parse cancellation notification
977 let cancellation: crate::handlers::CancelledNotification =
978 serde_json::from_value(
979 notification.params.unwrap_or(serde_json::Value::Null),
980 )
981 .map_err(|e| {
982 Error::internal(format!("Invalid cancellation notification: {}", e))
983 })?;
984
985 // Call handler
986 if let Err(e) = handler.handle_cancellation(cancellation).await {
987 tracing::error!("Cancellation handler error: {}", e);
988 }
989 } else {
990 tracing::debug!("Cancellation notification received (no handler registered)");
991 }
992 }
993
994 _ => {
995 // Unknown notification type
996 tracing::debug!("Received unknown notification: {}", notification.method);
997 }
998 }
999
1000 Ok(())
1001 }
1002
1003 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1004 tracing::debug!(
1005 response_id = ?response.id,
1006 "Sending JSON-RPC response"
1007 );
1008
1009 let payload = serde_json::to_vec(&response).map_err(|e| {
1010 tracing::error!("send_response failed to serialize: {e}");
1011 Error::internal(format!("Failed to serialize response: {}", e))
1012 })?;
1013
1014 // Server-initiated responses (sampling/createMessage, elicitation/create)
1015 // can carry sensitive LLM output, user-elicited credentials, or PII.
1016 // Log only size+id at debug; the body is `trace!`-only so operators
1017 // must opt in explicitly via RUST_LOG.
1018 tracing::debug!(
1019 response_id = ?response.id,
1020 payload_bytes = payload.len(),
1021 "Response serialized"
1022 );
1023 tracing::trace!(
1024 response_id = ?response.id,
1025 response_json = %String::from_utf8_lossy(&payload),
1026 "Response payload (trace-only; may contain sensitive data)"
1027 );
1028
1029 let message = TransportMessage::new(
1030 turbomcp_protocol::MessageId::from("response".to_string()),
1031 payload.into(),
1032 );
1033
1034 self.inner
1035 .protocol
1036 .transport()
1037 .send(message)
1038 .await
1039 .map_err(|e| {
1040 tracing::error!("send_response transport send failed: {e}");
1041 Error::transport(format!("Failed to send response: {}", e))
1042 })?;
1043
1044 tracing::debug!(response_id = ?response.id, "Response sent");
1045 Ok(())
1046 }
1047
1048 /// Initialize the connection with the MCP server
1049 ///
1050 /// Performs the initialization handshake with the server, negotiating capabilities
1051 /// and establishing the protocol version. This method must be called before
1052 /// any other operations can be performed.
1053 ///
1054 /// # Returns
1055 ///
1056 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1057 ///
1058 /// # Errors
1059 ///
1060 /// Returns an error if:
1061 /// - The transport connection fails
1062 /// - The server rejects the initialization request
1063 /// - Protocol negotiation fails
1064 ///
1065 /// # Examples
1066 ///
1067 /// ```rust,no_run
1068 /// # use turbomcp_client::Client;
1069 /// # use turbomcp_transport::stdio::StdioTransport;
1070 /// # async fn example() -> turbomcp_protocol::Result<()> {
1071 /// let mut client = Client::new(StdioTransport::new());
1072 ///
1073 /// let result = client.initialize().await?;
1074 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1075 /// # Ok(())
1076 /// # }
1077 /// ```
1078 pub async fn initialize(&self) -> Result<InitializeResult> {
1079 // Build client capabilities based on registered handlers (automatic detection)
1080 let mut client_caps = ProtocolClientCapabilities::default();
1081
1082 // Detect sampling capability from handler
1083 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1084 client_caps.sampling = Some(sampling_caps);
1085 }
1086
1087 // Detect elicitation capability from handler
1088 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1089 client_caps.elicitation = Some(elicitation_caps);
1090 }
1091
1092 // Detect roots capability from handler
1093 if let Some(roots_caps) = self.get_roots_capabilities() {
1094 client_caps.roots = Some(roots_caps);
1095 }
1096
1097 let request = InitializeRequest {
1098 protocol_version: PROTOCOL_VERSION.into(),
1099 capabilities: client_caps,
1100 client_info: turbomcp_protocol::types::Implementation {
1101 name: "turbomcp-client".to_string(),
1102 version: env!("CARGO_PKG_VERSION").to_string(),
1103 title: Some("TurboMCP Client".to_string()),
1104 ..Default::default()
1105 },
1106 meta: None,
1107 };
1108
1109 self.initialize_with_request(request).await
1110 }
1111
1112 /// Whether [`Client::initialize`] has completed for this client.
1113 ///
1114 /// Lock-free; safe to call from any task. Useful for callers that
1115 /// previously had to pattern-match on
1116 /// `Error::invalid_request("Client not initialized")` to detect the
1117 /// initialized state.
1118 #[must_use]
1119 pub fn is_initialized(&self) -> bool {
1120 self.inner.initialized.load(Ordering::Relaxed)
1121 }
1122
1123 /// Initialize the MCP session with an explicit initialize request.
1124 ///
1125 /// This is the opt-in path for draft protocol versions and capability
1126 /// fields such as official MCP `extensions`.
1127 pub async fn initialize_with_request(
1128 &self,
1129 request: InitializeRequest,
1130 ) -> Result<InitializeResult> {
1131 // Auto-connect transport if not already connected
1132 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1133 let transport = self.inner.protocol.transport();
1134 let transport_state = transport.state().await;
1135 if !matches!(
1136 transport_state,
1137 turbomcp_transport::TransportState::Connected
1138 ) {
1139 tracing::debug!(
1140 "Auto-connecting transport (current state: {:?})",
1141 transport_state
1142 );
1143 transport
1144 .connect()
1145 .await
1146 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1147 tracing::info!("Transport connected successfully");
1148 }
1149
1150 let protocol_response: ProtocolInitializeResult = self
1151 .inner
1152 .protocol
1153 .request("initialize", Some(serde_json::to_value(request)?))
1154 .await?;
1155
1156 // AtomicBool: lock-free store with Ordering::Relaxed
1157 self.inner.initialized.store(true, Ordering::Relaxed);
1158
1159 // Send initialized notification
1160 self.inner
1161 .protocol
1162 .notify("notifications/initialized", None)
1163 .await?;
1164
1165 // Convert protocol response to client response type
1166 Ok(InitializeResult {
1167 server_info: protocol_response.server_info,
1168 server_capabilities: protocol_response.capabilities,
1169 })
1170 }
1171
1172 /// Subscribe to resource change notifications
1173 ///
1174 /// Registers interest in receiving notifications when the specified
1175 /// resource changes. The server will send notifications when the
1176 /// resource is modified, created, or deleted.
1177 ///
1178 /// # Arguments
1179 ///
1180 /// * `uri` - The URI of the resource to monitor
1181 ///
1182 /// # Returns
1183 ///
1184 /// Returns `EmptyResult` on successful subscription.
1185 ///
1186 /// # Errors
1187 ///
1188 /// Returns an error if:
1189 /// - The client is not initialized
1190 /// - The URI is invalid or empty
1191 /// - The server doesn't support subscriptions
1192 /// - The request fails
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```rust,no_run
1197 /// # use turbomcp_client::Client;
1198 /// # use turbomcp_transport::stdio::StdioTransport;
1199 /// # async fn example() -> turbomcp_protocol::Result<()> {
1200 /// let mut client = Client::new(StdioTransport::new());
1201 /// client.initialize().await?;
1202 ///
1203 /// // Subscribe to file changes
1204 /// client.subscribe("file:///watch/directory").await?;
1205 /// println!("Subscribed to resource changes");
1206 /// # Ok(())
1207 /// # }
1208 /// ```
1209 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1210 if !self.inner.initialized.load(Ordering::Relaxed) {
1211 return Err(Error::invalid_request("Client not initialized"));
1212 }
1213
1214 if uri.is_empty() {
1215 return Err(Error::invalid_request("Subscription URI cannot be empty"));
1216 }
1217
1218 // Send resources/subscribe request
1219 let request = SubscribeRequest {
1220 uri: uri.into(),
1221 _meta: None,
1222 };
1223
1224 self.inner
1225 .protocol
1226 .request(
1227 "resources/subscribe",
1228 Some(serde_json::to_value(request).map_err(|e| {
1229 Error::internal(format!("Failed to serialize subscribe request: {}", e))
1230 })?),
1231 )
1232 .await
1233 }
1234
1235 /// Unsubscribe from resource change notifications
1236 ///
1237 /// Cancels a previous subscription to resource changes. After unsubscribing,
1238 /// the client will no longer receive notifications for the specified resource.
1239 ///
1240 /// # Arguments
1241 ///
1242 /// * `uri` - The URI of the resource to stop monitoring
1243 ///
1244 /// # Returns
1245 ///
1246 /// Returns `EmptyResult` on successful unsubscription.
1247 ///
1248 /// # Errors
1249 ///
1250 /// Returns an error if:
1251 /// - The client is not initialized
1252 /// - The URI is invalid or empty
1253 /// - No active subscription exists for the URI
1254 /// - The request fails
1255 ///
1256 /// # Examples
1257 ///
1258 /// ```rust,no_run
1259 /// # use turbomcp_client::Client;
1260 /// # use turbomcp_transport::stdio::StdioTransport;
1261 /// # async fn example() -> turbomcp_protocol::Result<()> {
1262 /// let mut client = Client::new(StdioTransport::new());
1263 /// client.initialize().await?;
1264 ///
1265 /// // Unsubscribe from file changes
1266 /// client.unsubscribe("file:///watch/directory").await?;
1267 /// println!("Unsubscribed from resource changes");
1268 /// # Ok(())
1269 /// # }
1270 /// ```
1271 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1272 if !self.inner.initialized.load(Ordering::Relaxed) {
1273 return Err(Error::invalid_request("Client not initialized"));
1274 }
1275
1276 if uri.is_empty() {
1277 return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1278 }
1279
1280 // Send resources/unsubscribe request
1281 let request = UnsubscribeRequest {
1282 uri: uri.into(),
1283 _meta: None,
1284 };
1285
1286 self.inner
1287 .protocol
1288 .request(
1289 "resources/unsubscribe",
1290 Some(serde_json::to_value(request).map_err(|e| {
1291 Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1292 })?),
1293 )
1294 .await
1295 }
1296
1297 /// Get the client's capabilities configuration
1298 #[must_use]
1299 pub fn capabilities(&self) -> &ClientCapabilities {
1300 &self.inner.capabilities
1301 }
1302
1303 // ============================================================================
1304 // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1305 // ============================================================================
1306
1307 /// Retrieve the status of a task (tasks/get)
1308 ///
1309 /// Polls the server for the current status of a specific task.
1310 ///
1311 /// # Arguments
1312 ///
1313 /// * `task_id` - The ID of the task to query
1314 ///
1315 /// # Returns
1316 ///
1317 /// Returns the current `Task` state including status, timestamps, and messages.
1318 #[cfg(feature = "experimental-tasks")]
1319 pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1320 if !self.inner.initialized.load(Ordering::Relaxed) {
1321 return Err(Error::invalid_request("Client not initialized"));
1322 }
1323 let request = GetTaskRequest {
1324 task_id: task_id.to_string(),
1325 };
1326
1327 self.inner
1328 .protocol
1329 .request(
1330 "tasks/get",
1331 Some(serde_json::to_value(request).map_err(|e| {
1332 Error::internal(format!("Failed to serialize get_task request: {}", e))
1333 })?),
1334 )
1335 .await
1336 }
1337
1338 /// Cancel a running task (tasks/cancel)
1339 ///
1340 /// Attempts to cancel a task execution. This is a best-effort operation.
1341 ///
1342 /// # Arguments
1343 ///
1344 /// * `task_id` - The ID of the task to cancel
1345 ///
1346 /// # Returns
1347 ///
1348 /// Returns the updated `Task` state (typically with status "cancelled").
1349 #[cfg(feature = "experimental-tasks")]
1350 pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1351 if !self.inner.initialized.load(Ordering::Relaxed) {
1352 return Err(Error::invalid_request("Client not initialized"));
1353 }
1354 let request = CancelTaskRequest {
1355 task_id: task_id.to_string(),
1356 };
1357
1358 self.inner
1359 .protocol
1360 .request(
1361 "tasks/cancel",
1362 Some(serde_json::to_value(request).map_err(|e| {
1363 Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1364 })?),
1365 )
1366 .await
1367 }
1368
1369 /// List all tasks (tasks/list)
1370 ///
1371 /// Retrieves a paginated list of tasks known to the server.
1372 ///
1373 /// # Arguments
1374 ///
1375 /// * `cursor` - Optional pagination cursor from a previous response
1376 /// * `limit` - Optional maximum number of tasks to return
1377 ///
1378 /// # Returns
1379 ///
1380 /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1381 #[cfg(feature = "experimental-tasks")]
1382 pub async fn list_tasks(
1383 &self,
1384 cursor: Option<String>,
1385 limit: Option<usize>,
1386 ) -> Result<ListTasksResult> {
1387 if !self.inner.initialized.load(Ordering::Relaxed) {
1388 return Err(Error::invalid_request("Client not initialized"));
1389 }
1390 let request = ListTasksRequest { cursor, limit };
1391
1392 self.inner
1393 .protocol
1394 .request(
1395 "tasks/list",
1396 Some(serde_json::to_value(request).map_err(|e| {
1397 Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1398 })?),
1399 )
1400 .await
1401 }
1402
1403 /// Retrieve the result of a completed task (tasks/result)
1404 ///
1405 /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1406 /// then returns the operation result.
1407 ///
1408 /// # Arguments
1409 ///
1410 /// * `task_id` - The ID of the task to retrieve results for
1411 ///
1412 /// # Returns
1413 ///
1414 /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1415 #[cfg(feature = "experimental-tasks")]
1416 pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1417 if !self.inner.initialized.load(Ordering::Relaxed) {
1418 return Err(Error::invalid_request("Client not initialized"));
1419 }
1420 let request = GetTaskPayloadRequest {
1421 task_id: task_id.to_string(),
1422 };
1423
1424 self.inner
1425 .protocol
1426 .request(
1427 "tasks/result",
1428 Some(serde_json::to_value(request).map_err(|e| {
1429 Error::internal(format!(
1430 "Failed to serialize get_task_result request: {}",
1431 e
1432 ))
1433 })?),
1434 )
1435 .await
1436 }
1437
    // Note: Capability detection methods (has_*_handler, get_*_capabilities)
    // are defined in their respective operation modules:
    // - sampling.rs: has_sampling_handler, get_sampling_capabilities
    // - handlers.rs: has_elicitation_handler, has_roots_handler
    //
    // The capability getters for elicitation and roots are defined below
    // because `initialize` uses them when building the client capabilities.
1445
1446 /// Get elicitation capabilities if handler is registered
1447 /// Automatically detects capability based on registered handler
1448 fn get_elicitation_capabilities(
1449 &self,
1450 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1451 if self.has_elicitation_handler() {
1452 // Currently returns default capabilities. In the future, schema_validation support
1453 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1454 // that handlers could implement. For now, handlers validate schemas themselves.
1455 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1456 } else {
1457 None
1458 }
1459 }
1460
1461 /// Get roots capabilities if handler is registered
1462 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1463 if self.has_roots_handler() {
1464 // Roots capabilities indicate whether list can change
1465 Some(turbomcp_protocol::types::RootsCapabilities {
1466 list_changed: Some(true), // Support dynamic roots by default
1467 })
1468 } else {
1469 None
1470 }
1471 }
1472}
1473
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;
    use turbomcp_transport::{
        TransportCapabilities, TransportConfig, TransportMessage, TransportMetrics,
        TransportResult, TransportState, TransportType,
    };

    /// Boxed, sendable future type returned by the `Transport` methods below.
    type BoxedFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;

    /// Transport stub: every operation succeeds without doing anything, it
    /// never yields a message, and it always reports `Disconnected`.
    #[derive(Debug, Default)]
    struct NoopTransport {
        capabilities: TransportCapabilities,
    }

    impl Transport for NoopTransport {
        fn transport_type(&self) -> TransportType {
            TransportType::Stdio
        }

        fn capabilities(&self) -> &TransportCapabilities {
            &self.capabilities
        }

        fn state(&self) -> BoxedFuture<'_, TransportState> {
            Box::pin(async { TransportState::Disconnected })
        }

        fn connect(&self) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn disconnect(&self) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn send(&self, _message: TransportMessage) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }

        fn receive(&self) -> BoxedFuture<'_, TransportResult<Option<TransportMessage>>> {
            Box::pin(async { Ok(None) })
        }

        fn metrics(&self) -> BoxedFuture<'_, TransportMetrics> {
            Box::pin(async { TransportMetrics::default() })
        }

        fn configure(&self, _config: TransportConfig) -> BoxedFuture<'_, TransportResult<()>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// The handler semaphore starts with `max_concurrent_handlers` permits.
    #[tokio::test]
    async fn test_with_capabilities_and_config_uses_handler_limit() {
        let client = Client::with_capabilities_and_config(
            NoopTransport::default(),
            ClientCapabilities {
                max_concurrent_handlers: 7,
                ..Default::default()
            },
            TransportConfig::default(),
        );

        let permits = client.inner.handler_semaphore.available_permits();
        assert_eq!(permits, 7);
    }

    /// `shutdown` flips the `shutdown_requested` flag from false to true.
    #[tokio::test]
    async fn test_shutdown_sets_shutdown_flag() {
        let client = Client::new(NoopTransport::default());
        let shutdown_flag = &client.inner.shutdown_requested;
        assert!(!shutdown_flag.load(Ordering::Relaxed));

        client.shutdown().await.expect("shutdown should succeed");

        assert!(shutdown_flag.load(Ordering::Relaxed));
    }
}