turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
//! - **`Arc<Mutex<...>>`** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use tokio::sync::Semaphore;
25
26use turbomcp_protocol::jsonrpc::*;
27#[cfg(feature = "mcp-tasks")]
28use turbomcp_protocol::types::tasks::*;
29use turbomcp_protocol::types::{
30 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
31 *,
32};
33use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
34use turbomcp_transport::{Transport, TransportMessage};
35
36use super::config::InitializeResult;
37use super::protocol::ProtocolClient;
38use crate::{
39 ClientCapabilities,
40 handlers::{HandlerError, HandlerRegistry},
41 sampling::SamplingHandler,
42};
43
44/// Inner client state with interior mutability
45///
46/// This structure contains the actual client state and is wrapped in Arc<...>
47/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
48pub(super) struct ClientInner<T: Transport + 'static> {
49 /// Protocol client for low-level communication
50 pub(super) protocol: ProtocolClient<T>,
51
52 /// Client capabilities (immutable after construction)
53 pub(super) capabilities: ClientCapabilities,
54
55 /// Initialization state (lock-free atomic boolean)
56 pub(super) initialized: AtomicBool,
57
58 /// Optional sampling handler (mutex for dynamic updates)
59 pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
60
61 /// Handler registry for bidirectional communication (mutex for registration)
62 pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
63
64 /// Plugin registry for middleware (tokio mutex - holds across await)
65 pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
66
67 /// ✅ Semaphore for bounded concurrency of request/notification handlers
68 /// Limits concurrent server-initiated request handlers to prevent resource exhaustion
69 pub(super) handler_semaphore: Arc<Semaphore>,
70}
71
72/// The core MCP client implementation
73///
74/// Client provides a comprehensive interface for communicating with MCP servers,
75/// supporting all protocol features including tools, prompts, resources, sampling,
76/// elicitation, and bidirectional communication patterns.
77///
78/// # Clone Pattern
79///
80/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
81/// All clones share the same underlying connection and state:
82///
83/// ```rust,no_run
84/// use turbomcp_client::Client;
85/// use turbomcp_transport::stdio::StdioTransport;
86///
87/// # async fn example() -> turbomcp_protocol::Result<()> {
88/// let client = Client::new(StdioTransport::new());
89/// client.initialize().await?;
90///
91/// // Cheap clone - shares same connection
92/// let client2 = client.clone();
93/// tokio::spawn(async move {
94/// client2.list_tools().await.ok();
95/// });
96/// # Ok(())
97/// # }
98/// ```
99///
100/// The client must be initialized before use by calling `initialize()` to perform
101/// the MCP handshake and capability negotiation.
102///
103/// # Features
104///
105/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
106/// - **Bidirectional Communication**: Server-initiated requests and client responses
107/// - **Plugin Middleware**: Extensible request/response processing
108/// - **Handler Registry**: Callbacks for server-initiated operations
109/// - **Connection Management**: Robust error handling and recovery
110/// - **Type Safety**: Compile-time guarantees for MCP message types
111/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
112///
113/// # Examples
114///
115/// ```rust,no_run
116/// use turbomcp_client::Client;
117/// use turbomcp_transport::stdio::StdioTransport;
118/// use std::collections::HashMap;
119///
120/// # async fn example() -> turbomcp_protocol::Result<()> {
121/// // Create and initialize client (no mut needed!)
122/// let client = Client::new(StdioTransport::new());
123/// let init_result = client.initialize().await?;
124/// println!("Connected to: {}", init_result.server_info.name);
125///
126/// // Use MCP operations
127/// let tools = client.list_tools().await?;
128/// let mut args = HashMap::new();
129/// args.insert("input".to_string(), serde_json::json!("test"));
130/// let result = client.call_tool("my_tool", Some(args)).await?;
131/// # Ok(())
132/// # }
133/// ```
134pub struct Client<T: Transport + 'static> {
135 pub(super) inner: Arc<ClientInner<T>>,
136}
137
138/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
139///
140/// Cloning a Client is cheap (just an Arc clone) and all clones share
141/// the same underlying connection and state.
142impl<T: Transport + 'static> Clone for Client<T> {
143 fn clone(&self) -> Self {
144 Self {
145 inner: Arc::clone(&self.inner),
146 }
147 }
148}
149
150impl<T: Transport + 'static> Drop for ClientInner<T> {
151 fn drop(&mut self) {
152 // Best-effort cleanup if shutdown() wasn't called
153 // This is a safety net, but applications SHOULD call shutdown() explicitly
154
155 // Single warning to catch attention
156 tracing::warn!(
157 "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
158 );
159
160 // Details at debug level to avoid noise in production
161 tracing::debug!(" 📘 RECOMMENDATION: Call client.shutdown().await before dropping");
162 tracing::debug!(
163 " 🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
164 );
165 tracing::debug!(" 💡 See client shutdown documentation for best practices");
166
167 // We can shutdown the dispatcher (it's synchronous)
168 self.protocol.dispatcher().shutdown();
169 tracing::debug!(" ✅ Message dispatcher stopped");
170
171 // ⚠️ LIMITATION: Cannot call transport.disconnect() from Drop because it's async
172 // This means:
173 // - WebSocket: Close frame not sent, reconnection tasks keep running
174 // - HTTP: Connections may not close cleanly
175 // - All transports: Background tasks may be orphaned
176 tracing::debug!(" ❌ Transport NOT disconnected (Drop cannot call async methods)");
177 tracing::debug!(" ⚠️ WebSocket reconnection and other background tasks may continue");
178 }
179}
180
181impl<T: Transport + 'static> Client<T> {
182 /// Create a new client with the specified transport
183 ///
184 /// Creates a new MCP client instance with default capabilities.
185 /// The client must be initialized before use by calling `initialize()`.
186 ///
187 /// # Arguments
188 ///
189 /// * `transport` - The transport implementation to use for communication
190 ///
191 /// # Examples
192 ///
193 /// ```rust,no_run
194 /// use turbomcp_client::Client;
195 /// use turbomcp_transport::stdio::StdioTransport;
196 ///
197 /// let transport = StdioTransport::new();
198 /// let client = Client::new(transport);
199 /// ```
200 pub fn new(transport: T) -> Self {
201 let capabilities = ClientCapabilities::default();
202 let client = Self {
203 inner: Arc::new(ClientInner {
204 protocol: ProtocolClient::new(transport),
205 capabilities: capabilities.clone(),
206 initialized: AtomicBool::new(false),
207 sampling_handler: Arc::new(StdMutex::new(None)),
208 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
209 plugin_registry: Arc::new(tokio::sync::Mutex::new(
210 crate::plugins::PluginRegistry::new(),
211 )),
212 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
213 }),
214 };
215
216 // Register dispatcher handlers for bidirectional communication
217 client.register_dispatcher_handlers();
218
219 client
220 }
221
222 /// Create a new client with custom capabilities
223 ///
224 /// # Arguments
225 ///
226 /// * `transport` - The transport implementation to use
227 /// * `capabilities` - The client capabilities to negotiate
228 ///
229 /// # Examples
230 ///
231 /// ```rust,no_run
232 /// use turbomcp_client::{Client, ClientCapabilities};
233 /// use turbomcp_transport::stdio::StdioTransport;
234 ///
235 /// let capabilities = ClientCapabilities {
236 /// tools: true,
237 /// prompts: true,
238 /// resources: false,
239 /// sampling: false,
240 /// max_concurrent_handlers: 100,
241 /// };
242 ///
243 /// let transport = StdioTransport::new();
244 /// let client = Client::with_capabilities(transport, capabilities);
245 /// ```
246 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
247 let client = Self {
248 inner: Arc::new(ClientInner {
249 protocol: ProtocolClient::new(transport),
250 capabilities: capabilities.clone(),
251 initialized: AtomicBool::new(false),
252 sampling_handler: Arc::new(StdMutex::new(None)),
253 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
254 plugin_registry: Arc::new(tokio::sync::Mutex::new(
255 crate::plugins::PluginRegistry::new(),
256 )),
257 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
258 }),
259 };
260
261 // Register dispatcher handlers for bidirectional communication
262 client.register_dispatcher_handlers();
263
264 client
265 }
266
267 /// Shutdown the client and clean up all resources
268 ///
269 /// This method performs a **graceful shutdown** of the MCP client by:
270 /// 1. Stopping the message dispatcher background task
271 /// 2. Disconnecting the transport (closes connection, stops background tasks)
272 ///
273 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
274 ///
275 /// # Why This Method Exists
276 ///
277 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
278 /// which is required for proper cleanup of WebSocket connections and background tasks.
279 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
280 ///
281 /// # Best Practices
282 ///
283 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
284 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
285 /// - For tests: call in cleanup/teardown
286 /// - If forgotten, Drop will log warnings and do best-effort cleanup
287 ///
288 /// # Examples
289 ///
290 /// ```rust,no_run
291 /// # use turbomcp_client::Client;
292 /// # use turbomcp_transport::stdio::StdioTransport;
293 /// # async fn example() -> turbomcp_protocol::Result<()> {
294 /// let client = Client::new(StdioTransport::new());
295 /// client.initialize().await?;
296 ///
297 /// // Use client...
298 /// let _tools = client.list_tools().await?;
299 ///
300 /// // ✅ Clean shutdown
301 /// client.shutdown().await?;
302 /// # Ok(())
303 /// # }
304 /// ```
305 pub async fn shutdown(&self) -> Result<()> {
306 tracing::info!("🛑 Shutting down MCP client");
307
308 // 1. Shutdown message dispatcher
309 self.inner.protocol.dispatcher().shutdown();
310 tracing::debug!("✅ Message dispatcher stopped");
311
312 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
313 match self.inner.protocol.transport().disconnect().await {
314 Ok(()) => {
315 tracing::info!("✅ Transport disconnected successfully");
316 }
317 Err(e) => {
318 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
319 }
320 }
321
322 tracing::info!("✅ MCP client shutdown complete");
323 Ok(())
324 }
325}
326
327// ============================================================================
328// HTTP-Specific Convenience Constructors (Feature-Gated)
329// ============================================================================
330
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server (convenience method)
    ///
    /// This is a convenient one-liner alternative to manual configuration.
    /// Creates an HTTP client, connects, and initializes in one call. It is
    /// shorthand for [`Client::connect_http_with`] with a no-op configuration
    /// callback.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // Convenient one-liner - balanced with server DX
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Default configuration: nothing to customise.
        Self::connect_http_with(url, |_| {}).await
    }

    /// Connect to an HTTP MCP server with custom configuration
    ///
    /// Provides more control than `connect_http()` while still being ergonomic.
    /// The configuration starts from `StreamableHttpClientConfig::default()`
    /// with `base_url` set to `url`; `config_fn` may then adjust any field
    /// before the transport is created.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Returns the error produced by `initialize()` if the handshake fails.
    /// In that case the half-open client is shut down (dispatcher stopped,
    /// transport disconnected) before the error is returned, so no transport
    /// background work is left behind.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };

        config_fn(&mut config);

        let transport = StreamableHttpClientTransport::new(config);
        let client = Self::new(transport);

        // Initialize connection immediately. `Drop` cannot disconnect the
        // transport (it is async), so clean up explicitly when the handshake fails.
        if let Err(init_err) = client.initialize().await {
            if let Err(shutdown_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed HTTP initialization also failed: {}",
                    shutdown_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
451
452// ============================================================================
453// TCP-Specific Convenience Constructors (Feature-Gated)
454// ============================================================================
455
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Convenient one-liner for TCP connections - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address (e.g., "127.0.0.1:8765" or "localhost:8765").
    ///   A literal `ip:port` is used as-is; anything else is resolved as
    ///   `host:port` and the first resolved address is used.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// - A bad-request error if `addr` is neither a socket address nor a
    ///   resolvable `host:port`.
    /// - The error produced by `initialize()` if the handshake fails; the
    ///   client is shut down (transport disconnected) before it is returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let addr = addr.as_ref();

        // Fast path: literal socket address. `SocketAddr::from_str` never
        // resolves host names, so fall back to an async lookup for "host:port".
        let server_addr: SocketAddr = match addr.parse() {
            Ok(parsed) => parsed,
            Err(parse_err) => tokio::net::lookup_host(addr)
                .await
                .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?
                .next()
                .ok_or_else(|| Error::bad_request(format!("Invalid address: {}", parse_err)))?,
        };

        // Bind to the unspecified address of the server's family with port 0,
        // letting the OS pick any available local port.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        // `Drop` cannot disconnect the transport (it is async), so clean up
        // explicitly when the handshake fails.
        if let Err(init_err) = client.initialize().await {
            if let Err(shutdown_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed TCP initialization also failed: {}",
                    shutdown_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
506
507// ============================================================================
508// Unix Socket-Specific Convenience Constructors (Feature-Gated)
509// ============================================================================
510
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// Convenient one-liner for Unix socket IPC - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `initialize()` if the handshake fails.
    /// The client is shut down (dispatcher stopped, transport disconnected)
    /// before the error is returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let transport = UnixTransport::new_client(path.into());
        let client = Self::new(transport);

        // `Drop` cannot disconnect the transport (it is async), so clean up
        // explicitly when the handshake fails.
        if let Err(init_err) = client.initialize().await {
            if let Err(shutdown_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed Unix socket initialization also failed: {}",
                    shutdown_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
548
549impl<T: Transport + 'static> Client<T> {
550 /// Register message handlers with the dispatcher
551 ///
552 /// This method sets up the callbacks that handle server-initiated requests
553 /// and notifications. The dispatcher's background task routes incoming
554 /// messages to these handlers.
555 ///
556 /// This is called automatically during Client construction (in `new()` and
557 /// `with_capabilities()`), so you don't need to call it manually.
558 ///
559 /// ## How It Works
560 ///
561 /// The handlers are synchronous closures that spawn async tasks to do the
562 /// actual work. This allows the dispatcher to continue routing messages
563 /// without blocking on handler execution.
564 fn register_dispatcher_handlers(&self) {
565 let dispatcher = self.inner.protocol.dispatcher();
566 let client_for_requests = self.clone();
567 let client_for_notifications = self.clone();
568
569 // Request handler (elicitation, sampling, etc.)
570 let semaphore = Arc::clone(&self.inner.handler_semaphore);
571 let request_handler = Arc::new(move |request: JsonRpcRequest| {
572 let client = client_for_requests.clone();
573 let method = request.method.clone();
574 let req_id = request.id.clone();
575 let semaphore = Arc::clone(&semaphore);
576
577 // ✅ Spawn async task with bounded concurrency
578 tokio::spawn(async move {
579 // Acquire permit (blocks if 100 requests already in flight)
580 let _permit = semaphore
581 .acquire()
582 .await
583 .expect("Semaphore should not be closed");
584
585 tracing::debug!(
586 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
587 method,
588 req_id
589 );
590 if let Err(e) = client.handle_request(request).await {
591 tracing::error!(
592 "❌ [request_handler] Error handling server request '{}': {}",
593 method,
594 e
595 );
596 tracing::error!(" Request ID: {:?}", req_id);
597 tracing::error!(" Error kind: {:?}", e.kind);
598 } else {
599 tracing::debug!(
600 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
601 method,
602 req_id
603 );
604 }
605 // Permit automatically released on drop ✅
606 });
607 Ok(())
608 });
609
610 // Notification handler
611 let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
612 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
613 let client = client_for_notifications.clone();
614 let semaphore = Arc::clone(&semaphore_notif);
615
616 // ✅ Spawn async task with bounded concurrency
617 tokio::spawn(async move {
618 // Acquire permit (blocks if 100 handlers already in flight)
619 let _permit = semaphore
620 .acquire()
621 .await
622 .expect("Semaphore should not be closed");
623
624 if let Err(e) = client.handle_notification(notification).await {
625 tracing::error!("Error handling server notification: {}", e);
626 }
627 // Permit automatically released on drop ✅
628 });
629 Ok(())
630 });
631
632 // Register handlers synchronously - no race condition!
633 // The set_* methods are now synchronous with std::sync::Mutex
634 dispatcher.set_request_handler(request_handler);
635 dispatcher.set_notification_handler(notification_handler);
636 tracing::debug!("Dispatcher handlers registered successfully");
637 }
638
639 /// Handle server-initiated requests (elicitation, sampling, roots)
640 ///
641 /// This method is called by the MessageDispatcher when it receives a request
642 /// from the server. It routes the request to the appropriate handler based on
643 /// the method name.
644 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
645 match request.method.as_str() {
646 "sampling/createMessage" => {
647 let handler_opt = self
648 .inner
649 .sampling_handler
650 .lock()
651 .expect("sampling_handler mutex poisoned")
652 .clone();
653 if let Some(handler) = handler_opt {
654 // Extract request ID for proper correlation
655 let request_id = match &request.id {
656 turbomcp_protocol::MessageId::String(s) => s.clone(),
657 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
658 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
659 };
660
661 let params: CreateMessageRequest =
662 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
663 .map_err(|e| {
664 Error::protocol(format!("Invalid createMessage params: {}", e))
665 })?;
666
667 match handler.handle_create_message(request_id, params).await {
668 Ok(result) => {
669 let result_value = serde_json::to_value(result).map_err(|e| {
670 Error::protocol(format!("Failed to serialize response: {}", e))
671 })?;
672 let response = JsonRpcResponse::success(result_value, request.id);
673 self.send_response(response).await?;
674 }
675 Err(e) => {
676 tracing::warn!(
677 "⚠️ [handle_request] Sampling handler returned error: {}",
678 e
679 );
680
681 // Preserve error semantics by checking actual error type
682 // This allows proper error code propagation for retry logic
683 let (code, message) = if let Some(handler_err) =
684 e.downcast_ref::<HandlerError>()
685 {
686 // HandlerError has explicit JSON-RPC code mapping
687 let json_err = handler_err.into_jsonrpc_error();
688 tracing::info!(
689 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
690 json_err.code
691 );
692 (json_err.code, json_err.message)
693 } else if let Some(proto_err) =
694 e.downcast_ref::<turbomcp_protocol::Error>()
695 {
696 // Protocol errors have ErrorKind-based mapping
697 tracing::info!(
698 "📋 [handle_request] Protocol error mapped to code: {}",
699 proto_err.jsonrpc_error_code()
700 );
701 (proto_err.jsonrpc_error_code(), proto_err.to_string())
702 } else {
703 // Generic errors default to Internal (-32603)
704 // Log the error type for debugging (should rarely hit this path)
705 tracing::warn!(
706 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
707 std::any::type_name_of_val(&*e)
708 );
709 (-32603, format!("Sampling handler error: {}", e))
710 };
711
712 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
713 code,
714 message,
715 data: None,
716 };
717 let response =
718 JsonRpcResponse::error_response(error, request.id.clone());
719
720 tracing::info!(
721 "🔄 [handle_request] Attempting to send error response for request: {:?}",
722 request.id
723 );
724 self.send_response(response).await?;
725 tracing::info!(
726 "✅ [handle_request] Error response sent successfully for request: {:?}",
727 request.id
728 );
729 }
730 }
731 } else {
732 // No handler configured
733 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
734 code: -32601,
735 message: "Sampling not supported".to_string(),
736 data: None,
737 };
738 let response = JsonRpcResponse::error_response(error, request.id);
739 self.send_response(response).await?;
740 }
741 }
742 "roots/list" => {
743 // Handle roots/list request from server
744 // Clone the handler Arc to avoid holding mutex across await
745 let handler_opt = self
746 .inner
747 .handlers
748 .lock()
749 .expect("handlers mutex poisoned")
750 .roots
751 .clone();
752
753 let roots_result = if let Some(handler) = handler_opt {
754 handler.handle_roots_request().await
755 } else {
756 // No handler - return empty list per MCP spec
757 Ok(Vec::new())
758 };
759
760 match roots_result {
761 Ok(roots) => {
762 let result_value =
763 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
764 roots,
765 _meta: None,
766 })
767 .map_err(|e| {
768 Error::protocol(format!(
769 "Failed to serialize roots response: {}",
770 e
771 ))
772 })?;
773 let response = JsonRpcResponse::success(result_value, request.id);
774 self.send_response(response).await?;
775 }
776 Err(e) => {
777 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
778 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
779 let json_err = e.into_jsonrpc_error();
780 let response = JsonRpcResponse::error_response(json_err, request.id);
781 self.send_response(response).await?;
782 }
783 }
784 }
785 "elicitation/create" => {
786 // Clone handler Arc before await to avoid holding mutex across await
787 let handler_opt = self
788 .inner
789 .handlers
790 .lock()
791 .expect("handlers mutex poisoned")
792 .elicitation
793 .clone();
794 if let Some(handler) = handler_opt {
795 // Parse elicitation request params as MCP protocol type
796 let proto_request: turbomcp_protocol::types::ElicitRequest =
797 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
798 .map_err(|e| {
799 Error::protocol(format!("Invalid elicitation params: {}", e))
800 })?;
801
802 // Wrap protocol request with ID for handler (preserves type safety!)
803 let handler_request =
804 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
805
806 // Call the registered elicitation handler
807 match handler.handle_elicitation(handler_request).await {
808 Ok(elicit_response) => {
809 // Convert handler response back to protocol type
810 let proto_result = elicit_response.into_protocol();
811 let result_value = serde_json::to_value(proto_result).map_err(|e| {
812 Error::protocol(format!(
813 "Failed to serialize elicitation response: {}",
814 e
815 ))
816 })?;
817 let response = JsonRpcResponse::success(result_value, request.id);
818 self.send_response(response).await?;
819 }
820 Err(e) => {
821 // Convert handler error to JSON-RPC error using centralized mapping
822 let response =
823 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
824 self.send_response(response).await?;
825 }
826 }
827 } else {
828 // No handler configured - elicitation not supported
829 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
830 code: -32601,
831 message: "Elicitation not supported - no handler registered".to_string(),
832 data: None,
833 };
834 let response = JsonRpcResponse::error_response(error, request.id);
835 self.send_response(response).await?;
836 }
837 }
838 _ => {
839 // Unknown method
840 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
841 code: -32601,
842 message: format!("Method not found: {}", request.method),
843 data: None,
844 };
845 let response = JsonRpcResponse::error_response(error, request.id);
846 self.send_response(response).await?;
847 }
848 }
849 Ok(())
850 }
851
852 /// Handle server-initiated notifications
853 ///
854 /// Routes notifications to appropriate handlers based on method name.
855 /// MCP defines several notification types that servers can send to clients:
856 ///
857 /// - `notifications/progress` - Progress updates for long-running operations
858 /// - `notifications/message` - Log messages from server
859 /// - `notifications/resources/updated` - Resource content changed
860 /// - `notifications/resources/list_changed` - Resource list changed
861 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
862 match notification.method.as_str() {
863 "notifications/progress" => {
864 // Progress tracking has been removed
865 tracing::debug!(
866 "Received progress notification - progress tracking has been removed"
867 );
868 }
869
870 "notifications/message" => {
871 // Route to log handler
872 let handler_opt = self
873 .inner
874 .handlers
875 .lock()
876 .expect("handlers mutex poisoned")
877 .get_log_handler();
878
879 if let Some(handler) = handler_opt {
880 // Parse log message
881 let log: crate::handlers::LoggingNotification = serde_json::from_value(
882 notification.params.unwrap_or(serde_json::Value::Null),
883 )
884 .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
885
886 // Call handler
887 if let Err(e) = handler.handle_log(log).await {
888 tracing::error!("Log handler error: {}", e);
889 }
890 } else {
891 tracing::debug!("Received log notification but no handler registered");
892 }
893 }
894
895 "notifications/resources/updated" => {
896 // Route to resource update handler
897 let handler_opt = self
898 .inner
899 .handlers
900 .lock()
901 .expect("handlers mutex poisoned")
902 .get_resource_update_handler();
903
904 if let Some(handler) = handler_opt {
905 // Parse resource update notification
906 let update: crate::handlers::ResourceUpdatedNotification =
907 serde_json::from_value(
908 notification.params.unwrap_or(serde_json::Value::Null),
909 )
910 .map_err(|e| {
911 Error::protocol(format!("Invalid resource update notification: {}", e))
912 })?;
913
914 // Call handler
915 if let Err(e) = handler.handle_resource_update(update).await {
916 tracing::error!("Resource update handler error: {}", e);
917 }
918 } else {
919 tracing::debug!(
920 "Received resource update notification but no handler registered"
921 );
922 }
923 }
924
925 "notifications/resources/list_changed" => {
926 // Route to resource list changed handler
927 let handler_opt = self
928 .inner
929 .handlers
930 .lock()
931 .expect("handlers mutex poisoned")
932 .get_resource_list_changed_handler();
933
934 if let Some(handler) = handler_opt {
935 if let Err(e) = handler.handle_resource_list_changed().await {
936 tracing::error!("Resource list changed handler error: {}", e);
937 }
938 } else {
939 tracing::debug!(
940 "Resource list changed notification received (no handler registered)"
941 );
942 }
943 }
944
945 "notifications/prompts/list_changed" => {
946 // Route to prompt list changed handler
947 let handler_opt = self
948 .inner
949 .handlers
950 .lock()
951 .expect("handlers mutex poisoned")
952 .get_prompt_list_changed_handler();
953
954 if let Some(handler) = handler_opt {
955 if let Err(e) = handler.handle_prompt_list_changed().await {
956 tracing::error!("Prompt list changed handler error: {}", e);
957 }
958 } else {
959 tracing::debug!(
960 "Prompt list changed notification received (no handler registered)"
961 );
962 }
963 }
964
965 "notifications/tools/list_changed" => {
966 // Route to tool list changed handler
967 let handler_opt = self
968 .inner
969 .handlers
970 .lock()
971 .expect("handlers mutex poisoned")
972 .get_tool_list_changed_handler();
973
974 if let Some(handler) = handler_opt {
975 if let Err(e) = handler.handle_tool_list_changed().await {
976 tracing::error!("Tool list changed handler error: {}", e);
977 }
978 } else {
979 tracing::debug!(
980 "Tool list changed notification received (no handler registered)"
981 );
982 }
983 }
984
985 "notifications/cancelled" => {
986 // Route to cancellation handler
987 let handler_opt = self
988 .inner
989 .handlers
990 .lock()
991 .expect("handlers mutex poisoned")
992 .get_cancellation_handler();
993
994 if let Some(handler) = handler_opt {
995 // Parse cancellation notification
996 let cancellation: crate::handlers::CancelledNotification =
997 serde_json::from_value(
998 notification.params.unwrap_or(serde_json::Value::Null),
999 )
1000 .map_err(|e| {
1001 Error::protocol(format!("Invalid cancellation notification: {}", e))
1002 })?;
1003
1004 // Call handler
1005 if let Err(e) = handler.handle_cancellation(cancellation).await {
1006 tracing::error!("Cancellation handler error: {}", e);
1007 }
1008 } else {
1009 tracing::debug!("Cancellation notification received (no handler registered)");
1010 }
1011 }
1012
1013 _ => {
1014 // Unknown notification type
1015 tracing::debug!("Received unknown notification: {}", notification.method);
1016 }
1017 }
1018
1019 Ok(())
1020 }
1021
1022 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1023 tracing::info!(
1024 "📤 [send_response] Sending JSON-RPC response: id={:?}",
1025 response.id
1026 );
1027
1028 let payload = serde_json::to_vec(&response).map_err(|e| {
1029 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1030 Error::protocol(format!("Failed to serialize response: {}", e))
1031 })?;
1032
1033 tracing::debug!(
1034 "📤 [send_response] Response payload: {} bytes",
1035 payload.len()
1036 );
1037 tracing::debug!(
1038 "📤 [send_response] Response JSON: {}",
1039 String::from_utf8_lossy(&payload)
1040 );
1041
1042 let message = TransportMessage::new(
1043 turbomcp_protocol::MessageId::from("response".to_string()),
1044 payload.into(),
1045 );
1046
1047 self.inner
1048 .protocol
1049 .transport()
1050 .send(message)
1051 .await
1052 .map_err(|e| {
1053 tracing::error!("❌ [send_response] Transport send failed: {}", e);
1054 Error::transport(format!("Failed to send response: {}", e))
1055 })?;
1056
1057 tracing::info!(
1058 "✅ [send_response] Response sent successfully: id={:?}",
1059 response.id
1060 );
1061 Ok(())
1062 }
1063
1064 /// Initialize the connection with the MCP server
1065 ///
1066 /// Performs the initialization handshake with the server, negotiating capabilities
1067 /// and establishing the protocol version. This method must be called before
1068 /// any other operations can be performed.
1069 ///
1070 /// # Returns
1071 ///
1072 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1073 ///
1074 /// # Errors
1075 ///
1076 /// Returns an error if:
1077 /// - The transport connection fails
1078 /// - The server rejects the initialization request
1079 /// - Protocol negotiation fails
1080 ///
1081 /// # Examples
1082 ///
1083 /// ```rust,no_run
1084 /// # use turbomcp_client::Client;
1085 /// # use turbomcp_transport::stdio::StdioTransport;
1086 /// # async fn example() -> turbomcp_protocol::Result<()> {
1087 /// let mut client = Client::new(StdioTransport::new());
1088 ///
1089 /// let result = client.initialize().await?;
1090 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1091 /// # Ok(())
1092 /// # }
1093 /// ```
1094 pub async fn initialize(&self) -> Result<InitializeResult> {
1095 // Auto-connect transport if not already connected
1096 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1097 let transport = self.inner.protocol.transport();
1098 let transport_state = transport.state().await;
1099 if !matches!(
1100 transport_state,
1101 turbomcp_transport::TransportState::Connected
1102 ) {
1103 tracing::debug!(
1104 "Auto-connecting transport (current state: {:?})",
1105 transport_state
1106 );
1107 transport
1108 .connect()
1109 .await
1110 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1111 tracing::info!("Transport connected successfully");
1112 }
1113
1114 // Build client capabilities based on registered handlers (automatic detection)
1115 let mut client_caps = ProtocolClientCapabilities::default();
1116
1117 // Detect sampling capability from handler
1118 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1119 client_caps.sampling = Some(sampling_caps);
1120 }
1121
1122 // Detect elicitation capability from handler
1123 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1124 client_caps.elicitation = Some(elicitation_caps);
1125 }
1126
1127 // Detect roots capability from handler
1128 if let Some(roots_caps) = self.get_roots_capabilities() {
1129 client_caps.roots = Some(roots_caps);
1130 }
1131
1132 // Send MCP initialization request
1133 let request = InitializeRequest {
1134 protocol_version: PROTOCOL_VERSION.to_string(),
1135 capabilities: client_caps,
1136 client_info: turbomcp_protocol::types::Implementation {
1137 name: "turbomcp-client".to_string(),
1138 version: env!("CARGO_PKG_VERSION").to_string(),
1139 title: Some("TurboMCP Client".to_string()),
1140 ..Default::default()
1141 },
1142 _meta: None,
1143 };
1144
1145 let protocol_response: ProtocolInitializeResult = self
1146 .inner
1147 .protocol
1148 .request("initialize", Some(serde_json::to_value(request)?))
1149 .await?;
1150
1151 // AtomicBool: lock-free store with Ordering::Relaxed
1152 self.inner.initialized.store(true, Ordering::Relaxed);
1153
1154 // Send initialized notification
1155 self.inner
1156 .protocol
1157 .notify("notifications/initialized", None)
1158 .await?;
1159
1160 // Convert protocol response to client response type
1161 Ok(InitializeResult {
1162 server_info: protocol_response.server_info,
1163 server_capabilities: protocol_response.capabilities,
1164 })
1165 }
1166
1167 /// Execute a protocol method with plugin middleware
1168 ///
1169 /// This is a generic helper for wrapping protocol calls with plugin middleware.
1170 pub(crate) async fn execute_with_plugins<R>(
1171 &self,
1172 method_name: &str,
1173 params: Option<serde_json::Value>,
1174 ) -> Result<R>
1175 where
1176 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1177 {
1178 // Create JSON-RPC request for plugin context
1179 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1180 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1181 id: turbomcp_protocol::MessageId::Number(1),
1182 method: method_name.to_string(),
1183 params: params.clone(),
1184 };
1185
1186 // 1. Create request context for plugins
1187 let mut req_ctx =
1188 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1189
1190 // 2. Execute before_request plugin middleware
1191 if let Err(e) = self
1192 .inner
1193 .plugin_registry
1194 .lock()
1195 .await
1196 .execute_before_request(&mut req_ctx)
1197 .await
1198 {
1199 return Err(Error::bad_request(format!(
1200 "Plugin before_request failed: {}",
1201 e
1202 )));
1203 }
1204
1205 // 3. Execute the actual protocol call
1206 let start_time = std::time::Instant::now();
1207 let protocol_result: Result<R> = self
1208 .inner
1209 .protocol
1210 .request(method_name, req_ctx.params().cloned())
1211 .await;
1212 let duration = start_time.elapsed();
1213
1214 // 4. Prepare response context
1215 let mut resp_ctx = match protocol_result {
1216 Ok(ref response) => {
1217 let response_value = serde_json::to_value(response.clone())?;
1218 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1219 }
1220 Err(ref e) => {
1221 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1222 }
1223 };
1224
1225 // 5. Execute after_response plugin middleware
1226 if let Err(e) = self
1227 .inner
1228 .plugin_registry
1229 .lock()
1230 .await
1231 .execute_after_response(&mut resp_ctx)
1232 .await
1233 {
1234 return Err(Error::bad_request(format!(
1235 "Plugin after_response failed: {}",
1236 e
1237 )));
1238 }
1239
1240 // 6. Return the final result, checking for plugin modifications
1241 match protocol_result {
1242 Ok(ref response) => {
1243 // Check if plugins modified the response
1244 if let Some(modified_response) = resp_ctx.response {
1245 // Try to deserialize the modified response
1246 if let Ok(modified_result) =
1247 serde_json::from_value::<R>(modified_response.clone())
1248 {
1249 return Ok(modified_result);
1250 }
1251 }
1252
1253 // No plugin modifications, use original response
1254 Ok(response.clone())
1255 }
1256 Err(e) => {
1257 // Check if plugins provided an error recovery response
1258 if let Some(recovery_response) = resp_ctx.response {
1259 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1260 Ok(recovery_result)
1261 } else {
1262 Err(e)
1263 }
1264 } else {
1265 Err(e)
1266 }
1267 }
1268 }
1269 }
1270
1271 /// Subscribe to resource change notifications
1272 ///
1273 /// Registers interest in receiving notifications when the specified
1274 /// resource changes. The server will send notifications when the
1275 /// resource is modified, created, or deleted.
1276 ///
1277 /// # Arguments
1278 ///
1279 /// * `uri` - The URI of the resource to monitor
1280 ///
1281 /// # Returns
1282 ///
1283 /// Returns `EmptyResult` on successful subscription.
1284 ///
1285 /// # Errors
1286 ///
1287 /// Returns an error if:
1288 /// - The client is not initialized
1289 /// - The URI is invalid or empty
1290 /// - The server doesn't support subscriptions
1291 /// - The request fails
1292 ///
1293 /// # Examples
1294 ///
1295 /// ```rust,no_run
1296 /// # use turbomcp_client::Client;
1297 /// # use turbomcp_transport::stdio::StdioTransport;
1298 /// # async fn example() -> turbomcp_protocol::Result<()> {
1299 /// let mut client = Client::new(StdioTransport::new());
1300 /// client.initialize().await?;
1301 ///
1302 /// // Subscribe to file changes
1303 /// client.subscribe("file:///watch/directory").await?;
1304 /// println!("Subscribed to resource changes");
1305 /// # Ok(())
1306 /// # }
1307 /// ```
1308 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1309 if !self.inner.initialized.load(Ordering::Relaxed) {
1310 return Err(Error::bad_request("Client not initialized"));
1311 }
1312
1313 if uri.is_empty() {
1314 return Err(Error::bad_request("Subscription URI cannot be empty"));
1315 }
1316
1317 // Send resources/subscribe request with plugin middleware
1318 let request = SubscribeRequest {
1319 uri: uri.to_string(),
1320 };
1321
1322 self.execute_with_plugins(
1323 "resources/subscribe",
1324 Some(serde_json::to_value(request).map_err(|e| {
1325 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1326 })?),
1327 )
1328 .await
1329 }
1330
1331 /// Unsubscribe from resource change notifications
1332 ///
1333 /// Cancels a previous subscription to resource changes. After unsubscribing,
1334 /// the client will no longer receive notifications for the specified resource.
1335 ///
1336 /// # Arguments
1337 ///
1338 /// * `uri` - The URI of the resource to stop monitoring
1339 ///
1340 /// # Returns
1341 ///
1342 /// Returns `EmptyResult` on successful unsubscription.
1343 ///
1344 /// # Errors
1345 ///
1346 /// Returns an error if:
1347 /// - The client is not initialized
1348 /// - The URI is invalid or empty
1349 /// - No active subscription exists for the URI
1350 /// - The request fails
1351 ///
1352 /// # Examples
1353 ///
1354 /// ```rust,no_run
1355 /// # use turbomcp_client::Client;
1356 /// # use turbomcp_transport::stdio::StdioTransport;
1357 /// # async fn example() -> turbomcp_protocol::Result<()> {
1358 /// let mut client = Client::new(StdioTransport::new());
1359 /// client.initialize().await?;
1360 ///
1361 /// // Unsubscribe from file changes
1362 /// client.unsubscribe("file:///watch/directory").await?;
1363 /// println!("Unsubscribed from resource changes");
1364 /// # Ok(())
1365 /// # }
1366 /// ```
1367 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1368 if !self.inner.initialized.load(Ordering::Relaxed) {
1369 return Err(Error::bad_request("Client not initialized"));
1370 }
1371
1372 if uri.is_empty() {
1373 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1374 }
1375
1376 // Send resources/unsubscribe request with plugin middleware
1377 let request = UnsubscribeRequest {
1378 uri: uri.to_string(),
1379 };
1380
1381 self.execute_with_plugins(
1382 "resources/unsubscribe",
1383 Some(serde_json::to_value(request).map_err(|e| {
1384 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1385 })?),
1386 )
1387 .await
1388 }
1389
1390 /// Get the client's capabilities configuration
1391 #[must_use]
1392 pub fn capabilities(&self) -> &ClientCapabilities {
1393 &self.inner.capabilities
1394 }
1395
1396 /// Initialize all registered plugins
1397 ///
1398 /// This should be called after registration but before using the client.
1399 pub async fn initialize_plugins(&self) -> Result<()> {
1400 // Set up client context for plugins with actual client capabilities
1401 let mut capabilities = std::collections::HashMap::new();
1402 capabilities.insert(
1403 "protocol_version".to_string(),
1404 serde_json::json!("2024-11-05"),
1405 );
1406 capabilities.insert(
1407 "mcp_version".to_string(),
1408 serde_json::json!(env!("CARGO_PKG_VERSION")),
1409 );
1410 capabilities.insert(
1411 "supports_notifications".to_string(),
1412 serde_json::json!(true),
1413 );
1414 capabilities.insert(
1415 "supports_sampling".to_string(),
1416 serde_json::json!(self.has_sampling_handler()),
1417 );
1418 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1419 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1420
1421 // Extract client configuration
1422 let mut config = std::collections::HashMap::new();
1423 config.insert(
1424 "client_name".to_string(),
1425 serde_json::json!("turbomcp-client"),
1426 );
1427 config.insert(
1428 "initialized".to_string(),
1429 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1430 );
1431 config.insert(
1432 "plugin_count".to_string(),
1433 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1434 );
1435
1436 let context = crate::plugins::PluginContext::new(
1437 "turbomcp-client".to_string(),
1438 env!("CARGO_PKG_VERSION").to_string(),
1439 capabilities,
1440 config,
1441 vec![], // Will be populated by the registry
1442 );
1443
1444 self.inner
1445 .plugin_registry
1446 .lock()
1447 .await
1448 .set_client_context(context);
1449
1450 // Note: Individual plugins are initialized automatically during registration
1451 // via PluginRegistry::register_plugin(). This method ensures the registry
1452 // has proper client context for any future plugin registrations.
1453 Ok(())
1454 }
1455
1456 /// Cleanup all registered plugins
1457 ///
1458 /// This should be called when the client is being shut down.
1459 pub async fn cleanup_plugins(&self) -> Result<()> {
1460 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1461 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1462 // references are dropped.
1463
1464 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1465 // pattern where plugins clean up their resources in their Drop implementation.
1466 // Replace the registry with a fresh one (mutex ensures safe access)
1467 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1468 Ok(())
1469 }
1470
1471 // ============================================================================
1472 // Tasks API Methods (MCP 2025-11-25 Draft - SEP-1686)
1473 // ============================================================================
1474
/// Retrieve the status of a task (tasks/get)
///
/// Sends a `tasks/get` request for `task_id` through the plugin middleware
/// pipeline and returns the server's view of the task.
///
/// # Arguments
///
/// * `task_id` - The ID of the task to query
///
/// # Returns
///
/// Returns the current `Task` state as reported by the server.
///
/// # Errors
///
/// Returns an error if:
/// - The client is not initialized
/// - The task ID is empty
/// - The request cannot be serialized
/// - The request fails
#[cfg(feature = "mcp-tasks")]
pub async fn get_task(&self, task_id: &str) -> Result<Task> {
    // Same preconditions as the other request methods: no traffic before the
    // handshake, and no request for an obviously invalid identifier.
    if !self.inner.initialized.load(Ordering::Relaxed) {
        return Err(Error::bad_request("Client not initialized"));
    }

    if task_id.is_empty() {
        return Err(Error::bad_request("Task ID cannot be empty"));
    }

    let request = GetTaskRequest {
        task_id: task_id.to_string(),
    };

    self.execute_with_plugins(
        "tasks/get",
        Some(serde_json::to_value(request).map_err(|e| {
            Error::protocol(format!("Failed to serialize get_task request: {}", e))
        })?),
    )
    .await
}
1500
/// Cancel a running task (tasks/cancel)
///
/// Sends a `tasks/cancel` request for `task_id` through the plugin
/// middleware pipeline. Cancellation is a request to the server; whether the
/// task actually stops is decided by the server.
///
/// # Arguments
///
/// * `task_id` - The ID of the task to cancel
///
/// # Returns
///
/// Returns the updated `Task` state as reported by the server.
///
/// # Errors
///
/// Returns an error if:
/// - The client is not initialized
/// - The task ID is empty
/// - The request cannot be serialized
/// - The request fails
#[cfg(feature = "mcp-tasks")]
pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
    // Same preconditions as the other request methods: no traffic before the
    // handshake, and no request for an obviously invalid identifier.
    if !self.inner.initialized.load(Ordering::Relaxed) {
        return Err(Error::bad_request("Client not initialized"));
    }

    if task_id.is_empty() {
        return Err(Error::bad_request("Task ID cannot be empty"));
    }

    let request = CancelTaskRequest {
        task_id: task_id.to_string(),
    };

    self.execute_with_plugins(
        "tasks/cancel",
        Some(serde_json::to_value(request).map_err(|e| {
            Error::protocol(format!("Failed to serialize cancel_task request: {}", e))
        })?),
    )
    .await
}
1526
/// List all tasks (tasks/list)
///
/// Sends a `tasks/list` request through the plugin middleware pipeline.
///
/// # Arguments
///
/// * `cursor` - Optional pagination cursor from a previous response
/// * `limit` - Optional maximum number of tasks to return
///
/// # Returns
///
/// Returns the `ListTasksResult` produced by the server.
///
/// # Errors
///
/// Returns an error if:
/// - The client is not initialized
/// - The request cannot be serialized
/// - The request fails
#[cfg(feature = "mcp-tasks")]
pub async fn list_tasks(
    &self,
    cursor: Option<String>,
    limit: Option<usize>,
) -> Result<ListTasksResult> {
    // Same precondition as the other request methods: no traffic before the
    // initialization handshake has completed.
    if !self.inner.initialized.load(Ordering::Relaxed) {
        return Err(Error::bad_request("Client not initialized"));
    }

    let request = ListTasksRequest { cursor, limit };

    self.execute_with_plugins(
        "tasks/list",
        Some(serde_json::to_value(request).map_err(|e| {
            Error::protocol(format!("Failed to serialize list_tasks request: {}", e))
        })?),
    )
    .await
}
1555
/// Retrieve the result of a completed task (tasks/result)
///
/// Sends a `tasks/result` request for `task_id` through the plugin
/// middleware pipeline and awaits the server's reply. The server is expected
/// to answer once the task has reached a terminal state (completed, failed,
/// or cancelled); this method itself does no polling.
///
/// # Arguments
///
/// * `task_id` - The ID of the task to retrieve results for
///
/// # Returns
///
/// Returns a `GetTaskPayloadResult` containing the operation result
/// (e.g. CallToolResult).
///
/// # Errors
///
/// Returns an error if:
/// - The client is not initialized
/// - The task ID is empty
/// - The request cannot be serialized
/// - The request fails
#[cfg(feature = "mcp-tasks")]
pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
    // Same preconditions as the other request methods: no traffic before the
    // handshake, and no request for an obviously invalid identifier.
    if !self.inner.initialized.load(Ordering::Relaxed) {
        return Err(Error::bad_request("Client not initialized"));
    }

    if task_id.is_empty() {
        return Err(Error::bad_request("Task ID cannot be empty"));
    }

    let request = GetTaskPayloadRequest {
        task_id: task_id.to_string(),
    };

    self.execute_with_plugins(
        "tasks/result",
        Some(serde_json::to_value(request).map_err(|e| {
            Error::protocol(format!(
                "Failed to serialize get_task_result request: {}",
                e
            ))
        })?),
    )
    .await
}
1585
1586 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1587 // are defined in their respective operation modules:
1588 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1589 // - handlers.rs: has_elicitation_handler, has_roots_handler
1590 //
1591 // Additional capability getters for elicitation and roots added below
1592 // since they're used during initialization
1593
1594 /// Get elicitation capabilities if handler is registered
1595 /// Automatically detects capability based on registered handler
1596 fn get_elicitation_capabilities(
1597 &self,
1598 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1599 if self.has_elicitation_handler() {
1600 // Currently returns default capabilities. In the future, schema_validation support
1601 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1602 // that handlers could implement. For now, handlers validate schemas themselves.
1603 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1604 } else {
1605 None
1606 }
1607 }
1608
1609 /// Get roots capabilities if handler is registered
1610 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1611 if self.has_roots_handler() {
1612 // Roots capabilities indicate whether list can change
1613 Some(turbomcp_protocol::types::RootsCapabilities {
1614 list_changed: Some(true), // Support dynamic roots by default
1615 })
1616 } else {
1617 None
1618 }
1619 }
1620}