turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27 *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{
35 ClientCapabilities,
36 handlers::{HandlerError, HandlerRegistry},
37 sampling::SamplingHandler,
38};
39
/// Shared state behind every [`Client`] handle.
///
/// One `ClientInner` exists per logical client and is held in an `Arc`, so
/// cloning a `Client` only bumps a reference count (same pattern as reqwest
/// and the AWS SDK). Mutable pieces use interior mutability so the public API
/// can take `&self`.
pub(super) struct ClientInner<T: Transport + 'static> {
    /// Protocol client for low-level communication; also gives access to the
    /// message dispatcher and the transport.
    pub(super) protocol: ProtocolClient<T>,

    /// Client capabilities (set at construction, no mutation path in this type)
    pub(super) capabilities: ClientCapabilities,

    /// Initialization state (lock-free atomic boolean); constructors start it at `false`
    pub(super) initialized: AtomicBool,

    /// Optional sampling handler. Guarded by a std mutex: callers clone the
    /// inner `Arc` out and release the lock before awaiting the handler.
    pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,

    /// Handler registry for bidirectional communication (std mutex, used for
    /// registration and for short lookups when a server message arrives)
    pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,

    /// Plugin registry for middleware (tokio mutex - may be held across `.await`)
    pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
}
63
/// The core MCP client implementation
///
/// `Client` is the high-level interface for communicating with MCP servers. It
/// supports tools, prompts, resources, sampling, elicitation, and
/// bidirectional (server-initiated) communication.
///
/// # Clone Pattern
///
/// `Client<T>` is a thin wrapper around an `Arc`, so cloning is cheap (same
/// pattern as reqwest and AWS SDK). Every clone shares the same underlying
/// connection and state:
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// let client = Client::new(StdioTransport::new());
/// client.initialize().await?;
///
/// // Cheap clone - shares same connection
/// let client2 = client.clone();
/// tokio::spawn(async move {
///     client2.list_tools().await.ok();
/// });
/// # Ok(())
/// # }
/// ```
///
/// Before any MCP operation is issued, call `initialize()` to perform the MCP
/// handshake and capability negotiation.
///
/// # Features
///
/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
/// - **Bidirectional Communication**: Server-initiated requests and client responses
/// - **Plugin Middleware**: Extensible request/response processing
/// - **Handler Registry**: Callbacks for server-initiated operations
/// - **Connection Management**: Robust error handling and recovery
/// - **Type Safety**: Compile-time guarantees for MCP message types
/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
///
/// # Examples
///
/// ```rust,no_run
/// use turbomcp_client::Client;
/// use turbomcp_transport::stdio::StdioTransport;
/// use std::collections::HashMap;
///
/// # async fn example() -> turbomcp_protocol::Result<()> {
/// // Create and initialize client (no mut needed!)
/// let client = Client::new(StdioTransport::new());
/// let init_result = client.initialize().await?;
/// println!("Connected to: {}", init_result.server_info.name);
///
/// // Use MCP operations
/// let tools = client.list_tools().await?;
/// let mut args = HashMap::new();
/// args.insert("input".to_string(), serde_json::json!("test"));
/// let result = client.call_tool("my_tool", Some(args)).await?;
/// # Ok(())
/// # }
/// ```
pub struct Client<T: Transport + 'static> {
    /// Reference-counted shared state; all clones point at the same value.
    pub(super) inner: Arc<ClientInner<T>>,
}
129
130/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
131///
132/// Cloning a Client is cheap (just an Arc clone) and all clones share
133/// the same underlying connection and state.
134impl<T: Transport + 'static> Clone for Client<T> {
135 fn clone(&self) -> Self {
136 Self {
137 inner: Arc::clone(&self.inner),
138 }
139 }
140}
141
142impl<T: Transport + 'static> Drop for ClientInner<T> {
143 fn drop(&mut self) {
144 // Best-effort cleanup if shutdown() wasn't called
145 // This is a safety net, but applications SHOULD call shutdown() explicitly
146
147 // Single warning to catch attention
148 tracing::warn!(
149 "MCP Client dropped without explicit shutdown() - call client.shutdown().await for clean resource cleanup"
150 );
151
152 // Details at debug level to avoid noise in production
153 tracing::debug!(" 📘 RECOMMENDATION: Call client.shutdown().await before dropping");
154 tracing::debug!(
155 " 🐛 IMPACT: Background tasks (especially WebSocket reconnection) may continue running"
156 );
157 tracing::debug!(" 💡 See client shutdown documentation for best practices");
158
159 // We can shutdown the dispatcher (it's synchronous)
160 self.protocol.dispatcher().shutdown();
161 tracing::debug!(" ✅ Message dispatcher stopped");
162
163 // ⚠️ LIMITATION: Cannot call transport.disconnect() from Drop because it's async
164 // This means:
165 // - WebSocket: Close frame not sent, reconnection tasks keep running
166 // - HTTP: Connections may not close cleanly
167 // - All transports: Background tasks may be orphaned
168 tracing::debug!(" ❌ Transport NOT disconnected (Drop cannot call async methods)");
169 tracing::debug!(" ⚠️ WebSocket reconnection and other background tasks may continue");
170 }
171}
172
173impl<T: Transport + 'static> Client<T> {
174 /// Create a new client with the specified transport
175 ///
176 /// Creates a new MCP client instance with default capabilities.
177 /// The client must be initialized before use by calling `initialize()`.
178 ///
179 /// # Arguments
180 ///
181 /// * `transport` - The transport implementation to use for communication
182 ///
183 /// # Examples
184 ///
185 /// ```rust,no_run
186 /// use turbomcp_client::Client;
187 /// use turbomcp_transport::stdio::StdioTransport;
188 ///
189 /// let transport = StdioTransport::new();
190 /// let client = Client::new(transport);
191 /// ```
192 pub fn new(transport: T) -> Self {
193 let client = Self {
194 inner: Arc::new(ClientInner {
195 protocol: ProtocolClient::new(transport),
196 capabilities: ClientCapabilities::default(),
197 initialized: AtomicBool::new(false),
198 sampling_handler: Arc::new(StdMutex::new(None)),
199 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
200 plugin_registry: Arc::new(tokio::sync::Mutex::new(
201 crate::plugins::PluginRegistry::new(),
202 )),
203 }),
204 };
205
206 // Register dispatcher handlers for bidirectional communication
207 client.register_dispatcher_handlers();
208
209 client
210 }
211
212 /// Create a new client with custom capabilities
213 ///
214 /// # Arguments
215 ///
216 /// * `transport` - The transport implementation to use
217 /// * `capabilities` - The client capabilities to negotiate
218 ///
219 /// # Examples
220 ///
221 /// ```rust,no_run
222 /// use turbomcp_client::{Client, ClientCapabilities};
223 /// use turbomcp_transport::stdio::StdioTransport;
224 ///
225 /// let capabilities = ClientCapabilities {
226 /// tools: true,
227 /// prompts: true,
228 /// resources: false,
229 /// sampling: false,
230 /// };
231 ///
232 /// let transport = StdioTransport::new();
233 /// let client = Client::with_capabilities(transport, capabilities);
234 /// ```
235 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
236 let client = Self {
237 inner: Arc::new(ClientInner {
238 protocol: ProtocolClient::new(transport),
239 capabilities,
240 initialized: AtomicBool::new(false),
241 sampling_handler: Arc::new(StdMutex::new(None)),
242 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
243 plugin_registry: Arc::new(tokio::sync::Mutex::new(
244 crate::plugins::PluginRegistry::new(),
245 )),
246 }),
247 };
248
249 // Register dispatcher handlers for bidirectional communication
250 client.register_dispatcher_handlers();
251
252 client
253 }
254
255 /// Shutdown the client and clean up all resources
256 ///
257 /// This method performs a **graceful shutdown** of the MCP client by:
258 /// 1. Stopping the message dispatcher background task
259 /// 2. Disconnecting the transport (closes connection, stops background tasks)
260 ///
261 /// **CRITICAL**: After calling `shutdown()`, the client can no longer be used.
262 ///
263 /// # Why This Method Exists
264 ///
265 /// The `Drop` implementation cannot call async methods like `transport.disconnect()`,
266 /// which is required for proper cleanup of WebSocket connections and background tasks.
267 /// Without calling `shutdown()`, WebSocket reconnection tasks will continue running.
268 ///
269 /// # Best Practices
270 ///
271 /// - **Always call `shutdown()` before dropping** for clean resource cleanup
272 /// - For applications: call in signal handlers (`SIGINT`, `SIGTERM`)
273 /// - For tests: call in cleanup/teardown
274 /// - If forgotten, Drop will log warnings and do best-effort cleanup
275 ///
276 /// # Examples
277 ///
278 /// ```rust,no_run
279 /// # use turbomcp_client::Client;
280 /// # use turbomcp_transport::stdio::StdioTransport;
281 /// # async fn example() -> turbomcp_protocol::Result<()> {
282 /// let client = Client::new(StdioTransport::new());
283 /// client.initialize().await?;
284 ///
285 /// // Use client...
286 /// let _tools = client.list_tools().await?;
287 ///
288 /// // ✅ Clean shutdown
289 /// client.shutdown().await?;
290 /// # Ok(())
291 /// # }
292 /// ```
293 pub async fn shutdown(&self) -> Result<()> {
294 tracing::info!("🛑 Shutting down MCP client");
295
296 // 1. Shutdown message dispatcher
297 self.inner.protocol.dispatcher().shutdown();
298 tracing::debug!("✅ Message dispatcher stopped");
299
300 // 2. Disconnect transport (WebSocket: stops reconnection, HTTP: closes connections)
301 match self.inner.protocol.transport().disconnect().await {
302 Ok(()) => {
303 tracing::info!("✅ Transport disconnected successfully");
304 }
305 Err(e) => {
306 tracing::warn!("Transport disconnect error (may already be closed): {}", e);
307 }
308 }
309
310 tracing::info!("✅ MCP client shutdown complete");
311 Ok(())
312 }
313}
314
315// ============================================================================
316// HTTP-Specific Convenience Constructors (Feature-Gated)
317// ============================================================================
318
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connect to an HTTP MCP server (convenience method)
    ///
    /// One-call alternative to manual configuration: builds an HTTP transport
    /// with default settings for `url`, creates the client, and runs the
    /// initialization handshake.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - Connection to the server fails
    /// - Initialization handshake fails
    ///
    /// When initialization fails, the partially set-up client is shut down
    /// before the error is returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// // Beautiful one-liner - balanced with server DX
    /// let client = Client::connect_http("http://localhost:8080").await?;
    ///
    /// // Now use it directly
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Compare to verbose approach (10+ lines):
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use turbomcp_transport::streamable_http_client::{
    ///     StreamableHttpClientConfig, StreamableHttpClientTransport
    /// };
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let config = StreamableHttpClientConfig {
    ///     base_url: "http://localhost:8080".to_string(),
    ///     ..Default::default()
    /// };
    /// let transport = StreamableHttpClientTransport::new(config);
    /// let client = Client::new(transport);
    /// client.initialize().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Default configuration == custom configuration with a no-op customizer.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Connect to an HTTP MCP server with custom configuration
    ///
    /// Provides more control than `connect_http()` while still being ergonomic:
    /// `config_fn` receives the default configuration (with `base_url` already
    /// set from `url`) and may mutate it before the transport is created.
    ///
    /// # Arguments
    ///
    /// * `url` - The base URL of the MCP server
    /// * `config_fn` - Function to customize the configuration
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails. In that
    /// case the client is shut down first, because the caller never receives a
    /// handle it could clean up itself.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use turbomcp_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_http_with("http://localhost:8080", |config| {
    ///     config.timeout = Duration::from_secs(60);
    ///     config.endpoint_path = "/api/mcp".to_string();
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };

        config_fn(&mut config);

        let client = Self::new(StreamableHttpClientTransport::new(config));

        // Initialize immediately. Keep only the error (if any) so nothing large
        // is held while the cleanup below awaits.
        let init_outcome = client.initialize().await.map(|_| ());
        if init_outcome.is_err() {
            // The caller will never see this client, so release the dispatcher
            // and transport here instead of leaving it to the sync-only Drop.
            let _ = client.shutdown().await;
        }
        init_outcome?;

        Ok(client)
    }
}
439
440// ============================================================================
441// TCP-Specific Convenience Constructors (Feature-Gated)
442// ============================================================================
443
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connect to a TCP MCP server (convenience method)
    ///
    /// Parses the address, creates a TCP transport bound to an ephemeral local
    /// port, and runs the initialization handshake.
    ///
    /// # Arguments
    ///
    /// * `addr` - Server socket address as `IP:port` (e.g., "127.0.0.1:8765" or
    ///   "[::1]:8765"). Host names such as "localhost:8765" are **not**
    ///   resolved and are rejected as invalid.
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `addr` is not a valid socket address (bad-request error)
    /// - The initialization handshake fails; the client is shut down before
    ///   the error is returned
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "tcp")]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;

        // Bind to the unspecified address of the matching family with port 0
        // (any available port). Built directly so no runtime parse/unwrap is needed.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);

        // Keep only the error (if any) so nothing large is held across the
        // cleanup await below.
        let init_outcome = client.initialize().await.map(|_| ());
        if init_outcome.is_err() {
            // The caller never receives this client, so clean it up here rather
            // than relying on the sync-only Drop.
            let _ = client.shutdown().await;
        }
        init_outcome?;

        Ok(client)
    }
}
494
495// ============================================================================
496// Unix Socket-Specific Convenience Constructors (Feature-Gated)
497// ============================================================================
498
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connect to a Unix socket MCP server (convenience method)
    ///
    /// Creates a Unix-socket client transport for `path`, builds the client,
    /// and runs the initialization handshake.
    ///
    /// # Arguments
    ///
    /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
    ///
    /// # Returns
    ///
    /// Returns an initialized `Client` ready to use.
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails. The
    /// client is shut down before the error is returned, because the caller
    /// never receives a handle it could clean up itself.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(all(unix, feature = "unix"))]
    /// use turbomcp_client::Client;
    ///
    /// # async fn example() -> turbomcp_protocol::Result<()> {
    /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
    /// let tools = client.list_tools().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let transport = UnixTransport::new_client(path.into());
        let client = Self::new(transport);

        // Keep only the error (if any) so nothing large is held across the
        // cleanup await below.
        let init_outcome = client.initialize().await.map(|_| ());
        if init_outcome.is_err() {
            // Release dispatcher and transport now; Drop cannot disconnect.
            let _ = client.shutdown().await;
        }
        init_outcome?;

        Ok(client)
    }
}
536
537impl<T: Transport + 'static> Client<T> {
538 /// Register message handlers with the dispatcher
539 ///
540 /// This method sets up the callbacks that handle server-initiated requests
541 /// and notifications. The dispatcher's background task routes incoming
542 /// messages to these handlers.
543 ///
544 /// This is called automatically during Client construction (in `new()` and
545 /// `with_capabilities()`), so you don't need to call it manually.
546 ///
547 /// ## How It Works
548 ///
549 /// The handlers are synchronous closures that spawn async tasks to do the
550 /// actual work. This allows the dispatcher to continue routing messages
551 /// without blocking on handler execution.
552 fn register_dispatcher_handlers(&self) {
553 let dispatcher = self.inner.protocol.dispatcher();
554 let client_for_requests = self.clone();
555 let client_for_notifications = self.clone();
556
557 // Request handler (elicitation, sampling, etc.)
558 let request_handler = Arc::new(move |request: JsonRpcRequest| {
559 let client = client_for_requests.clone();
560 let method = request.method.clone();
561 let req_id = request.id.clone();
562 // Spawn async task to handle the request
563 tokio::spawn(async move {
564 tracing::debug!(
565 "🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
566 method,
567 req_id
568 );
569 if let Err(e) = client.handle_request(request).await {
570 tracing::error!(
571 "❌ [request_handler] Error handling server request '{}': {}",
572 method,
573 e
574 );
575 tracing::error!(" Request ID: {:?}", req_id);
576 tracing::error!(" Error kind: {:?}", e.kind);
577 } else {
578 tracing::debug!(
579 "✅ [request_handler] Successfully handled server request: method={}, id={:?}",
580 method,
581 req_id
582 );
583 }
584 });
585 Ok(())
586 });
587
588 // Notification handler
589 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
590 let client = client_for_notifications.clone();
591 // Spawn async task to handle the notification
592 tokio::spawn(async move {
593 if let Err(e) = client.handle_notification(notification).await {
594 tracing::error!("Error handling server notification: {}", e);
595 }
596 });
597 Ok(())
598 });
599
600 // Register handlers synchronously - no race condition!
601 // The set_* methods are now synchronous with std::sync::Mutex
602 dispatcher.set_request_handler(request_handler);
603 dispatcher.set_notification_handler(notification_handler);
604 tracing::debug!("Dispatcher handlers registered successfully");
605 }
606
607 /// Handle server-initiated requests (elicitation, sampling, roots)
608 ///
609 /// This method is called by the MessageDispatcher when it receives a request
610 /// from the server. It routes the request to the appropriate handler based on
611 /// the method name.
612 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
613 match request.method.as_str() {
614 "sampling/createMessage" => {
615 let handler_opt = self
616 .inner
617 .sampling_handler
618 .lock()
619 .expect("sampling_handler mutex poisoned")
620 .clone();
621 if let Some(handler) = handler_opt {
622 // Extract request ID for proper correlation
623 let request_id = match &request.id {
624 turbomcp_protocol::MessageId::String(s) => s.clone(),
625 turbomcp_protocol::MessageId::Number(n) => n.to_string(),
626 turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
627 };
628
629 let params: CreateMessageRequest =
630 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
631 .map_err(|e| {
632 Error::protocol(format!("Invalid createMessage params: {}", e))
633 })?;
634
635 match handler.handle_create_message(request_id, params).await {
636 Ok(result) => {
637 let result_value = serde_json::to_value(result).map_err(|e| {
638 Error::protocol(format!("Failed to serialize response: {}", e))
639 })?;
640 let response = JsonRpcResponse::success(result_value, request.id);
641 self.send_response(response).await?;
642 }
643 Err(e) => {
644 tracing::warn!(
645 "⚠️ [handle_request] Sampling handler returned error: {}",
646 e
647 );
648
649 // Preserve error semantics by checking actual error type
650 // This allows proper error code propagation for retry logic
651 let (code, message) = if let Some(handler_err) =
652 e.downcast_ref::<HandlerError>()
653 {
654 // HandlerError has explicit JSON-RPC code mapping
655 let json_err = handler_err.into_jsonrpc_error();
656 tracing::info!(
657 "📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
658 json_err.code
659 );
660 (json_err.code, json_err.message)
661 } else if let Some(proto_err) =
662 e.downcast_ref::<turbomcp_protocol::Error>()
663 {
664 // Protocol errors have ErrorKind-based mapping
665 tracing::info!(
666 "📋 [handle_request] Protocol error mapped to code: {}",
667 proto_err.jsonrpc_error_code()
668 );
669 (proto_err.jsonrpc_error_code(), proto_err.to_string())
670 } else {
671 // Generic errors default to Internal (-32603)
672 // Log the error type for debugging (should rarely hit this path)
673 tracing::warn!(
674 "📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
675 std::any::type_name_of_val(&*e)
676 );
677 (-32603, format!("Sampling handler error: {}", e))
678 };
679
680 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
681 code,
682 message,
683 data: None,
684 };
685 let response =
686 JsonRpcResponse::error_response(error, request.id.clone());
687
688 tracing::info!(
689 "🔄 [handle_request] Attempting to send error response for request: {:?}",
690 request.id
691 );
692 self.send_response(response).await?;
693 tracing::info!(
694 "✅ [handle_request] Error response sent successfully for request: {:?}",
695 request.id
696 );
697 }
698 }
699 } else {
700 // No handler configured
701 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
702 code: -32601,
703 message: "Sampling not supported".to_string(),
704 data: None,
705 };
706 let response = JsonRpcResponse::error_response(error, request.id);
707 self.send_response(response).await?;
708 }
709 }
710 "roots/list" => {
711 // Handle roots/list request from server
712 // Clone the handler Arc to avoid holding mutex across await
713 let handler_opt = self
714 .inner
715 .handlers
716 .lock()
717 .expect("handlers mutex poisoned")
718 .roots
719 .clone();
720
721 let roots_result = if let Some(handler) = handler_opt {
722 handler.handle_roots_request().await
723 } else {
724 // No handler - return empty list per MCP spec
725 Ok(Vec::new())
726 };
727
728 match roots_result {
729 Ok(roots) => {
730 let result_value =
731 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
732 roots,
733 _meta: None,
734 })
735 .map_err(|e| {
736 Error::protocol(format!(
737 "Failed to serialize roots response: {}",
738 e
739 ))
740 })?;
741 let response = JsonRpcResponse::success(result_value, request.id);
742 self.send_response(response).await?;
743 }
744 Err(e) => {
745 // FIXED: Extract error code from HandlerError instead of hardcoding -32603
746 // Roots handler returns HandlerResult<Vec<Root>>, so error is HandlerError
747 let json_err = e.into_jsonrpc_error();
748 let response = JsonRpcResponse::error_response(json_err, request.id);
749 self.send_response(response).await?;
750 }
751 }
752 }
753 "elicitation/create" => {
754 // Clone handler Arc before await to avoid holding mutex across await
755 let handler_opt = self
756 .inner
757 .handlers
758 .lock()
759 .expect("handlers mutex poisoned")
760 .elicitation
761 .clone();
762 if let Some(handler) = handler_opt {
763 // Parse elicitation request params as MCP protocol type
764 let proto_request: turbomcp_protocol::types::ElicitRequest =
765 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
766 .map_err(|e| {
767 Error::protocol(format!("Invalid elicitation params: {}", e))
768 })?;
769
770 // Wrap protocol request with ID for handler (preserves type safety!)
771 let handler_request =
772 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
773
774 // Call the registered elicitation handler
775 match handler.handle_elicitation(handler_request).await {
776 Ok(elicit_response) => {
777 // Convert handler response back to protocol type
778 let proto_result = elicit_response.into_protocol();
779 let result_value = serde_json::to_value(proto_result).map_err(|e| {
780 Error::protocol(format!(
781 "Failed to serialize elicitation response: {}",
782 e
783 ))
784 })?;
785 let response = JsonRpcResponse::success(result_value, request.id);
786 self.send_response(response).await?;
787 }
788 Err(e) => {
789 // Convert handler error to JSON-RPC error using centralized mapping
790 let response =
791 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
792 self.send_response(response).await?;
793 }
794 }
795 } else {
796 // No handler configured - elicitation not supported
797 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
798 code: -32601,
799 message: "Elicitation not supported - no handler registered".to_string(),
800 data: None,
801 };
802 let response = JsonRpcResponse::error_response(error, request.id);
803 self.send_response(response).await?;
804 }
805 }
806 _ => {
807 // Unknown method
808 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
809 code: -32601,
810 message: format!("Method not found: {}", request.method),
811 data: None,
812 };
813 let response = JsonRpcResponse::error_response(error, request.id);
814 self.send_response(response).await?;
815 }
816 }
817 Ok(())
818 }
819
820 /// Handle server-initiated notifications
821 ///
822 /// Routes notifications to appropriate handlers based on method name.
823 /// MCP defines several notification types that servers can send to clients:
824 ///
825 /// - `notifications/progress` - Progress updates for long-running operations
826 /// - `notifications/message` - Log messages from server
827 /// - `notifications/resources/updated` - Resource content changed
828 /// - `notifications/resources/list_changed` - Resource list changed
829 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
830 match notification.method.as_str() {
831 "notifications/progress" => {
832 // Route to progress handler
833 let handler_opt = self
834 .inner
835 .handlers
836 .lock()
837 .expect("handlers mutex poisoned")
838 .get_progress_handler();
839
840 if let Some(handler) = handler_opt {
841 // Parse progress notification
842 let progress: crate::handlers::ProgressNotification = serde_json::from_value(
843 notification.params.unwrap_or(serde_json::Value::Null),
844 )
845 .map_err(|e| {
846 Error::protocol(format!("Invalid progress notification: {}", e))
847 })?;
848
849 // Call handler (errors are logged but don't fail the flow)
850 if let Err(e) = handler.handle_progress(progress).await {
851 tracing::error!("Progress handler error: {}", e);
852 }
853 } else {
854 tracing::debug!("Received progress notification but no handler registered");
855 }
856 }
857
858 "notifications/message" => {
859 // Route to log handler
860 let handler_opt = self
861 .inner
862 .handlers
863 .lock()
864 .expect("handlers mutex poisoned")
865 .get_log_handler();
866
867 if let Some(handler) = handler_opt {
868 // Parse log message
869 let log: crate::handlers::LoggingNotification = serde_json::from_value(
870 notification.params.unwrap_or(serde_json::Value::Null),
871 )
872 .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
873
874 // Call handler
875 if let Err(e) = handler.handle_log(log).await {
876 tracing::error!("Log handler error: {}", e);
877 }
878 } else {
879 tracing::debug!("Received log notification but no handler registered");
880 }
881 }
882
883 "notifications/resources/updated" => {
884 // Route to resource update handler
885 let handler_opt = self
886 .inner
887 .handlers
888 .lock()
889 .expect("handlers mutex poisoned")
890 .get_resource_update_handler();
891
892 if let Some(handler) = handler_opt {
893 // Parse resource update notification
894 let update: crate::handlers::ResourceUpdatedNotification =
895 serde_json::from_value(
896 notification.params.unwrap_or(serde_json::Value::Null),
897 )
898 .map_err(|e| {
899 Error::protocol(format!("Invalid resource update notification: {}", e))
900 })?;
901
902 // Call handler
903 if let Err(e) = handler.handle_resource_update(update).await {
904 tracing::error!("Resource update handler error: {}", e);
905 }
906 } else {
907 tracing::debug!(
908 "Received resource update notification but no handler registered"
909 );
910 }
911 }
912
913 "notifications/resources/list_changed" => {
914 // Route to resource list changed handler
915 let handler_opt = self
916 .inner
917 .handlers
918 .lock()
919 .expect("handlers mutex poisoned")
920 .get_resource_list_changed_handler();
921
922 if let Some(handler) = handler_opt {
923 if let Err(e) = handler.handle_resource_list_changed().await {
924 tracing::error!("Resource list changed handler error: {}", e);
925 }
926 } else {
927 tracing::debug!(
928 "Resource list changed notification received (no handler registered)"
929 );
930 }
931 }
932
933 "notifications/prompts/list_changed" => {
934 // Route to prompt list changed handler
935 let handler_opt = self
936 .inner
937 .handlers
938 .lock()
939 .expect("handlers mutex poisoned")
940 .get_prompt_list_changed_handler();
941
942 if let Some(handler) = handler_opt {
943 if let Err(e) = handler.handle_prompt_list_changed().await {
944 tracing::error!("Prompt list changed handler error: {}", e);
945 }
946 } else {
947 tracing::debug!(
948 "Prompt list changed notification received (no handler registered)"
949 );
950 }
951 }
952
953 "notifications/tools/list_changed" => {
954 // Route to tool list changed handler
955 let handler_opt = self
956 .inner
957 .handlers
958 .lock()
959 .expect("handlers mutex poisoned")
960 .get_tool_list_changed_handler();
961
962 if let Some(handler) = handler_opt {
963 if let Err(e) = handler.handle_tool_list_changed().await {
964 tracing::error!("Tool list changed handler error: {}", e);
965 }
966 } else {
967 tracing::debug!(
968 "Tool list changed notification received (no handler registered)"
969 );
970 }
971 }
972
973 "notifications/cancelled" => {
974 // Route to cancellation handler
975 let handler_opt = self
976 .inner
977 .handlers
978 .lock()
979 .expect("handlers mutex poisoned")
980 .get_cancellation_handler();
981
982 if let Some(handler) = handler_opt {
983 // Parse cancellation notification
984 let cancellation: crate::handlers::CancelledNotification =
985 serde_json::from_value(
986 notification.params.unwrap_or(serde_json::Value::Null),
987 )
988 .map_err(|e| {
989 Error::protocol(format!("Invalid cancellation notification: {}", e))
990 })?;
991
992 // Call handler
993 if let Err(e) = handler.handle_cancellation(cancellation).await {
994 tracing::error!("Cancellation handler error: {}", e);
995 }
996 } else {
997 tracing::debug!("Cancellation notification received (no handler registered)");
998 }
999 }
1000
1001 _ => {
1002 // Unknown notification type
1003 tracing::debug!("Received unknown notification: {}", notification.method);
1004 }
1005 }
1006
1007 Ok(())
1008 }
1009
1010 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
1011 tracing::info!(
1012 "📤 [send_response] Sending JSON-RPC response: id={:?}",
1013 response.id
1014 );
1015
1016 let payload = serde_json::to_vec(&response).map_err(|e| {
1017 tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
1018 Error::protocol(format!("Failed to serialize response: {}", e))
1019 })?;
1020
1021 tracing::debug!(
1022 "📤 [send_response] Response payload: {} bytes",
1023 payload.len()
1024 );
1025 tracing::debug!(
1026 "📤 [send_response] Response JSON: {}",
1027 String::from_utf8_lossy(&payload)
1028 );
1029
1030 let message = TransportMessage::new(
1031 turbomcp_protocol::MessageId::from("response".to_string()),
1032 payload.into(),
1033 );
1034
1035 self.inner
1036 .protocol
1037 .transport()
1038 .send(message)
1039 .await
1040 .map_err(|e| {
1041 tracing::error!("❌ [send_response] Transport send failed: {}", e);
1042 Error::transport(format!("Failed to send response: {}", e))
1043 })?;
1044
1045 tracing::info!(
1046 "✅ [send_response] Response sent successfully: id={:?}",
1047 response.id
1048 );
1049 Ok(())
1050 }
1051
1052 /// Initialize the connection with the MCP server
1053 ///
1054 /// Performs the initialization handshake with the server, negotiating capabilities
1055 /// and establishing the protocol version. This method must be called before
1056 /// any other operations can be performed.
1057 ///
1058 /// # Returns
1059 ///
1060 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
1061 ///
1062 /// # Errors
1063 ///
1064 /// Returns an error if:
1065 /// - The transport connection fails
1066 /// - The server rejects the initialization request
1067 /// - Protocol negotiation fails
1068 ///
1069 /// # Examples
1070 ///
1071 /// ```rust,no_run
1072 /// # use turbomcp_client::Client;
1073 /// # use turbomcp_transport::stdio::StdioTransport;
1074 /// # async fn example() -> turbomcp_protocol::Result<()> {
1075 /// let mut client = Client::new(StdioTransport::new());
1076 ///
1077 /// let result = client.initialize().await?;
1078 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
1079 /// # Ok(())
1080 /// # }
1081 /// ```
1082 pub async fn initialize(&self) -> Result<InitializeResult> {
1083 // Auto-connect transport if not already connected
1084 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
1085 let transport = self.inner.protocol.transport();
1086 let transport_state = transport.state().await;
1087 if !matches!(
1088 transport_state,
1089 turbomcp_transport::TransportState::Connected
1090 ) {
1091 tracing::debug!(
1092 "Auto-connecting transport (current state: {:?})",
1093 transport_state
1094 );
1095 transport
1096 .connect()
1097 .await
1098 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
1099 tracing::info!("Transport connected successfully");
1100 }
1101
1102 // Build client capabilities based on registered handlers (automatic detection)
1103 let mut client_caps = ProtocolClientCapabilities::default();
1104
1105 // Detect sampling capability from handler
1106 if let Some(sampling_caps) = self.get_sampling_capabilities() {
1107 client_caps.sampling = Some(sampling_caps);
1108 }
1109
1110 // Detect elicitation capability from handler
1111 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
1112 client_caps.elicitation = Some(elicitation_caps);
1113 }
1114
1115 // Detect roots capability from handler
1116 if let Some(roots_caps) = self.get_roots_capabilities() {
1117 client_caps.roots = Some(roots_caps);
1118 }
1119
1120 // Send MCP initialization request
1121 let request = InitializeRequest {
1122 protocol_version: PROTOCOL_VERSION.to_string(),
1123 capabilities: client_caps,
1124 client_info: turbomcp_protocol::types::Implementation {
1125 name: "turbomcp-client".to_string(),
1126 version: env!("CARGO_PKG_VERSION").to_string(),
1127 title: Some("TurboMCP Client".to_string()),
1128 },
1129 _meta: None,
1130 };
1131
1132 let protocol_response: ProtocolInitializeResult = self
1133 .inner
1134 .protocol
1135 .request("initialize", Some(serde_json::to_value(request)?))
1136 .await?;
1137
1138 // AtomicBool: lock-free store with Ordering::Relaxed
1139 self.inner.initialized.store(true, Ordering::Relaxed);
1140
1141 // Send initialized notification
1142 self.inner
1143 .protocol
1144 .notify("notifications/initialized", None)
1145 .await?;
1146
1147 // Convert protocol response to client response type
1148 Ok(InitializeResult {
1149 server_info: protocol_response.server_info,
1150 server_capabilities: protocol_response.capabilities,
1151 })
1152 }
1153
1154 /// Execute a protocol method with plugin middleware
1155 ///
1156 /// This is a generic helper for wrapping protocol calls with plugin middleware.
1157 pub(crate) async fn execute_with_plugins<R>(
1158 &self,
1159 method_name: &str,
1160 params: Option<serde_json::Value>,
1161 ) -> Result<R>
1162 where
1163 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
1164 {
1165 // Create JSON-RPC request for plugin context
1166 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
1167 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
1168 id: turbomcp_protocol::MessageId::Number(1),
1169 method: method_name.to_string(),
1170 params: params.clone(),
1171 };
1172
1173 // 1. Create request context for plugins
1174 let mut req_ctx =
1175 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
1176
1177 // 2. Execute before_request plugin middleware
1178 if let Err(e) = self
1179 .inner
1180 .plugin_registry
1181 .lock()
1182 .await
1183 .execute_before_request(&mut req_ctx)
1184 .await
1185 {
1186 return Err(Error::bad_request(format!(
1187 "Plugin before_request failed: {}",
1188 e
1189 )));
1190 }
1191
1192 // 3. Execute the actual protocol call
1193 let start_time = std::time::Instant::now();
1194 let protocol_result: Result<R> = self
1195 .inner
1196 .protocol
1197 .request(method_name, req_ctx.params().cloned())
1198 .await;
1199 let duration = start_time.elapsed();
1200
1201 // 4. Prepare response context
1202 let mut resp_ctx = match protocol_result {
1203 Ok(ref response) => {
1204 let response_value = serde_json::to_value(response.clone())?;
1205 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1206 }
1207 Err(ref e) => {
1208 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1209 }
1210 };
1211
1212 // 5. Execute after_response plugin middleware
1213 if let Err(e) = self
1214 .inner
1215 .plugin_registry
1216 .lock()
1217 .await
1218 .execute_after_response(&mut resp_ctx)
1219 .await
1220 {
1221 return Err(Error::bad_request(format!(
1222 "Plugin after_response failed: {}",
1223 e
1224 )));
1225 }
1226
1227 // 6. Return the final result, checking for plugin modifications
1228 match protocol_result {
1229 Ok(ref response) => {
1230 // Check if plugins modified the response
1231 if let Some(modified_response) = resp_ctx.response {
1232 // Try to deserialize the modified response
1233 if let Ok(modified_result) =
1234 serde_json::from_value::<R>(modified_response.clone())
1235 {
1236 return Ok(modified_result);
1237 }
1238 }
1239
1240 // No plugin modifications, use original response
1241 Ok(response.clone())
1242 }
1243 Err(e) => {
1244 // Check if plugins provided an error recovery response
1245 if let Some(recovery_response) = resp_ctx.response {
1246 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1247 Ok(recovery_result)
1248 } else {
1249 Err(e)
1250 }
1251 } else {
1252 Err(e)
1253 }
1254 }
1255 }
1256 }
1257
1258 /// Subscribe to resource change notifications
1259 ///
1260 /// Registers interest in receiving notifications when the specified
1261 /// resource changes. The server will send notifications when the
1262 /// resource is modified, created, or deleted.
1263 ///
1264 /// # Arguments
1265 ///
1266 /// * `uri` - The URI of the resource to monitor
1267 ///
1268 /// # Returns
1269 ///
1270 /// Returns `EmptyResult` on successful subscription.
1271 ///
1272 /// # Errors
1273 ///
1274 /// Returns an error if:
1275 /// - The client is not initialized
1276 /// - The URI is invalid or empty
1277 /// - The server doesn't support subscriptions
1278 /// - The request fails
1279 ///
1280 /// # Examples
1281 ///
1282 /// ```rust,no_run
1283 /// # use turbomcp_client::Client;
1284 /// # use turbomcp_transport::stdio::StdioTransport;
1285 /// # async fn example() -> turbomcp_protocol::Result<()> {
1286 /// let mut client = Client::new(StdioTransport::new());
1287 /// client.initialize().await?;
1288 ///
1289 /// // Subscribe to file changes
1290 /// client.subscribe("file:///watch/directory").await?;
1291 /// println!("Subscribed to resource changes");
1292 /// # Ok(())
1293 /// # }
1294 /// ```
1295 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1296 if !self.inner.initialized.load(Ordering::Relaxed) {
1297 return Err(Error::bad_request("Client not initialized"));
1298 }
1299
1300 if uri.is_empty() {
1301 return Err(Error::bad_request("Subscription URI cannot be empty"));
1302 }
1303
1304 // Send resources/subscribe request with plugin middleware
1305 let request = SubscribeRequest {
1306 uri: uri.to_string(),
1307 };
1308
1309 self.execute_with_plugins(
1310 "resources/subscribe",
1311 Some(serde_json::to_value(request).map_err(|e| {
1312 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1313 })?),
1314 )
1315 .await
1316 }
1317
1318 /// Unsubscribe from resource change notifications
1319 ///
1320 /// Cancels a previous subscription to resource changes. After unsubscribing,
1321 /// the client will no longer receive notifications for the specified resource.
1322 ///
1323 /// # Arguments
1324 ///
1325 /// * `uri` - The URI of the resource to stop monitoring
1326 ///
1327 /// # Returns
1328 ///
1329 /// Returns `EmptyResult` on successful unsubscription.
1330 ///
1331 /// # Errors
1332 ///
1333 /// Returns an error if:
1334 /// - The client is not initialized
1335 /// - The URI is invalid or empty
1336 /// - No active subscription exists for the URI
1337 /// - The request fails
1338 ///
1339 /// # Examples
1340 ///
1341 /// ```rust,no_run
1342 /// # use turbomcp_client::Client;
1343 /// # use turbomcp_transport::stdio::StdioTransport;
1344 /// # async fn example() -> turbomcp_protocol::Result<()> {
1345 /// let mut client = Client::new(StdioTransport::new());
1346 /// client.initialize().await?;
1347 ///
1348 /// // Unsubscribe from file changes
1349 /// client.unsubscribe("file:///watch/directory").await?;
1350 /// println!("Unsubscribed from resource changes");
1351 /// # Ok(())
1352 /// # }
1353 /// ```
1354 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1355 if !self.inner.initialized.load(Ordering::Relaxed) {
1356 return Err(Error::bad_request("Client not initialized"));
1357 }
1358
1359 if uri.is_empty() {
1360 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1361 }
1362
1363 // Send resources/unsubscribe request with plugin middleware
1364 let request = UnsubscribeRequest {
1365 uri: uri.to_string(),
1366 };
1367
1368 self.execute_with_plugins(
1369 "resources/unsubscribe",
1370 Some(serde_json::to_value(request).map_err(|e| {
1371 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1372 })?),
1373 )
1374 .await
1375 }
1376
1377 /// Get the client's capabilities configuration
1378 pub fn capabilities(&self) -> &ClientCapabilities {
1379 &self.inner.capabilities
1380 }
1381
1382 /// Initialize all registered plugins
1383 ///
1384 /// This should be called after registration but before using the client.
1385 pub async fn initialize_plugins(&self) -> Result<()> {
1386 // Set up client context for plugins with actual client capabilities
1387 let mut capabilities = std::collections::HashMap::new();
1388 capabilities.insert(
1389 "protocol_version".to_string(),
1390 serde_json::json!("2024-11-05"),
1391 );
1392 capabilities.insert(
1393 "mcp_version".to_string(),
1394 serde_json::json!(env!("CARGO_PKG_VERSION")),
1395 );
1396 capabilities.insert(
1397 "supports_notifications".to_string(),
1398 serde_json::json!(true),
1399 );
1400 capabilities.insert(
1401 "supports_sampling".to_string(),
1402 serde_json::json!(self.has_sampling_handler()),
1403 );
1404 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1405 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1406
1407 // Extract client configuration
1408 let mut config = std::collections::HashMap::new();
1409 config.insert(
1410 "client_name".to_string(),
1411 serde_json::json!("turbomcp-client"),
1412 );
1413 config.insert(
1414 "initialized".to_string(),
1415 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1416 );
1417 config.insert(
1418 "plugin_count".to_string(),
1419 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1420 );
1421
1422 let context = crate::plugins::PluginContext::new(
1423 "turbomcp-client".to_string(),
1424 env!("CARGO_PKG_VERSION").to_string(),
1425 capabilities,
1426 config,
1427 vec![], // Will be populated by the registry
1428 );
1429
1430 self.inner
1431 .plugin_registry
1432 .lock()
1433 .await
1434 .set_client_context(context);
1435
1436 // Note: Individual plugins are initialized automatically during registration
1437 // via PluginRegistry::register_plugin(). This method ensures the registry
1438 // has proper client context for any future plugin registrations.
1439 Ok(())
1440 }
1441
1442 /// Cleanup all registered plugins
1443 ///
1444 /// This should be called when the client is being shut down.
1445 pub async fn cleanup_plugins(&self) -> Result<()> {
1446 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1447 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1448 // references are dropped.
1449
1450 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1451 // pattern where plugins clean up their resources in their Drop implementation.
1452 // Replace the registry with a fresh one (mutex ensures safe access)
1453 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1454 Ok(())
1455 }
1456
1457 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1458 // are defined in their respective operation modules:
1459 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1460 // - handlers.rs: has_elicitation_handler, has_roots_handler
1461 //
1462 // Additional capability getters for elicitation and roots added below
1463 // since they're used during initialization
1464
1465 /// Get elicitation capabilities if handler is registered
1466 /// Automatically detects capability based on registered handler
1467 fn get_elicitation_capabilities(
1468 &self,
1469 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1470 if self.has_elicitation_handler() {
1471 // Currently returns default capabilities. In the future, schema_validation support
1472 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1473 // that handlers could implement. For now, handlers validate schemas themselves.
1474 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1475 } else {
1476 None
1477 }
1478 }
1479
1480 /// Get roots capabilities if handler is registered
1481 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1482 if self.has_roots_handler() {
1483 // Roots capabilities indicate whether list can change
1484 Some(turbomcp_protocol::types::RootsCapabilities {
1485 list_changed: Some(true), // Support dynamic roots by default
1486 })
1487 } else {
1488 None
1489 }
1490 }
1491}