turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use parking_lot::Mutex;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use tokio::sync::Semaphore;
26
27use turbomcp_protocol::jsonrpc::*;
28#[cfg(feature = "experimental-tasks")]
29use turbomcp_protocol::types::tasks::*;
30use turbomcp_protocol::types::{
31 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
32 *,
33};
34use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
35use turbomcp_transport::{Transport, TransportMessage};
36
37use super::config::InitializeResult;
38use super::protocol::ProtocolClient;
39use crate::{
40 ClientCapabilities,
41 handlers::{HandlerError, HandlerRegistry},
42 sampling::SamplingHandler,
43};
44
45/// Inner client state with interior mutability
46///
47/// This structure contains the actual client state and is wrapped in Arc<...>
48/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
49pub(super) struct ClientInner<T: Transport + 'static> {
50 /// Protocol client for low-level communication
51 pub(super) protocol: ProtocolClient<T>,
52
53 /// Client capabilities (immutable after construction)
54 pub(super) capabilities: ClientCapabilities,
55
56 /// Initialization state (lock-free atomic boolean)
57 pub(super) initialized: AtomicBool,
58
59 /// Optional sampling handler (mutex for dynamic updates)
60 pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
61
62 /// Handler registry for bidirectional communication (mutex for registration)
63 pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
64
65 /// ✅ Semaphore for bounded concurrency of request/notification handlers
66 /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
67 pub(super) handler_semaphore: Arc<Semaphore>,
68}
69
70/// The core MCP client implementation
71///
72/// Client provides a comprehensive interface for communicating with MCP servers,
73/// supporting all protocol features including tools, prompts, resources, sampling,
74/// elicitation, and bidirectional communication patterns.
75///
76/// # Clone Pattern
77///
78/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
79/// All clones share the same underlying connection and state:
80///
81/// ```rust,no_run
82/// use turbomcp_client::Client;
83/// use turbomcp_transport::stdio::StdioTransport;
84///
85/// # async fn example() -> turbomcp_protocol::Result<()> {
86/// let client = Client::new(StdioTransport::new());
87/// client.initialize().await?;
88///
89/// // Cheap clone - shares same connection
90/// let client2 = client.clone();
91/// tokio::spawn(async move {
92/// client2.list_tools().await.ok();
93/// });
94/// # Ok(())
95/// # }
96/// ```
97///
98/// The client must be initialized before use by calling `initialize()` to perform
99/// the MCP handshake and capability negotiation.
100///
101/// # Features
102///
103/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
104/// - **Bidirectional Communication**: Server-initiated requests and client responses
105/// - **Plugin Middleware**: Extensible request/response processing
106/// - **Handler Registry**: Callbacks for server-initiated operations
107/// - **Connection Management**: Robust error handling and recovery
108/// - **Type Safety**: Compile-time guarantees for MCP message types
109/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
110///
111/// # Examples
112///
113/// ```rust,no_run
114/// use turbomcp_client::Client;
115/// use turbomcp_transport::stdio::StdioTransport;
116/// use std::collections::HashMap;
117///
118/// # async fn example() -> turbomcp_protocol::Result<()> {
119/// // Create and initialize client (no mut needed!)
120/// let client = Client::new(StdioTransport::new());
121/// let init_result = client.initialize().await?;
122/// println!("Connected to: {}", init_result.server_info.name);
123///
124/// // Use MCP operations
125/// let tools = client.list_tools().await?;
126/// let mut args = HashMap::new();
127/// args.insert("input".to_string(), serde_json::json!("test"));
128/// let result = client.call_tool("my_tool", Some(args), None).await?;
129/// # Ok(())
130/// # }
131/// ```
132pub struct Client<T: Transport + 'static> {
133 pub(super) inner: Arc<ClientInner<T>>,
134}
135
136/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
137///
138/// Cloning a Client is cheap (just an Arc clone) and all clones share
139/// the same underlying connection and state.
140impl<T: Transport + 'static> Clone for Client<T> {
141 fn clone(&self) -> Self {
142 Self {
143 inner: Arc::clone(&self.inner),
144 }
145 }
146}
147
148impl<T: Transport + 'static> Drop for ClientInner<T> {
149 fn drop(&mut self) {
150 // Best-effort cleanup if shutdown() wasn't called
151 // This is a safety net, but applications SHOULD call shutdown() explicitly
152
153 // Single clear warning
154 tracing::warn!(
155 "MCP Client dropped without explicit shutdown(). \
156 Call client.shutdown().await before dropping for clean resource cleanup. \
157 Background tasks (WebSocket reconnection) may continue running."
158 );
159
160 // We can shutdown the dispatcher (it's synchronous)
161 self.protocol.dispatcher().shutdown();
162 }
163}
164
165impl<T: Transport + 'static> Client<T> {
166 /// Create a new client with the specified transport
167 ///
168 /// Creates a new MCP client instance with default capabilities.
169 /// The client must be initialized before use by calling `initialize()`.
170 ///
171 /// # Arguments
172 ///
173 /// * `transport` - The transport implementation to use for communication
174 ///
175 /// # Examples
176 ///
177 /// ```rust,no_run
178 /// use turbomcp_client::Client;
179 /// use turbomcp_transport::stdio::StdioTransport;
180 ///
181 /// let transport = StdioTransport::new();
182 /// let client = Client::new(transport);
183 /// ```
184 pub fn new(transport: T) -> Self {
185 let capabilities = ClientCapabilities::default();
186 let client = Self {
187 inner: Arc::new(ClientInner {
188 protocol: ProtocolClient::new(transport),
189 capabilities: capabilities.clone(),
190 initialized: AtomicBool::new(false),
191 sampling_handler: Arc::new(Mutex::new(None)),
192 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
193 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
194 }),
195 };
196
197 // Register dispatcher handlers for bidirectional communication
198 client.register_dispatcher_handlers();
199
200 client
201 }
202
203 /// Create a new client with custom capabilities
204 ///
205 /// # Arguments
206 ///
207 /// * `transport` - The transport implementation to use
208 /// * `capabilities` - The client capabilities to negotiate
209 ///
210 /// # Examples
211 ///
212 /// ```rust,no_run
213 /// use turbomcp_client::{Client, ClientCapabilities};
214 /// use turbomcp_transport::stdio::StdioTransport;
215 ///
216 /// let capabilities = ClientCapabilities {
217 /// tools: true,
218 /// prompts: true,
219 /// resources: false,
220 /// sampling: false,
221 /// max_concurrent_handlers: 100,
222 /// };
223 ///
224 /// let transport = StdioTransport::new();
225 /// let client = Client::with_capabilities(transport, capabilities);
226 /// ```
227 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
228 let client = Self {
229 inner: Arc::new(ClientInner {
230 protocol: ProtocolClient::new(transport),
231 capabilities: capabilities.clone(),
232 initialized: AtomicBool::new(false),
233 sampling_handler: Arc::new(Mutex::new(None)),
234 handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
235 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
236 }),
237 };
238
239 // Register dispatcher handlers for bidirectional communication
240 client.register_dispatcher_handlers();
241
242 client
243 }
244
245 /// Shutdown the client and clean up all resources
246 ///
247 /// This method performs a **graceful shutdown** of the MCP client by:
248 /// 1. Stopping the message dispatcher background task
249 /// 2. Disconnecting the transport (closes connection, stops background tasks)
250 ///
251 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
252 ///
253 /// # Why This Method Exists
254 ///
255 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
256 /// which is required for proper cleanup of WebSocket connections and background tasks.
257 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
258 ///
259 /// # Best Practices
260 ///
261 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
262 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
263 /// - For tests: call in cleanup/teardown
264 /// - If forgotten, Drop will log warnings and do best-effort cleanup
265 ///
266 /// # Examples
267 ///
268 /// ```rust,no_run
269 /// # use turbomcp_client::Client;
270 /// # use turbomcp_transport::stdio::StdioTransport;
271 /// # async fn example() -> turbomcp_protocol::Result<()> {
272 /// let client = Client::new(StdioTransport::new());
273 /// client.initialize().await?;
274 ///
275 /// // Use client...
276 /// let _tools = client.list_tools().await?;
277 ///
278 /// // ✅ Clean shutdown
279 /// client.shutdown().await?;
280 /// # Ok(())
281 /// # }
282 /// ```
283 pub async fn shutdown(&self) -> Result<()> {
284 tracing::info!("🛑 Shutting down MCP client");
285
286 // 1. Shutdown message dispatcher
287 self.inner.protocol.dispatcher().shutdown();
288 tracing::debug!("✅ Message dispatcher stopped");
289
290 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
291 match self.inner.protocol.transport().disconnect().await {
292 Ok(()) => {
293 tracing::info!("✅ Transport disconnected successfully");
294 }
295 Err(e) => {
296 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
297 }
298 }
299
300 tracing::info!("✅ MCP client shutdown complete");
301 Ok(())
302 }
303}
304
305// ============================================================================
306// HTTP-Specific Convenience Constructors (Feature-Gated)
307// ============================================================================
308
309#[cfg(feature = "http")]
310impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
311 /// Connect to an HTTP MCP server (convenience method)
312 ///
313 /// This is a convenient one-liner alternative to manual configuration.
314 /// Creates an HTTP client, connects, and initializes in one call.
315 ///
316 /// # Arguments
317 ///
318 /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
319 ///
320 /// # Returns
321 ///
322 /// Returns an initialized `Client` ready to use.
323 ///
324 /// # Errors
325 ///
326 /// Returns an error if:
327 /// - The URL is invalid
328 /// - Connection to the server fails
329 /// - Initialization handshake fails
330 ///
331 /// # Examples
332 ///
333 /// ```rust,no_run
334 /// use turbomcp_client::Client;
335 ///
336 /// # async fn example() -> turbomcp_protocol::Result<()> {
337 /// // Convenient one-liner - balanced with server DX
338 /// let client = Client::connect_http("http://localhost:8080").await?;
339 ///
340 /// // Now use it directly
341 /// let tools = client.list_tools().await?;
342 /// # Ok(())
343 /// # }
344 /// ```
345 ///
346 /// Compare to verbose approach (10+ lines):
347 /// ```rust,no_run
348 /// use turbomcp_client::Client;
349 /// use turbomcp_transport::streamable_http_client::{
350 /// StreamableHttpClientConfig, StreamableHttpClientTransport
351 /// };
352 ///
353 /// # async fn example() -> turbomcp_protocol::Result<()> {
354 /// let config = StreamableHttpClientConfig {
355 /// base_url: "http://localhost:8080".to_string(),
356 /// ..Default::default()
357 /// };
358 /// let transport = StreamableHttpClientTransport::new(config);
359 /// let client = Client::new(transport);
360 /// client.initialize().await?;
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
365 use turbomcp_transport::streamable_http_client::{
366 StreamableHttpClientConfig, StreamableHttpClientTransport,
367 };
368
369 let config = StreamableHttpClientConfig {
370 base_url: url.into(),
371 ..Default::default()
372 };
373
374 let transport = StreamableHttpClientTransport::new(config);
375 let client = Self::new(transport);
376
377 // Initialize connection immediately
378 client.initialize().await?;
379
380 Ok(client)
381 }
382
383 /// Connect to an HTTP MCP server with custom configuration
384 ///
385 /// Provides more control than `connect_http()` while still being ergonomic.
386 ///
387 /// # Arguments
388 ///
389 /// * `url` - The base URL of the MCP server
390 /// * `config_fn` - Function to customize the configuration
391 ///
392 /// # Examples
393 ///
394 /// ```rust,no_run
395 /// use turbomcp_client::Client;
396 /// use std::time::Duration;
397 ///
398 /// # async fn example() -> turbomcp_protocol::Result<()> {
399 /// let client = Client::connect_http_with("http://localhost:8080", |config| {
400 /// config.timeout = Duration::from_secs(60);
401 /// config.endpoint_path = "/api/mcp".to_string();
402 /// }).await?;
403 /// # Ok(())
404 /// # }
405 /// ```
406 pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
407 where
408 F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
409 {
410 use turbomcp_transport::streamable_http_client::{
411 StreamableHttpClientConfig, StreamableHttpClientTransport,
412 };
413
414 let mut config = StreamableHttpClientConfig {
415 base_url: url.into(),
416 ..Default::default()
417 };
418
419 config_fn(&mut config);
420
421 let transport = StreamableHttpClientTransport::new(config);
422 let client = Self::new(transport);
423
424 client.initialize().await?;
425
426 Ok(client)
427 }
428}
429
430// ============================================================================
431// TCP-Specific Convenience Constructors (Feature-Gated)
432// ============================================================================
433
434#[cfg(feature = "tcp")]
435impl Client<turbomcp_transport::tcp::TcpTransport> {
436 /// Connect to a TCP MCP server (convenience method)
437 ///
438 /// Convenient one-liner for TCP connections - balanced DX.
439 ///
440 /// # Arguments
441 ///
442 /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
443 ///
444 /// # Returns
445 ///
446 /// Returns an initialized `Client` ready to use.
447 ///
448 /// # Examples
449 ///
450 /// ```rust,no_run
451 /// # #[cfg(feature = "tcp")]
452 /// use turbomcp_client::Client;
453 ///
454 /// # async fn example() -> turbomcp_protocol::Result<()> {
455 /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
456 /// let tools = client.list_tools().await?;
457 /// # Ok(())
458 /// # }
459 /// ```
460 pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
461 use std::net::SocketAddr;
462 use turbomcp_transport::tcp::TcpTransport;
463
464 let server_addr: SocketAddr = addr
465 .as_ref()
466 .parse()
467 .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;
468
469 // Client binds to 0.0.0.0:0 (any available port)
470 let bind_addr: SocketAddr = if server_addr.is_ipv6() {
471 "[::]:0".parse().expect("valid IPv6 any-port address")
472 } else {
473 "0.0.0.0:0".parse().expect("valid IPv4 any-port address")
474 };
475
476 let transport = TcpTransport::new_client(bind_addr, server_addr);
477 let client = Self::new(transport);
478
479 client.initialize().await?;
480
481 Ok(client)
482 }
483}
484
485// ============================================================================
486// Unix Socket-Specific Convenience Constructors (Feature-Gated)
487// ============================================================================
488
489#[cfg(all(unix, feature = "unix"))]
490impl Client<turbomcp_transport::unix::UnixTransport> {
491 /// Connect to a Unix socket MCP server (convenience method)
492 ///
493 /// Convenient one-liner for Unix socket IPC - balanced DX.
494 ///
495 /// # Arguments
496 ///
497 /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
498 ///
499 /// # Returns
500 ///
501 /// Returns an initialized `Client` ready to use.
502 ///
503 /// # Examples
504 ///
505 /// ```rust,no_run
506 /// # #[cfg(all(unix, feature = "unix"))]
507 /// use turbomcp_client::Client;
508 ///
509 /// # async fn example() -> turbomcp_protocol::Result<()> {
510 /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
511 /// let tools = client.list_tools().await?;
512 /// # Ok(())
513 /// # }
514 /// ```
515 pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
516 use turbomcp_transport::unix::UnixTransport;
517
518 let transport = UnixTransport::new_client(path.into());
519 let client = Self::new(transport);
520
521 client.initialize().await?;
522
523 Ok(client)
524 }
525}
526
527impl<T: Transport + 'static> Client<T> {
528 /// Register message handlers with the dispatcher
529 ///
530 /// This method sets up the callbacks that handle server-initiated requests
531 /// and notifications. The dispatcher's background task routes incoming
532 /// messages to these handlers.
533 ///
534 /// This is called automatically during Client construction (in `new()` and
535 /// `with_capabilities()`), so you don't need to call it manually.
536 ///
537 /// ## How It Works
538 ///
539 /// The handlers are synchronous closures that spawn async tasks to do the
540 /// actual work. This allows the dispatcher to continue routing messages
541 /// without blocking on handler execution.
542 fn register_dispatcher_handlers(&self) {
543 let dispatcher = self.inner.protocol.dispatcher();
544 let client_for_requests = self.clone();
545 let client_for_notifications = self.clone();
546
547 // Request handler (elicitation, sampling, etc.)
548 let semaphore = Arc::clone(&self.inner.handler_semaphore);
549 let request_handler = Arc::new(move |request: JsonRpcRequest| {
550 let client = client_for_requests.clone();
551 let method = request.method.clone();
552 let req_id = request.id.clone();
553 let semaphore = Arc::clone(&semaphore);
554
555 // ✅ Spawn async task with bounded concurrency
556 tokio::spawn(async move {
557 // Acquire permit (blocks if 100 requests already in flight)
558 let _permit = match semaphore.acquire().await {
559 Ok(permit) => permit,
560 Err(_) => {
561 tracing::warn!(
562 "Handler semaphore closed, dropping request: method={}",
563 method
564 );
565 return;
566 }
567 };
568
569 tracing::debug!(
570 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
571 method,
572 req_id
573 );
574 if let Err(e) = client.handle_request(request).await {
575 tracing::error!(
576 "❌ [request_handler] Error handling server request '{}': {}",
577 method,
578 e
579 );
580 tracing::error!(" Request ID: {:?}", req_id);
581 tracing::error!(" Error kind: {:?}", e.kind);
582 } else {
583 tracing::debug!(
584 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
585 method,
586 req_id
587 );
588 }
589 // Permit automatically released on drop ✅
590 });
591 Ok(())
592 });
593
594 // Notification handler
595 let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
596 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
597 let client = client_for_notifications.clone();
598 let semaphore = Arc::clone(&semaphore_notif);
599
600 // ✅ Spawn async task with bounded concurrency
601 tokio::spawn(async move {
602 // Acquire permit (blocks if 100 handlers already in flight)
603 let _permit = match semaphore.acquire().await {
604 Ok(permit) => permit,
605 Err(_) => {
606 tracing::warn!("Handler semaphore closed, dropping notification");
607 return;
608 }
609 };
610
611 if let Err(e) = client.handle_notification(notification).await {
612 tracing::error!("Error handling server notification: {}", e);
613 }
614 // Permit automatically released on drop ✅
615 });
616 Ok(())
617 });
618
619 // Register handlers synchronously - no race condition!
620 // The set_* methods are now synchronous with std::sync::Mutex
621 dispatcher.set_request_handler(request_handler);
622 dispatcher.set_notification_handler(notification_handler);
623 tracing::debug!("Dispatcher handlers registered successfully");
624 }
625
626 /// Handle server-initiated requests (elicitation, sampling, roots)
627 ///
628 /// This method is called by the MessageDispatcher when it receives a request
629 /// from the server. It routes the request to the appropriate handler based on
630 /// the method name.
631 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
632 match request.method.as_str() {
633 "sampling/createMessage" => {
634 let handler_opt = self.inner.sampling_handler.lock().clone();
635 if let Some(handler) = handler_opt {
636 // Extract request ID for proper correlation
637 let request_id = match &request.id {
638 turbomcp_protocol::MessageId::String(s) => s.clone(),
639 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
640 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
641 };
642
643 let params: CreateMessageRequest =
644 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
645 .map_err(|e| {
646 Error::internal(format!("Invalid createMessage params: {}", e))
647 })?;
648
649 match handler.handle_create_message(request_id, params).await {
650 Ok(result) => {
651 let result_value = serde_json::to_value(result).map_err(|e| {
652 Error::internal(format!("Failed to serialize response: {}", e))
653 })?;
654 let response = JsonRpcResponse::success(result_value, request.id);
655 self.send_response(response).await?;
656 }
657 Err(e) => {
658 tracing::warn!(
659 "⚠️ [handle_request] Sampling handler returned error: {}",
660 e
661 );
662
663 // Preserve error semantics by checking actual error type
664 // This allows proper error code propagation for retry logic
665 let (code, message) = if let Some(handler_err) =
666 e.downcast_ref::<HandlerError>()
667 {
668 // HandlerError has explicit JSON-RPC code mapping
669 let json_err = handler_err.into_jsonrpc_error();
670 tracing::info!(
671 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
672 json_err.code
673 );
674 (json_err.code, json_err.message)
675 } else if let Some(proto_err) =
676 e.downcast_ref::<turbomcp_protocol::Error>()
677 {
678 // Protocol errors have ErrorKind-based mapping
679 tracing::info!(
680 "📋 [handle_request] Protocol error mapped to code: {}",
681 proto_err.jsonrpc_error_code()
682 );
683 (proto_err.jsonrpc_error_code(), proto_err.to_string())
684 } else {
685 // Generic errors default to Internal (-32603)
686 // Log the error type for debugging (should rarely hit this path)
687 tracing::warn!(
688 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
689 std::any::type_name_of_val(&*e)
690 );
691 (-32603, format!("Sampling handler error: {}", e))
692 };
693
694 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
695 code,
696 message,
697 data: None,
698 };
699 let response =
700 JsonRpcResponse::error_response(error, request.id.clone());
701
702 tracing::info!(
703 "🔄 [handle_request] Attempting to send error response for request: {:?}",
704 request.id
705 );
706 self.send_response(response).await?;
707 tracing::info!(
708 "✅ [handle_request] Error response sent successfully for request: {:?}",
709 request.id
710 );
711 }
712 }
713 } else {
714 // No handler configured
715 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
716 code: -32601,
717 message: "Sampling not supported".to_string(),
718 data: None,
719 };
720 let response = JsonRpcResponse::error_response(error, request.id);
721 self.send_response(response).await?;
722 }
723 }
724 "roots/list" => {
725 // Handle roots/list request from server
726 // Clone the handler Arc to avoid holding mutex across await
727 let handler_opt = self.inner.handlers.lock().roots.clone();
728
729 let roots_result = if let Some(handler) = handler_opt {
730 handler.handle_roots_request().await
731 } else {
732 // No handler - return empty list per MCP spec
733 Ok(Vec::new())
734 };
735
736 match roots_result {
737 Ok(roots) => {
738 let result_value =
739 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
740 roots,
741 _meta: None,
742 })
743 .map_err(|e| {
744 Error::internal(format!(
745 "Failed to serialize roots response: {}",
746 e
747 ))
748 })?;
749 let response = JsonRpcResponse::success(result_value, request.id);
750 self.send_response(response).await?;
751 }
752 Err(e) => {
753 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
754 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
755 let json_err = e.into_jsonrpc_error();
756 let response = JsonRpcResponse::error_response(json_err, request.id);
757 self.send_response(response).await?;
758 }
759 }
760 }
761 "elicitation/create" => {
762 // Clone handler Arc before await to avoid holding mutex across await
763 let handler_opt = self.inner.handlers.lock().elicitation.clone();
764 if let Some(handler) = handler_opt {
765 // Parse elicitation request params as MCP protocol type
766 let proto_request: turbomcp_protocol::types::ElicitRequest =
767 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
768 .map_err(|e| {
769 Error::internal(format!("Invalid elicitation params: {}", e))
770 })?;
771
772 // Wrap protocol request with ID for handler (preserves type safety!)
773 let handler_request =
774 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
775
776 // Call the registered elicitation handler
777 match handler.handle_elicitation(handler_request).await {
778 Ok(elicit_response) => {
779 // Convert handler response back to protocol type
780 let proto_result = elicit_response.into_protocol();
781 let result_value = serde_json::to_value(proto_result).map_err(|e| {
782 Error::internal(format!(
783 "Failed to serialize elicitation response: {}",
784 e
785 ))
786 })?;
787 let response = JsonRpcResponse::success(result_value, request.id);
788 self.send_response(response).await?;
789 }
790 Err(e) => {
791 // Convert handler error to JSON-RPC error using centralized mapping
792 let response =
793 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
794 self.send_response(response).await?;
795 }
796 }
797 } else {
798 // No handler configured - elicitation not supported
799 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
800 code: -32601,
801 message: "Elicitation not supported - no handler registered".to_string(),
802 data: None,
803 };
804 let response = JsonRpcResponse::error_response(error, request.id);
805 self.send_response(response).await?;
806 }
807 }
808 _ => {
809 // Unknown method
810 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
811 code: -32601,
812 message: format!("Method not found: {}", request.method),
813 data: None,
814 };
815 let response = JsonRpcResponse::error_response(error, request.id);
816 self.send_response(response).await?;
817 }
818 }
819 Ok(())
820 }
821
822 /// Handle server-initiated notifications
823 ///
824 /// Routes notifications to appropriate handlers based on method name.
825 /// MCP defines several notification types that servers can send to clients:
826 ///
827 /// - `notifications/progress` - Progress updates for long-running operations
828 /// - `notifications/message` - Log messages from server
829 /// - `notifications/resources/updated` - Resource content changed
830 /// - `notifications/resources/list_changed` - Resource list changed
831 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
832 match notification.method.as_str() {
833 "notifications/progress" => {
834 // Progress tracking has been removed
835 tracing::debug!(
836 "Received progress notification - progress tracking has been removed"
837 );
838 }
839
840 "notifications/message" => {
841 // Route to log handler
842 let handler_opt = self.inner.handlers.lock().get_log_handler();
843
844 if let Some(handler) = handler_opt {
845 // Parse log message
846 let log: crate::handlers::LoggingNotification = serde_json::from_value(
847 notification.params.unwrap_or(serde_json::Value::Null),
848 )
849 .map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
850
851 // Call handler
852 if let Err(e) = handler.handle_log(log).await {
853 tracing::error!("Log handler error: {}", e);
854 }
855 } else {
856 tracing::debug!("Received log notification but no handler registered");
857 }
858 }
859
860 "notifications/resources/updated" => {
861 // Route to resource update handler
862 let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
863
864 if let Some(handler) = handler_opt {
865 // Parse resource update notification
866 let update: crate::handlers::ResourceUpdatedNotification =
867 serde_json::from_value(
868 notification.params.unwrap_or(serde_json::Value::Null),
869 )
870 .map_err(|e| {
871 Error::internal(format!("Invalid resource update notification: {}", e))
872 })?;
873
874 // Call handler
875 if let Err(e) = handler.handle_resource_update(update).await {
876 tracing::error!("Resource update handler error: {}", e);
877 }
878 } else {
879 tracing::debug!(
880 "Received resource update notification but no handler registered"
881 );
882 }
883 }
884
885 "notifications/resources/list_changed" => {
886 // Route to resource list changed handler
887 let handler_opt = self
888 .inner
889 .handlers
890 .lock()
891 .get_resource_list_changed_handler();
892
893 if let Some(handler) = handler_opt {
894 if let Err(e) = handler.handle_resource_list_changed().await {
895 tracing::error!("Resource list changed handler error: {}", e);
896 }
897 } else {
898 tracing::debug!(
899 "Resource list changed notification received (no handler registered)"
900 );
901 }
902 }
903
904 "notifications/prompts/list_changed" => {
905 // Route to prompt list changed handler
906 let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
907
908 if let Some(handler) = handler_opt {
909 if let Err(e) = handler.handle_prompt_list_changed().await {
910 tracing::error!("Prompt list changed handler error: {}", e);
911 }
912 } else {
913 tracing::debug!(
914 "Prompt list changed notification received (no handler registered)"
915 );
916 }
917 }
918
919 "notifications/tools/list_changed" => {
920 // Route to tool list changed handler
921 let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
922
923 if let Some(handler) = handler_opt {
924 if let Err(e) = handler.handle_tool_list_changed().await {
925 tracing::error!("Tool list changed handler error: {}", e);
926 }
927 } else {
928 tracing::debug!(
929 "Tool list changed notification received (no handler registered)"
930 );
931 }
932 }
933
934 "notifications/cancelled" => {
935 // Route to cancellation handler
936 let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
937
938 if let Some(handler) = handler_opt {
939 // Parse cancellation notification
940 let cancellation: crate::handlers::CancelledNotification =
941 serde_json::from_value(
942 notification.params.unwrap_or(serde_json::Value::Null),
943 )
944 .map_err(|e| {
945 Error::internal(format!("Invalid cancellation notification: {}", e))
946 })?;
947
948 // Call handler
949 if let Err(e) = handler.handle_cancellation(cancellation).await {
950 tracing::error!("Cancellation handler error: {}", e);
951 }
952 } else {
953 tracing::debug!("Cancellation notification received (no handler registered)");
954 }
955 }
956
957 _ => {
958 // Unknown notification type
959 tracing::debug!("Received unknown notification: {}", notification.method);
960 }
961 }
962
963 Ok(())
964 }
965
966 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
967 tracing::info!(
968 "📤 [send_response] Sending JSON-RPC response: id={:?}",
969 response.id
970 );
971
972 let payload = serde_json::to_vec(&response).map_err(|e| {
973 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
974 Error::internal(format!("Failed to serialize response: {}", e))
975 })?;
976
977 tracing::debug!(
978 "📤 [send_response] Response payload: {} bytes",
979 payload.len()
980 );
981 tracing::debug!(
982 "📤 [send_response] Response JSON: {}",
983 String::from_utf8_lossy(&payload)
984 );
985
986 let message = TransportMessage::new(
987 turbomcp_protocol::MessageId::from("response".to_string()),
988 payload.into(),
989 );
990
991 self.inner
992 .protocol
993 .transport()
994 .send(message)
995 .await
996 .map_err(|e| {
997 tracing::error!("❌ [send_response] Transport send failed: {}", e);
998 Error::transport(format!("Failed to send response: {}", e))
999 })?;
1000
1001 tracing::info!(
1002 "✅ [send_response] Response sent successfully: id={:?}",
1003 response.id
1004 );
1005 Ok(())
1006 }
1007
1008 /// Initialize the connection with the MCP server
1009 ///
1010 /// Performs the initialization handshake with the server, negotiating capabilities
1011 /// and establishing the protocol version. This method must be called before
1012 /// any other operations can be performed.
1013 ///
1014 /// # Returns
1015 ///
1016 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1017 ///
1018 /// # Errors
1019 ///
1020 /// Returns an error if:
1021 /// - The transport connection fails
1022 /// - The server rejects the initialization request
1023 /// - Protocol negotiation fails
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```rust,no_run
1028 /// # use turbomcp_client::Client;
1029 /// # use turbomcp_transport::stdio::StdioTransport;
1030 /// # async fn example() -> turbomcp_protocol::Result<()> {
1031 /// let mut client = Client::new(StdioTransport::new());
1032 ///
1033 /// let result = client.initialize().await?;
1034 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1035 /// # Ok(())
1036 /// # }
1037 /// ```
1038 pub async fn initialize(&self) -> Result<InitializeResult> {
1039 // Auto-connect transport if not already connected
1040 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1041 let transport = self.inner.protocol.transport();
1042 let transport_state = transport.state().await;
1043 if !matches!(
1044 transport_state,
1045 turbomcp_transport::TransportState::Connected
1046 ) {
1047 tracing::debug!(
1048 "Auto-connecting transport (current state: {:?})",
1049 transport_state
1050 );
1051 transport
1052 .connect()
1053 .await
1054 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1055 tracing::info!("Transport connected successfully");
1056 }
1057
1058 // Build client capabilities based on registered handlers (automatic detection)
1059 let mut client_caps = ProtocolClientCapabilities::default();
1060
1061 // Detect sampling capability from handler
1062 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1063 client_caps.sampling = Some(sampling_caps);
1064 }
1065
1066 // Detect elicitation capability from handler
1067 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1068 client_caps.elicitation = Some(elicitation_caps);
1069 }
1070
1071 // Detect roots capability from handler
1072 if let Some(roots_caps) = self.get_roots_capabilities() {
1073 client_caps.roots = Some(roots_caps);
1074 }
1075
1076 // Send MCP initialization request
1077 let request = InitializeRequest {
1078 protocol_version: PROTOCOL_VERSION.to_string(),
1079 capabilities: client_caps,
1080 client_info: turbomcp_protocol::types::Implementation {
1081 name: "turbomcp-client".to_string(),
1082 version: env!("CARGO_PKG_VERSION").to_string(),
1083 title: Some("TurboMCP Client".to_string()),
1084 ..Default::default()
1085 },
1086 _meta: None,
1087 };
1088
1089 let protocol_response: ProtocolInitializeResult = self
1090 .inner
1091 .protocol
1092 .request("initialize", Some(serde_json::to_value(request)?))
1093 .await?;
1094
1095 // AtomicBool: lock-free store with Ordering::Relaxed
1096 self.inner.initialized.store(true, Ordering::Relaxed);
1097
1098 // Send initialized notification
1099 self.inner
1100 .protocol
1101 .notify("notifications/initialized", None)
1102 .await?;
1103
1104 // Convert protocol response to client response type
1105 Ok(InitializeResult {
1106 server_info: protocol_response.server_info,
1107 server_capabilities: protocol_response.capabilities,
1108 })
1109 }
1110
1111 /// Subscribe to resource change notifications
1112 ///
1113 /// Registers interest in receiving notifications when the specified
1114 /// resource changes. The server will send notifications when the
1115 /// resource is modified, created, or deleted.
1116 ///
1117 /// # Arguments
1118 ///
1119 /// * `uri` - The URI of the resource to monitor
1120 ///
1121 /// # Returns
1122 ///
1123 /// Returns `EmptyResult` on successful subscription.
1124 ///
1125 /// # Errors
1126 ///
1127 /// Returns an error if:
1128 /// - The client is not initialized
1129 /// - The URI is invalid or empty
1130 /// - The server doesn't support subscriptions
1131 /// - The request fails
1132 ///
1133 /// # Examples
1134 ///
1135 /// ```rust,no_run
1136 /// # use turbomcp_client::Client;
1137 /// # use turbomcp_transport::stdio::StdioTransport;
1138 /// # async fn example() -> turbomcp_protocol::Result<()> {
1139 /// let mut client = Client::new(StdioTransport::new());
1140 /// client.initialize().await?;
1141 ///
1142 /// // Subscribe to file changes
1143 /// client.subscribe("file:///watch/directory").await?;
1144 /// println!("Subscribed to resource changes");
1145 /// # Ok(())
1146 /// # }
1147 /// ```
1148 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1149 if !self.inner.initialized.load(Ordering::Relaxed) {
1150 return Err(Error::invalid_request("Client not initialized"));
1151 }
1152
1153 if uri.is_empty() {
1154 return Err(Error::invalid_request("Subscription URI cannot be empty"));
1155 }
1156
1157 // Send resources/subscribe request
1158 let request = SubscribeRequest {
1159 uri: uri.to_string(),
1160 };
1161
1162 self.inner
1163 .protocol
1164 .request(
1165 "resources/subscribe",
1166 Some(serde_json::to_value(request).map_err(|e| {
1167 Error::internal(format!("Failed to serialize subscribe request: {}", e))
1168 })?),
1169 )
1170 .await
1171 }
1172
1173 /// Unsubscribe from resource change notifications
1174 ///
1175 /// Cancels a previous subscription to resource changes. After unsubscribing,
1176 /// the client will no longer receive notifications for the specified resource.
1177 ///
1178 /// # Arguments
1179 ///
1180 /// * `uri` - The URI of the resource to stop monitoring
1181 ///
1182 /// # Returns
1183 ///
1184 /// Returns `EmptyResult` on successful unsubscription.
1185 ///
1186 /// # Errors
1187 ///
1188 /// Returns an error if:
1189 /// - The client is not initialized
1190 /// - The URI is invalid or empty
1191 /// - No active subscription exists for the URI
1192 /// - The request fails
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```rust,no_run
1197 /// # use turbomcp_client::Client;
1198 /// # use turbomcp_transport::stdio::StdioTransport;
1199 /// # async fn example() -> turbomcp_protocol::Result<()> {
1200 /// let mut client = Client::new(StdioTransport::new());
1201 /// client.initialize().await?;
1202 ///
1203 /// // Unsubscribe from file changes
1204 /// client.unsubscribe("file:///watch/directory").await?;
1205 /// println!("Unsubscribed from resource changes");
1206 /// # Ok(())
1207 /// # }
1208 /// ```
1209 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1210 if !self.inner.initialized.load(Ordering::Relaxed) {
1211 return Err(Error::invalid_request("Client not initialized"));
1212 }
1213
1214 if uri.is_empty() {
1215 return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
1216 }
1217
1218 // Send resources/unsubscribe request
1219 let request = UnsubscribeRequest {
1220 uri: uri.to_string(),
1221 };
1222
1223 self.inner
1224 .protocol
1225 .request(
1226 "resources/unsubscribe",
1227 Some(serde_json::to_value(request).map_err(|e| {
1228 Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
1229 })?),
1230 )
1231 .await
1232 }
1233
1234 /// Get the client's capabilities configuration
1235 #[must_use]
1236 pub fn capabilities(&self) -> &ClientCapabilities {
1237 &self.inner.capabilities
1238 }
1239
1240 // ============================================================================
1241 // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1242 // ============================================================================
1243
1244 /// Retrieve the status of a task (tasks/get)
1245 ///
1246 /// Polls the server for the current status of a specific task.
1247 ///
1248 /// # Arguments
1249 ///
1250 /// * `task_id` - The ID of the task to query
1251 ///
1252 /// # Returns
1253 ///
1254 /// Returns the current `Task` state including status, timestamps, and messages.
1255 #[cfg(feature = "experimental-tasks")]
1256 pub async fn get_task(&self, task_id: &str) -> Result<Task> {
1257 let request = GetTaskRequest {
1258 task_id: task_id.to_string(),
1259 };
1260
1261 self.inner
1262 .protocol
1263 .request(
1264 "tasks/get",
1265 Some(serde_json::to_value(request).map_err(|e| {
1266 Error::internal(format!("Failed to serialize get_task request: {}", e))
1267 })?),
1268 )
1269 .await
1270 }
1271
1272 /// Cancel a running task (tasks/cancel)
1273 ///
1274 /// Attempts to cancel a task execution. This is a best-effort operation.
1275 ///
1276 /// # Arguments
1277 ///
1278 /// * `task_id` - The ID of the task to cancel
1279 ///
1280 /// # Returns
1281 ///
1282 /// Returns the updated `Task` state (typically with status "cancelled").
1283 #[cfg(feature = "experimental-tasks")]
1284 pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
1285 let request = CancelTaskRequest {
1286 task_id: task_id.to_string(),
1287 };
1288
1289 self.inner
1290 .protocol
1291 .request(
1292 "tasks/cancel",
1293 Some(serde_json::to_value(request).map_err(|e| {
1294 Error::internal(format!("Failed to serialize cancel_task request: {}", e))
1295 })?),
1296 )
1297 .await
1298 }
1299
1300 /// List all tasks (tasks/list)
1301 ///
1302 /// Retrieves a paginated list of tasks known to the server.
1303 ///
1304 /// # Arguments
1305 ///
1306 /// * `cursor` - Optional pagination cursor from a previous response
1307 /// * `limit` - Optional maximum number of tasks to return
1308 ///
1309 /// # Returns
1310 ///
1311 /// Returns a `ListTasksResult` containing the list of tasks and next cursor.
1312 #[cfg(feature = "experimental-tasks")]
1313 pub async fn list_tasks(
1314 &self,
1315 cursor: Option<String>,
1316 limit: Option<usize>,
1317 ) -> Result<ListTasksResult> {
1318 let request = ListTasksRequest { cursor, limit };
1319
1320 self.inner
1321 .protocol
1322 .request(
1323 "tasks/list",
1324 Some(serde_json::to_value(request).map_err(|e| {
1325 Error::internal(format!("Failed to serialize list_tasks request: {}", e))
1326 })?),
1327 )
1328 .await
1329 }
1330
1331 /// Retrieve the result of a completed task (tasks/result)
1332 ///
1333 /// Blocks until the task reaches a terminal state (completed, failed, or cancelled),
1334 /// then returns the operation result.
1335 ///
1336 /// # Arguments
1337 ///
1338 /// * `task_id` - The ID of the task to retrieve results for
1339 ///
1340 /// # Returns
1341 ///
1342 /// Returns a `GetTaskPayloadResult` containing the operation result (e.g. CallToolResult).
1343 #[cfg(feature = "experimental-tasks")]
1344 pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
1345 let request = GetTaskPayloadRequest {
1346 task_id: task_id.to_string(),
1347 };
1348
1349 self.inner
1350 .protocol
1351 .request(
1352 "tasks/result",
1353 Some(serde_json::to_value(request).map_err(|e| {
1354 Error::internal(format!(
1355 "Failed to serialize get_task_result request: {}",
1356 e
1357 ))
1358 })?),
1359 )
1360 .await
1361 }
1362
1363 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1364 // are defined in their respective operation modules:
1365 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1366 // - handlers.rs: has_elicitation_handler, has_roots_handler
1367 //
1368 // Additional capability getters for elicitation and roots added below
1369 // since they're used during initialization
1370
1371 /// Get elicitation capabilities if handler is registered
1372 /// Automatically detects capability based on registered handler
1373 fn get_elicitation_capabilities(
1374 &self,
1375 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1376 if self.has_elicitation_handler() {
1377 // Currently returns default capabilities. In the future, schema_validation support
1378 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1379 // that handlers could implement. For now, handlers validate schemas themselves.
1380 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1381 } else {
1382 None
1383 }
1384 }
1385
1386 /// Get roots capabilities if handler is registered
1387 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1388 if self.has_roots_handler() {
1389 // Roots capabilities indicate whether list can change
1390 Some(turbomcp_protocol::types::RootsCapabilities {
1391 list_changed: Some(true), // Support dynamic roots by default
1392 })
1393 } else {
1394 None
1395 }
1396 }
1397}