turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use tokio::sync::Semaphore;
25
26use turbomcp_protocol::jsonrpc::*;
27use turbomcp_protocol::types::{
28 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
29 *,
30};
31use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
32use turbomcp_transport::{Transport, TransportMessage};
33
34use super::config::InitializeResult;
35use super::protocol::ProtocolClient;
36use crate::{
37 ClientCapabilities,
38 handlers::{HandlerError, HandlerRegistry},
39 sampling::SamplingHandler,
40};
41
/// Inner client state with interior mutability
///
/// This structure contains the actual client state. [`Client`] stores it behind
/// an `Arc`, so cloning a client only clones the pointer and every clone
/// observes the same state (same pattern as reqwest and AWS SDK).
///
/// Every field is either immutable after construction or wrapped in an
/// atomic / mutex, so the state can be used through a shared reference.
pub(super) struct ClientInner<T: Transport + 'static> {
    /// Protocol client for low-level communication
    ///
    /// Also gives access to the message dispatcher and the transport.
    pub(super) protocol: ProtocolClient<T>,

    /// Client capabilities (immutable after construction)
    pub(super) capabilities: ClientCapabilities,

    /// Initialization state (lock-free atomic boolean)
    ///
    /// Starts out `false` in both constructors.
    pub(super) initialized: AtomicBool,

    /// Optional sampling handler (mutex for dynamic updates)
    ///
    /// `None` until a handler is installed; `sampling/createMessage` requests
    /// are answered with "Sampling not supported" while it is `None`.
    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,

    /// Handler registry for bidirectional communication (mutex for registration)
    ///
    /// This is a `std` mutex: callers clone the handler they need out of the
    /// registry and release the guard before awaiting.
    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,

    /// Plugin registry for middleware (tokio mutex - holds across await)
    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,

    /// Semaphore for bounded concurrency of request/notification handlers
    ///
    /// Limits concurrent server-initiated request handlers to prevent resource
    /// exhaustion. The constructors size it from
    /// `ClientCapabilities::max_concurrent_handlers`; request and notification
    /// handlers share the same pool of permits.
    pub(super) handler_semaphore: Arc<Semaphore>,
}
69
/// The core MCP client implementation
///
/// Client provides a comprehensive interface for communicating with MCP servers,
/// supporting all protocol features including tools, prompts, resources, sampling,
/// elicitation, and bidirectional communication patterns.
///
/// # Clone Pattern
///
/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
/// All clones share the same underlying connection and state:
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// let client = Client::new(StdioTransport::new());
/// client.initialize().await?;
///
/// // Cheap clone - shares same connection
/// let client2 = client.clone();
/// tokio::spawn(async move {
///     client2.list_tools().await.ok();
/// });
/// # Ok(())
/// # }
/// ```
///
/// The client must be initialized before use by calling `initialize()` to perform
/// the MCP handshake and capability negotiation.
///
/// # Shutdown
///
/// Call [`Client::shutdown`] before the last clone is dropped. Dropping the
/// shared state only stops the message dispatcher; it cannot disconnect the
/// transport because disconnecting is asynchronous.
///
/// # Features
///
/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
/// - **Bidirectional Communication**: Server-initiated requests and client responses
/// - **Plugin Middleware**: Extensible request/response processing
/// - **Handler Registry**: Callbacks for server-initiated operations
/// - **Connection Management**: Robust error handling and recovery
/// - **Type Safety**: Compile-time guarantees for MCP message types
/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
///
/// # Examples
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
/// use std::collections::HashMap;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// // Create and initialize client (no mut needed!)
/// let client = Client::new(StdioTransport::new());
/// let init_result = client.initialize().await?;
/// println!("Connected to: {}", init_result.server_info.name);
///
/// // Use MCP operations
/// let tools = client.list_tools().await?;
/// let mut args = HashMap::new();
/// args.insert("input".to_string(), serde_json::json!("test"));
/// let result = client.call_tool("my_tool", Some(args)).await?;
/// # Ok(())
/// # }
/// ```
pub struct Client<T: Transport + 'static> {
    /// Shared state; every clone of this client points at the same allocation.
    pub(super) inner: Arc<ClientInner<T>>,
}
135
136/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
137///
138/// Cloning a Client is cheap (just an Arc clone) and all clones share
139/// the same underlying connection and state.
140impl<T: Transport + 'static> Clone for Client<T> {
141 fn clone(&self) -> Self {
142 Self {
143 inner: Arc::clone(&self.inner),
144 }
145 }
146}
147
148impl<T: Transport + 'static> Drop for ClientInner<T> {
149 fn drop(&mut self) {
150 // Best-effort cleanup if shutdown() wasn't called
151 // This is a safety net, but applications SHOULD call shutdown() explicitly
152
153 // Single warning to catch attention
154 tracing::warn!(
155 "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
156 );
157
158 // Details at debug level to avoid noise in production
159 tracing::debug!(" 📘 RECOMMENDATION: Call client.shutdown().await before dropping");
160 tracing::debug!(
161 " 🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
162 );
163 tracing::debug!(" 💡 See client shutdown documentation for best practices");
164
165 // We can shutdown the dispatcher (it's synchronous)
166 self.protocol.dispatcher().shutdown();
167 tracing::debug!(" ✅ Message dispatcher stopped");
168
169 // ⚠️ LIMITATION: Cannot call transport.disconnect() from Drop because it's async
170 // This means:
171 // - WebSocket: Close frame not sent, reconnection tasks keep running
172 // - HTTP: Connections may not close cleanly
173 // - All transports: Background tasks may be orphaned
174 tracing::debug!(" ❌ Transport NOT disconnected (Drop cannot call async methods)");
175 tracing::debug!(" ⚠️ WebSocket reconnection and other background tasks may continue");
176 }
177}
178
179impl<T: Transport + 'static> Client<T> {
180 /// Create a new client with the specified transport
181 ///
182 /// Creates a new MCP client instance with default capabilities.
183 /// The client must be initialized before use by calling `initialize()`.
184 ///
185 /// # Arguments
186 ///
187 /// * `transport` - The transport implementation to use for communication
188 ///
189 /// # Examples
190 ///
191 /// ```rust,no_run
192 /// use turbomcp_client::Client;
193 /// use turbomcp_transport::stdio::StdioTransport;
194 ///
195 /// let transport = StdioTransport::new();
196 /// let client = Client::new(transport);
197 /// ```
198 pub fn new(transport: T) -> Self {
199 let capabilities = ClientCapabilities::default();
200 let client = Self {
201 inner: Arc::new(ClientInner {
202 protocol: ProtocolClient::new(transport),
203 capabilities: capabilities.clone(),
204 initialized: AtomicBool::new(false),
205 sampling_handler: Arc::new(StdMutex::new(None)),
206 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
207 plugin_registry: Arc::new(tokio::sync::Mutex::new(
208 crate::plugins::PluginRegistry::new(),
209 )),
210 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
211 }),
212 };
213
214 // Register dispatcher handlers for bidirectional communication
215 client.register_dispatcher_handlers();
216
217 client
218 }
219
220 /// Create a new client with custom capabilities
221 ///
222 /// # Arguments
223 ///
224 /// * `transport` - The transport implementation to use
225 /// * `capabilities` - The client capabilities to negotiate
226 ///
227 /// # Examples
228 ///
229 /// ```rust,no_run
230 /// use turbomcp_client::{Client, ClientCapabilities};
231 /// use turbomcp_transport::stdio::StdioTransport;
232 ///
233 /// let capabilities = ClientCapabilities {
234 /// tools: true,
235 /// prompts: true,
236 /// resources: false,
237 /// sampling: false,
238 /// max_concurrent_handlers: 100,
239 /// };
240 ///
241 /// let transport = StdioTransport::new();
242 /// let client = Client::with_capabilities(transport, capabilities);
243 /// ```
244 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
245 let client = Self {
246 inner: Arc::new(ClientInner {
247 protocol: ProtocolClient::new(transport),
248 capabilities: capabilities.clone(),
249 initialized: AtomicBool::new(false),
250 sampling_handler: Arc::new(StdMutex::new(None)),
251 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
252 plugin_registry: Arc::new(tokio::sync::Mutex::new(
253 crate::plugins::PluginRegistry::new(),
254 )),
255 handler_semaphore: Arc::new(Semaphore::new(capabilities.max_concurrent_handlers)), // ✅ Configurable concurrent handlers
256 }),
257 };
258
259 // Register dispatcher handlers for bidirectional communication
260 client.register_dispatcher_handlers();
261
262 client
263 }
264
265 /// Shutdown the client and clean up all resources
266 ///
267 /// This method performs a **graceful shutdown** of the MCP client by:
268 /// 1. Stopping the message dispatcher background task
269 /// 2. Disconnecting the transport (closes connection, stops background tasks)
270 ///
271 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
272 ///
273 /// # Why This Method Exists
274 ///
275 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
276 /// which is required for proper cleanup of WebSocket connections and background tasks.
277 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
278 ///
279 /// # Best Practices
280 ///
281 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
282 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
283 /// - For tests: call in cleanup/teardown
284 /// - If forgotten, Drop will log warnings and do best-effort cleanup
285 ///
286 /// # Examples
287 ///
288 /// ```rust,no_run
289 /// # use turbomcp_client::Client;
290 /// # use turbomcp_transport::stdio::StdioTransport;
291 /// # async fn example() -> turbomcp_protocol::Result<()> {
292 /// let client = Client::new(StdioTransport::new());
293 /// client.initialize().await?;
294 ///
295 /// // Use client...
296 /// let _tools = client.list_tools().await?;
297 ///
298 /// // ✅ Clean shutdown
299 /// client.shutdown().await?;
300 /// # Ok(())
301 /// # }
302 /// ```
303 pub async fn shutdown(&self) -> Result<()> {
304 tracing::info!("🛑 Shutting down MCP client");
305
306 // 1. Shutdown message dispatcher
307 self.inner.protocol.dispatcher().shutdown();
308 tracing::debug!("✅ Message dispatcher stopped");
309
310 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
311 match self.inner.protocol.transport().disconnect().await {
312 Ok(()) => {
313 tracing::info!("✅ Transport disconnected successfully");
314 }
315 Err(e) => {
316 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
317 }
318 }
319
320 tracing::info!("✅ MCP client shutdown complete");
321 Ok(())
322 }
323}
324
325// ============================================================================
326// HTTP-Specific Convenience Constructors (Feature-Gated)
327// ============================================================================
328
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server (convenience method)
    ///
    /// This is a convenient one-liner alternative to manual configuration.
    /// Creates an HTTP client, connects, and initializes in one call.
    /// Equivalent to [`Client::connect_http_with`] with a no-op customization.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// When initialization fails the half-open client is shut down before the
    /// error is returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // Convenient one-liner - balanced with server DX
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Same path as the customizable variant, with the default configuration.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server with custom configuration
    ///
    /// Provides more control than `connect_http()` while still being ergonomic.
    /// The configuration starts from `Default::default()` with `base_url` set to
    /// `url`; `config_fn` runs once on it before the transport is created.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails. In that
    /// case the client is shut down (dispatcher stopped, transport disconnected)
    /// before returning, since the caller never receives a handle to do so.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };

        config_fn(&mut config);

        let transport = StreamableHttpClientTransport::new(config);
        let client = Self::new(transport);

        // Initialize connection immediately
        if let Err(init_err) = client.initialize().await {
            // The caller cannot clean up a client it never received, so do it
            // here; the initialization error is the one worth reporting.
            if let Err(cleanup_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed HTTP initialization also failed: {}",
                    cleanup_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
449
450// ============================================================================
451// TCP-Specific Convenience Constructors (Feature-Gated)
452// ============================================================================
453
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Convenient one-liner for TCP connections - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server address as a literal IP and port
    ///   (e.g., "127.0.0.1:8765" or "[::1]:8765"). Host names such as
    ///   "localhost:8765" are not resolved and are rejected as invalid.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `addr` does not parse as a socket address (bad request error)
    /// - Initialization handshake fails; the client is shut down before the
    ///   error is returned
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;

        // Bind to the unspecified address of the server's address family with
        // port 0, i.e. any local interface and any available port.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        if let Err(init_err) = client.initialize().await {
            // The caller cannot clean up a client it never received, so do it
            // here; the initialization error is the one worth reporting.
            if let Err(cleanup_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed TCP initialization also failed: {}",
                    cleanup_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
504
505// ============================================================================
506// Unix Socket-Specific Convenience Constructors (Feature-Gated)
507// ============================================================================
508
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// Convenient one-liner for Unix socket IPC - balanced DX.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails. In that
    /// case the client is shut down (dispatcher stopped, transport disconnected)
    /// before returning, since the caller never receives a handle to do so.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let transport = UnixTransport::new_client(path.into());
        let client = Self::new(transport);

        if let Err(init_err) = client.initialize().await {
            // The caller cannot clean up a client it never received, so do it
            // here; the initialization error is the one worth reporting.
            if let Err(cleanup_err) = client.shutdown().await {
                tracing::debug!(
                    "Cleanup after failed Unix socket initialization also failed: {}",
                    cleanup_err
                );
            }
            return Err(init_err);
        }

        Ok(client)
    }
}
546
547impl<T: Transport + 'static> Client<T> {
548 /// Register message handlers with the dispatcher
549 ///
550 /// This method sets up the callbacks that handle server-initiated requests
551 /// and notifications. The dispatcher's background task routes incoming
552 /// messages to these handlers.
553 ///
554 /// This is called automatically during Client construction (in `new()` and
555 /// `with_capabilities()`), so you don't need to call it manually.
556 ///
557 /// ## How It Works
558 ///
559 /// The handlers are synchronous closures that spawn async tasks to do the
560 /// actual work. This allows the dispatcher to continue routing messages
561 /// without blocking on handler execution.
562 fn register_dispatcher_handlers(&self) {
563 let dispatcher = self.inner.protocol.dispatcher();
564 let client_for_requests = self.clone();
565 let client_for_notifications = self.clone();
566
567 // Request handler (elicitation, sampling, etc.)
568 let semaphore = Arc::clone(&self.inner.handler_semaphore);
569 let request_handler = Arc::new(move |request: JsonRpcRequest| {
570 let client = client_for_requests.clone();
571 let method = request.method.clone();
572 let req_id = request.id.clone();
573 let semaphore = Arc::clone(&semaphore);
574
575 // ✅ Spawn async task with bounded concurrency
576 tokio::spawn(async move {
577 // Acquire permit (blocks if 100 requests already in flight)
578 let _permit = semaphore
579 .acquire()
580 .await
581 .expect("Semaphore should not be closed");
582
583 tracing::debug!(
584 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
585 method,
586 req_id
587 );
588 if let Err(e) = client.handle_request(request).await {
589 tracing::error!(
590 "❌ [request_handler] Error handling server request '{}': {}",
591 method,
592 e
593 );
594 tracing::error!(" Request ID: {:?}", req_id);
595 tracing::error!(" Error kind: {:?}", e.kind);
596 } else {
597 tracing::debug!(
598 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
599 method,
600 req_id
601 );
602 }
603 // Permit automatically released on drop ✅
604 });
605 Ok(())
606 });
607
608 // Notification handler
609 let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
610 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
611 let client = client_for_notifications.clone();
612 let semaphore = Arc::clone(&semaphore_notif);
613
614 // ✅ Spawn async task with bounded concurrency
615 tokio::spawn(async move {
616 // Acquire permit (blocks if 100 handlers already in flight)
617 let _permit = semaphore
618 .acquire()
619 .await
620 .expect("Semaphore should not be closed");
621
622 if let Err(e) = client.handle_notification(notification).await {
623 tracing::error!("Error handling server notification: {}", e);
624 }
625 // Permit automatically released on drop ✅
626 });
627 Ok(())
628 });
629
630 // Register handlers synchronously - no race condition!
631 // The set_* methods are now synchronous with std::sync::Mutex
632 dispatcher.set_request_handler(request_handler);
633 dispatcher.set_notification_handler(notification_handler);
634 tracing::debug!("Dispatcher handlers registered successfully");
635 }
636
637 /// Handle server-initiated requests (elicitation, sampling, roots)
638 ///
639 /// This method is called by the MessageDispatcher when it receives a request
640 /// from the server. It routes the request to the appropriate handler based on
641 /// the method name.
642 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
643 match request.method.as_str() {
644 "sampling/createMessage" => {
645 let handler_opt = self
646 .inner
647 .sampling_handler
648 .lock()
649 .expect("sampling_handler mutex poisoned")
650 .clone();
651 if let Some(handler) = handler_opt {
652 // Extract request ID for proper correlation
653 let request_id = match &request.id {
654 turbomcp_protocol::MessageId::String(s) => s.clone(),
655 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
656 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
657 };
658
659 let params: CreateMessageRequest =
660 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
661 .map_err(|e| {
662 Error::protocol(format!("Invalid createMessage params: {}", e))
663 })?;
664
665 match handler.handle_create_message(request_id, params).await {
666 Ok(result) => {
667 let result_value = serde_json::to_value(result).map_err(|e| {
668 Error::protocol(format!("Failed to serialize response: {}", e))
669 })?;
670 let response = JsonRpcResponse::success(result_value, request.id);
671 self.send_response(response).await?;
672 }
673 Err(e) => {
674 tracing::warn!(
675 "⚠️ [handle_request] Sampling handler returned error: {}",
676 e
677 );
678
679 // Preserve error semantics by checking actual error type
680 // This allows proper error code propagation for retry logic
681 let (code, message) = if let Some(handler_err) =
682 e.downcast_ref::<HandlerError>()
683 {
684 // HandlerError has explicit JSON-RPC code mapping
685 let json_err = handler_err.into_jsonrpc_error();
686 tracing::info!(
687 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
688 json_err.code
689 );
690 (json_err.code, json_err.message)
691 } else if let Some(proto_err) =
692 e.downcast_ref::<turbomcp_protocol::Error>()
693 {
694 // Protocol errors have ErrorKind-based mapping
695 tracing::info!(
696 "📋 [handle_request] Protocol error mapped to code: {}",
697 proto_err.jsonrpc_error_code()
698 );
699 (proto_err.jsonrpc_error_code(), proto_err.to_string())
700 } else {
701 // Generic errors default to Internal (-32603)
702 // Log the error type for debugging (should rarely hit this path)
703 tracing::warn!(
704 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
705 std::any::type_name_of_val(&*e)
706 );
707 (-32603, format!("Sampling handler error: {}", e))
708 };
709
710 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
711 code,
712 message,
713 data: None,
714 };
715 let response =
716 JsonRpcResponse::error_response(error, request.id.clone());
717
718 tracing::info!(
719 "🔄 [handle_request] Attempting to send error response for request: {:?}",
720 request.id
721 );
722 self.send_response(response).await?;
723 tracing::info!(
724 "✅ [handle_request] Error response sent successfully for request: {:?}",
725 request.id
726 );
727 }
728 }
729 } else {
730 // No handler configured
731 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
732 code: -32601,
733 message: "Sampling not supported".to_string(),
734 data: None,
735 };
736 let response = JsonRpcResponse::error_response(error, request.id);
737 self.send_response(response).await?;
738 }
739 }
740 "roots/list" => {
741 // Handle roots/list request from server
742 // Clone the handler Arc to avoid holding mutex across await
743 let handler_opt = self
744 .inner
745 .handlers
746 .lock()
747 .expect("handlers mutex poisoned")
748 .roots
749 .clone();
750
751 let roots_result = if let Some(handler) = handler_opt {
752 handler.handle_roots_request().await
753 } else {
754 // No handler - return empty list per MCP spec
755 Ok(Vec::new())
756 };
757
758 match roots_result {
759 Ok(roots) => {
760 let result_value =
761 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
762 roots,
763 _meta: None,
764 })
765 .map_err(|e| {
766 Error::protocol(format!(
767 "Failed to serialize roots response: {}",
768 e
769 ))
770 })?;
771 let response = JsonRpcResponse::success(result_value, request.id);
772 self.send_response(response).await?;
773 }
774 Err(e) => {
775 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
776 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
777 let json_err = e.into_jsonrpc_error();
778 let response = JsonRpcResponse::error_response(json_err, request.id);
779 self.send_response(response).await?;
780 }
781 }
782 }
783 "elicitation/create" => {
784 // Clone handler Arc before await to avoid holding mutex across await
785 let handler_opt = self
786 .inner
787 .handlers
788 .lock()
789 .expect("handlers mutex poisoned")
790 .elicitation
791 .clone();
792 if let Some(handler) = handler_opt {
793 // Parse elicitation request params as MCP protocol type
794 let proto_request: turbomcp_protocol::types::ElicitRequest =
795 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
796 .map_err(|e| {
797 Error::protocol(format!("Invalid elicitation params: {}", e))
798 })?;
799
800 // Wrap protocol request with ID for handler (preserves type safety!)
801 let handler_request =
802 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
803
804 // Call the registered elicitation handler
805 match handler.handle_elicitation(handler_request).await {
806 Ok(elicit_response) => {
807 // Convert handler response back to protocol type
808 let proto_result = elicit_response.into_protocol();
809 let result_value = serde_json::to_value(proto_result).map_err(|e| {
810 Error::protocol(format!(
811 "Failed to serialize elicitation response: {}",
812 e
813 ))
814 })?;
815 let response = JsonRpcResponse::success(result_value, request.id);
816 self.send_response(response).await?;
817 }
818 Err(e) => {
819 // Convert handler error to JSON-RPC error using centralized mapping
820 let response =
821 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
822 self.send_response(response).await?;
823 }
824 }
825 } else {
826 // No handler configured - elicitation not supported
827 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
828 code: -32601,
829 message: "Elicitation not supported - no handler registered".to_string(),
830 data: None,
831 };
832 let response = JsonRpcResponse::error_response(error, request.id);
833 self.send_response(response).await?;
834 }
835 }
836 _ => {
837 // Unknown method
838 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
839 code: -32601,
840 message: format!("Method not found: {}", request.method),
841 data: None,
842 };
843 let response = JsonRpcResponse::error_response(error, request.id);
844 self.send_response(response).await?;
845 }
846 }
847 Ok(())
848 }
849
850 /// Handle server-initiated notifications
851 ///
852 /// Routes notifications to appropriate handlers based on method name.
853 /// MCP defines several notification types that servers can send to clients:
854 ///
855 /// - `notifications/progress` - Progress updates for long-running operations
856 /// - `notifications/message` - Log messages from server
857 /// - `notifications/resources/updated` - Resource content changed
858 /// - `notifications/resources/list_changed` - Resource list changed
859 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
860 match notification.method.as_str() {
861 "notifications/progress" => {
862 // Progress tracking has been removed
863 tracing::debug!(
864 "Received progress notification - progress tracking has been removed"
865 );
866 }
867
868 "notifications/message" => {
869 // Route to log handler
870 let handler_opt = self
871 .inner
872 .handlers
873 .lock()
874 .expect("handlers mutex poisoned")
875 .get_log_handler();
876
877 if let Some(handler) = handler_opt {
878 // Parse log message
879 let log: crate::handlers::LoggingNotification = serde_json::from_value(
880 notification.params.unwrap_or(serde_json::Value::Null),
881 )
882 .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
883
884 // Call handler
885 if let Err(e) = handler.handle_log(log).await {
886 tracing::error!("Log handler error: {}", e);
887 }
888 } else {
889 tracing::debug!("Received log notification but no handler registered");
890 }
891 }
892
893 "notifications/resources/updated" => {
894 // Route to resource update handler
895 let handler_opt = self
896 .inner
897 .handlers
898 .lock()
899 .expect("handlers mutex poisoned")
900 .get_resource_update_handler();
901
902 if let Some(handler) = handler_opt {
903 // Parse resource update notification
904 let update: crate::handlers::ResourceUpdatedNotification =
905 serde_json::from_value(
906 notification.params.unwrap_or(serde_json::Value::Null),
907 )
908 .map_err(|e| {
909 Error::protocol(format!("Invalid resource update notification: {}", e))
910 })?;
911
912 // Call handler
913 if let Err(e) = handler.handle_resource_update(update).await {
914 tracing::error!("Resource update handler error: {}", e);
915 }
916 } else {
917 tracing::debug!(
918 "Received resource update notification but no handler registered"
919 );
920 }
921 }
922
923 "notifications/resources/list_changed" => {
924 // Route to resource list changed handler
925 let handler_opt = self
926 .inner
927 .handlers
928 .lock()
929 .expect("handlers mutex poisoned")
930 .get_resource_list_changed_handler();
931
932 if let Some(handler) = handler_opt {
933 if let Err(e) = handler.handle_resource_list_changed().await {
934 tracing::error!("Resource list changed handler error: {}", e);
935 }
936 } else {
937 tracing::debug!(
938 "Resource list changed notification received (no handler registered)"
939 );
940 }
941 }
942
943 "notifications/prompts/list_changed" => {
944 // Route to prompt list changed handler
945 let handler_opt = self
946 .inner
947 .handlers
948 .lock()
949 .expect("handlers mutex poisoned")
950 .get_prompt_list_changed_handler();
951
952 if let Some(handler) = handler_opt {
953 if let Err(e) = handler.handle_prompt_list_changed().await {
954 tracing::error!("Prompt list changed handler error: {}", e);
955 }
956 } else {
957 tracing::debug!(
958 "Prompt list changed notification received (no handler registered)"
959 );
960 }
961 }
962
963 "notifications/tools/list_changed" => {
964 // Route to tool list changed handler
965 let handler_opt = self
966 .inner
967 .handlers
968 .lock()
969 .expect("handlers mutex poisoned")
970 .get_tool_list_changed_handler();
971
972 if let Some(handler) = handler_opt {
973 if let Err(e) = handler.handle_tool_list_changed().await {
974 tracing::error!("Tool list changed handler error: {}", e);
975 }
976 } else {
977 tracing::debug!(
978 "Tool list changed notification received (no handler registered)"
979 );
980 }
981 }
982
983 "notifications/cancelled" => {
984 // Route to cancellation handler
985 let handler_opt = self
986 .inner
987 .handlers
988 .lock()
989 .expect("handlers mutex poisoned")
990 .get_cancellation_handler();
991
992 if let Some(handler) = handler_opt {
993 // Parse cancellation notification
994 let cancellation: crate::handlers::CancelledNotification =
995 serde_json::from_value(
996 notification.params.unwrap_or(serde_json::Value::Null),
997 )
998 .map_err(|e| {
999 Error::protocol(format!("Invalid cancellation notification: {}", e))
1000 })?;
1001
1002 // Call handler
1003 if let Err(e) = handler.handle_cancellation(cancellation).await {
1004 tracing::error!("Cancellation handler error: {}", e);
1005 }
1006 } else {
1007 tracing::debug!("Cancellation notification received (no handler registered)");
1008 }
1009 }
1010
1011 _ => {
1012 // Unknown notification type
1013 tracing::debug!("Received unknown notification: {}", notification.method);
1014 }
1015 }
1016
1017 Ok(())
1018 }
1019
1020 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1021 tracing::info!(
1022 "📤 [send_response] Sending JSON-RPC response: id={:?}",
1023 response.id
1024 );
1025
1026 let payload = serde_json::to_vec(&response).map_err(|e| {
1027 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1028 Error::protocol(format!("Failed to serialize response: {}", e))
1029 })?;
1030
1031 tracing::debug!(
1032 "📤 [send_response] Response payload: {} bytes",
1033 payload.len()
1034 );
1035 tracing::debug!(
1036 "📤 [send_response] Response JSON: {}",
1037 String::from_utf8_lossy(&payload)
1038 );
1039
1040 let message = TransportMessage::new(
1041 turbomcp_protocol::MessageId::from("response".to_string()),
1042 payload.into(),
1043 );
1044
1045 self.inner
1046 .protocol
1047 .transport()
1048 .send(message)
1049 .await
1050 .map_err(|e| {
1051 tracing::error!("❌ [send_response] Transport send failed: {}", e);
1052 Error::transport(format!("Failed to send response: {}", e))
1053 })?;
1054
1055 tracing::info!(
1056 "✅ [send_response] Response sent successfully: id={:?}",
1057 response.id
1058 );
1059 Ok(())
1060 }
1061
1062 /// Initialize the connection with the MCP server
1063 ///
1064 /// Performs the initialization handshake with the server, negotiating capabilities
1065 /// and establishing the protocol version. This method must be called before
1066 /// any other operations can be performed.
1067 ///
1068 /// # Returns
1069 ///
1070 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1071 ///
1072 /// # Errors
1073 ///
1074 /// Returns an error if:
1075 /// - The transport connection fails
1076 /// - The server rejects the initialization request
1077 /// - Protocol negotiation fails
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```rust,no_run
1082 /// # use turbomcp_client::Client;
1083 /// # use turbomcp_transport::stdio::StdioTransport;
1084 /// # async fn example() -> turbomcp_protocol::Result<()> {
1085 /// let mut client = Client::new(StdioTransport::new());
1086 ///
1087 /// let result = client.initialize().await?;
1088 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1089 /// # Ok(())
1090 /// # }
1091 /// ```
1092 pub async fn initialize(&self) -> Result<InitializeResult> {
1093 // Auto-connect transport if not already connected
1094 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1095 let transport = self.inner.protocol.transport();
1096 let transport_state = transport.state().await;
1097 if !matches!(
1098 transport_state,
1099 turbomcp_transport::TransportState::Connected
1100 ) {
1101 tracing::debug!(
1102 "Auto-connecting transport (current state: {:?})",
1103 transport_state
1104 );
1105 transport
1106 .connect()
1107 .await
1108 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1109 tracing::info!("Transport connected successfully");
1110 }
1111
1112 // Build client capabilities based on registered handlers (automatic detection)
1113 let mut client_caps = ProtocolClientCapabilities::default();
1114
1115 // Detect sampling capability from handler
1116 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1117 client_caps.sampling = Some(sampling_caps);
1118 }
1119
1120 // Detect elicitation capability from handler
1121 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1122 client_caps.elicitation = Some(elicitation_caps);
1123 }
1124
1125 // Detect roots capability from handler
1126 if let Some(roots_caps) = self.get_roots_capabilities() {
1127 client_caps.roots = Some(roots_caps);
1128 }
1129
1130 // Send MCP initialization request
1131 let request = InitializeRequest {
1132 protocol_version: PROTOCOL_VERSION.to_string(),
1133 capabilities: client_caps,
1134 client_info: turbomcp_protocol::types::Implementation {
1135 name: "turbomcp-client".to_string(),
1136 version: env!("CARGO_PKG_VERSION").to_string(),
1137 title: Some("TurboMCP Client".to_string()),
1138 },
1139 _meta: None,
1140 };
1141
1142 let protocol_response: ProtocolInitializeResult = self
1143 .inner
1144 .protocol
1145 .request("initialize", Some(serde_json::to_value(request)?))
1146 .await?;
1147
1148 // AtomicBool: lock-free store with Ordering::Relaxed
1149 self.inner.initialized.store(true, Ordering::Relaxed);
1150
1151 // Send initialized notification
1152 self.inner
1153 .protocol
1154 .notify("notifications/initialized", None)
1155 .await?;
1156
1157 // Convert protocol response to client response type
1158 Ok(InitializeResult {
1159 server_info: protocol_response.server_info,
1160 server_capabilities: protocol_response.capabilities,
1161 })
1162 }
1163
1164 /// Execute a protocol method with plugin middleware
1165 ///
1166 /// This is a generic helper for wrapping protocol calls with plugin middleware.
1167 pub(crate) async fn execute_with_plugins<R>(
1168 &self,
1169 method_name: &str,
1170 params: Option<serde_json::Value>,
1171 ) -> Result<R>
1172 where
1173 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1174 {
1175 // Create JSON-RPC request for plugin context
1176 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1177 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1178 id: turbomcp_protocol::MessageId::Number(1),
1179 method: method_name.to_string(),
1180 params: params.clone(),
1181 };
1182
1183 // 1. Create request context for plugins
1184 let mut req_ctx =
1185 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1186
1187 // 2. Execute before_request plugin middleware
1188 if let Err(e) = self
1189 .inner
1190 .plugin_registry
1191 .lock()
1192 .await
1193 .execute_before_request(&mut req_ctx)
1194 .await
1195 {
1196 return Err(Error::bad_request(format!(
1197 "Plugin before_request failed: {}",
1198 e
1199 )));
1200 }
1201
1202 // 3. Execute the actual protocol call
1203 let start_time = std::time::Instant::now();
1204 let protocol_result: Result<R> = self
1205 .inner
1206 .protocol
1207 .request(method_name, req_ctx.params().cloned())
1208 .await;
1209 let duration = start_time.elapsed();
1210
1211 // 4. Prepare response context
1212 let mut resp_ctx = match protocol_result {
1213 Ok(ref response) => {
1214 let response_value = serde_json::to_value(response.clone())?;
1215 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1216 }
1217 Err(ref e) => {
1218 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1219 }
1220 };
1221
1222 // 5. Execute after_response plugin middleware
1223 if let Err(e) = self
1224 .inner
1225 .plugin_registry
1226 .lock()
1227 .await
1228 .execute_after_response(&mut resp_ctx)
1229 .await
1230 {
1231 return Err(Error::bad_request(format!(
1232 "Plugin after_response failed: {}",
1233 e
1234 )));
1235 }
1236
1237 // 6. Return the final result, checking for plugin modifications
1238 match protocol_result {
1239 Ok(ref response) => {
1240 // Check if plugins modified the response
1241 if let Some(modified_response) = resp_ctx.response {
1242 // Try to deserialize the modified response
1243 if let Ok(modified_result) =
1244 serde_json::from_value::<R>(modified_response.clone())
1245 {
1246 return Ok(modified_result);
1247 }
1248 }
1249
1250 // No plugin modifications, use original response
1251 Ok(response.clone())
1252 }
1253 Err(e) => {
1254 // Check if plugins provided an error recovery response
1255 if let Some(recovery_response) = resp_ctx.response {
1256 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1257 Ok(recovery_result)
1258 } else {
1259 Err(e)
1260 }
1261 } else {
1262 Err(e)
1263 }
1264 }
1265 }
1266 }
1267
1268 /// Subscribe to resource change notifications
1269 ///
1270 /// Registers interest in receiving notifications when the specified
1271 /// resource changes. The server will send notifications when the
1272 /// resource is modified, created, or deleted.
1273 ///
1274 /// # Arguments
1275 ///
1276 /// * `uri` - The URI of the resource to monitor
1277 ///
1278 /// # Returns
1279 ///
1280 /// Returns `EmptyResult` on successful subscription.
1281 ///
1282 /// # Errors
1283 ///
1284 /// Returns an error if:
1285 /// - The client is not initialized
1286 /// - The URI is invalid or empty
1287 /// - The server doesn't support subscriptions
1288 /// - The request fails
1289 ///
1290 /// # Examples
1291 ///
1292 /// ```rust,no_run
1293 /// # use turbomcp_client::Client;
1294 /// # use turbomcp_transport::stdio::StdioTransport;
1295 /// # async fn example() -> turbomcp_protocol::Result<()> {
1296 /// let mut client = Client::new(StdioTransport::new());
1297 /// client.initialize().await?;
1298 ///
1299 /// // Subscribe to file changes
1300 /// client.subscribe("file:///watch/directory").await?;
1301 /// println!("Subscribed to resource changes");
1302 /// # Ok(())
1303 /// # }
1304 /// ```
1305 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1306 if !self.inner.initialized.load(Ordering::Relaxed) {
1307 return Err(Error::bad_request("Client not initialized"));
1308 }
1309
1310 if uri.is_empty() {
1311 return Err(Error::bad_request("Subscription URI cannot be empty"));
1312 }
1313
1314 // Send resources/subscribe request with plugin middleware
1315 let request = SubscribeRequest {
1316 uri: uri.to_string(),
1317 };
1318
1319 self.execute_with_plugins(
1320 "resources/subscribe",
1321 Some(serde_json::to_value(request).map_err(|e| {
1322 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1323 })?),
1324 )
1325 .await
1326 }
1327
1328 /// Unsubscribe from resource change notifications
1329 ///
1330 /// Cancels a previous subscription to resource changes. After unsubscribing,
1331 /// the client will no longer receive notifications for the specified resource.
1332 ///
1333 /// # Arguments
1334 ///
1335 /// * `uri` - The URI of the resource to stop monitoring
1336 ///
1337 /// # Returns
1338 ///
1339 /// Returns `EmptyResult` on successful unsubscription.
1340 ///
1341 /// # Errors
1342 ///
1343 /// Returns an error if:
1344 /// - The client is not initialized
1345 /// - The URI is invalid or empty
1346 /// - No active subscription exists for the URI
1347 /// - The request fails
1348 ///
1349 /// # Examples
1350 ///
1351 /// ```rust,no_run
1352 /// # use turbomcp_client::Client;
1353 /// # use turbomcp_transport::stdio::StdioTransport;
1354 /// # async fn example() -> turbomcp_protocol::Result<()> {
1355 /// let mut client = Client::new(StdioTransport::new());
1356 /// client.initialize().await?;
1357 ///
1358 /// // Unsubscribe from file changes
1359 /// client.unsubscribe("file:///watch/directory").await?;
1360 /// println!("Unsubscribed from resource changes");
1361 /// # Ok(())
1362 /// # }
1363 /// ```
1364 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1365 if !self.inner.initialized.load(Ordering::Relaxed) {
1366 return Err(Error::bad_request("Client not initialized"));
1367 }
1368
1369 if uri.is_empty() {
1370 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1371 }
1372
1373 // Send resources/unsubscribe request with plugin middleware
1374 let request = UnsubscribeRequest {
1375 uri: uri.to_string(),
1376 };
1377
1378 self.execute_with_plugins(
1379 "resources/unsubscribe",
1380 Some(serde_json::to_value(request).map_err(|e| {
1381 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1382 })?),
1383 )
1384 .await
1385 }
1386
1387 /// Get the client's capabilities configuration
1388 #[must_use]
1389 pub fn capabilities(&self) -> &ClientCapabilities {
1390 &self.inner.capabilities
1391 }
1392
1393 /// Initialize all registered plugins
1394 ///
1395 /// This should be called after registration but before using the client.
1396 pub async fn initialize_plugins(&self) -> Result<()> {
1397 // Set up client context for plugins with actual client capabilities
1398 let mut capabilities = std::collections::HashMap::new();
1399 capabilities.insert(
1400 "protocol_version".to_string(),
1401 serde_json::json!("2024-11-05"),
1402 );
1403 capabilities.insert(
1404 "mcp_version".to_string(),
1405 serde_json::json!(env!("CARGO_PKG_VERSION")),
1406 );
1407 capabilities.insert(
1408 "supports_notifications".to_string(),
1409 serde_json::json!(true),
1410 );
1411 capabilities.insert(
1412 "supports_sampling".to_string(),
1413 serde_json::json!(self.has_sampling_handler()),
1414 );
1415 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1416 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1417
1418 // Extract client configuration
1419 let mut config = std::collections::HashMap::new();
1420 config.insert(
1421 "client_name".to_string(),
1422 serde_json::json!("turbomcp-client"),
1423 );
1424 config.insert(
1425 "initialized".to_string(),
1426 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1427 );
1428 config.insert(
1429 "plugin_count".to_string(),
1430 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1431 );
1432
1433 let context = crate::plugins::PluginContext::new(
1434 "turbomcp-client".to_string(),
1435 env!("CARGO_PKG_VERSION").to_string(),
1436 capabilities,
1437 config,
1438 vec![], // Will be populated by the registry
1439 );
1440
1441 self.inner
1442 .plugin_registry
1443 .lock()
1444 .await
1445 .set_client_context(context);
1446
1447 // Note: Individual plugins are initialized automatically during registration
1448 // via PluginRegistry::register_plugin(). This method ensures the registry
1449 // has proper client context for any future plugin registrations.
1450 Ok(())
1451 }
1452
1453 /// Cleanup all registered plugins
1454 ///
1455 /// This should be called when the client is being shut down.
1456 pub async fn cleanup_plugins(&self) -> Result<()> {
1457 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1458 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1459 // references are dropped.
1460
1461 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1462 // pattern where plugins clean up their resources in their Drop implementation.
1463 // Replace the registry with a fresh one (mutex ensures safe access)
1464 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1465 Ok(())
1466 }
1467
// Note: Capability detection methods (has_*_handler, get_*_capabilities)
// are defined in their respective operation modules:
// - sampling.rs: has_sampling_handler, get_sampling_capabilities
// - handlers.rs: has_elicitation_handler, has_roots_handler
//
// The capability getters for elicitation and roots are defined below
// because they are used during initialization.
1475
1476 /// Get elicitation capabilities if handler is registered
1477 /// Automatically detects capability based on registered handler
1478 fn get_elicitation_capabilities(
1479 &self,
1480 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1481 if self.has_elicitation_handler() {
1482 // Currently returns default capabilities. In the future, schema_validation support
1483 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1484 // that handlers could implement. For now, handlers validate schemas themselves.
1485 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1486 } else {
1487 None
1488 }
1489 }
1490
1491 /// Get roots capabilities if handler is registered
1492 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1493 if self.has_roots_handler() {
1494 // Roots capabilities indicate whether list can change
1495 Some(turbomcp_protocol::types::RootsCapabilities {
1496 list_changed: Some(true), // Support dynamic roots by default
1497 })
1498 } else {
1499 None
1500 }
1501 }
1502}