turbomcp_client/client/core.rs
1//! Core Client implementation for MCP communication
2//!
3//! This module contains the main `Client<T>` struct and its implementation,
4//! providing the core MCP client functionality including:
5//!
6//! - Connection initialization and lifecycle management
7//! - Message processing and bidirectional communication
8//! - MCP operation support (tools, prompts, resources, sampling, etc.)
9//! - Plugin middleware integration
10//! - Handler registration and management
11//!
12//! # Architecture
13//!
14//! `Client<T>` is implemented as a cheaply-cloneable Arc wrapper with interior
15//! mutability (same pattern as reqwest and AWS SDK):
16//!
17//! - **AtomicBool** for initialized flag (lock-free)
18//! - **Arc<Mutex<...>>** for handlers/plugins (infrequent mutation)
19//! - **`Arc<ClientInner<T>>`** for cheap cloning
20
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex as StdMutex};
23
24use turbomcp_protocol::jsonrpc::*;
25use turbomcp_protocol::types::{
26 ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
27 *,
28};
29use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
30use turbomcp_transport::{Transport, TransportMessage};
31
32use super::config::InitializeResult;
33use super::protocol::ProtocolClient;
34use crate::{ClientCapabilities, handlers::HandlerRegistry, sampling::SamplingHandler};
35
36/// Inner client state with interior mutability
37///
38/// This structure contains the actual client state and is wrapped in Arc<...>
39/// to enable cheap cloning (same pattern as reqwest and AWS SDK).
40pub(super) struct ClientInner<T: Transport + 'static> {
41 /// Protocol client for low-level communication
42 pub(super) protocol: ProtocolClient<T>,
43
44 /// Client capabilities (immutable after construction)
45 pub(super) capabilities: ClientCapabilities,
46
47 /// Initialization state (lock-free atomic boolean)
48 pub(super) initialized: AtomicBool,
49
50 /// Optional sampling handler (mutex for dynamic updates)
51 pub(super) sampling_handler: Arc<StdMutex<Option<Arc<dyn SamplingHandler>>>>,
52
53 /// Handler registry for bidirectional communication (mutex for registration)
54 pub(super) handlers: Arc<StdMutex<HandlerRegistry>>,
55
56 /// Plugin registry for middleware (tokio mutex - holds across await)
57 pub(super) plugin_registry: Arc<tokio::sync::Mutex<crate::plugins::PluginRegistry>>,
58}
59
60/// The core MCP client implementation
61///
62/// Client provides a comprehensive interface for communicating with MCP servers,
63/// supporting all protocol features including tools, prompts, resources, sampling,
64/// elicitation, and bidirectional communication patterns.
65///
66/// # Clone Pattern
67///
68/// `Client<T>` is cheaply cloneable via Arc (same pattern as reqwest and AWS SDK).
69/// All clones share the same underlying connection and state:
70///
71/// ```rust,no_run
72/// use turbomcp_client::Client;
73/// use turbomcp_transport::stdio::StdioTransport;
74///
75/// # async fn example() -> turbomcp_protocol::Result<()> {
76/// let client = Client::new(StdioTransport::new());
77/// client.initialize().await?;
78///
79/// // Cheap clone - shares same connection
80/// let client2 = client.clone();
81/// tokio::spawn(async move {
82/// client2.list_tools().await.ok();
83/// });
84/// # Ok(())
85/// # }
86/// ```
87///
88/// The client must be initialized before use by calling `initialize()` to perform
89/// the MCP handshake and capability negotiation.
90///
91/// # Features
92///
93/// - **Protocol Compliance**: Full MCP 2025-06-18 specification support
94/// - **Bidirectional Communication**: Server-initiated requests and client responses
95/// - **Plugin Middleware**: Extensible request/response processing
96/// - **Handler Registry**: Callbacks for server-initiated operations
97/// - **Connection Management**: Robust error handling and recovery
98/// - **Type Safety**: Compile-time guarantees for MCP message types
99/// - **Cheap Cloning**: Arc-based sharing like reqwest/AWS SDK
100///
101/// # Examples
102///
103/// ```rust,no_run
104/// use turbomcp_client::Client;
105/// use turbomcp_transport::stdio::StdioTransport;
106/// use std::collections::HashMap;
107///
108/// # async fn example() -> turbomcp_protocol::Result<()> {
109/// // Create and initialize client (no mut needed!)
110/// let client = Client::new(StdioTransport::new());
111/// let init_result = client.initialize().await?;
112/// println!("Connected to: {}", init_result.server_info.name);
113///
114/// // Use MCP operations
115/// let tools = client.list_tools().await?;
116/// let mut args = HashMap::new();
117/// args.insert("input".to_string(), serde_json::json!("test"));
118/// let result = client.call_tool("my_tool", Some(args)).await?;
119/// # Ok(())
120/// # }
121/// ```
122pub struct Client<T: Transport + 'static> {
123 pub(super) inner: Arc<ClientInner<T>>,
124}
125
126/// Clone implementation via Arc (same pattern as reqwest/AWS SDK)
127///
128/// Cloning a Client is cheap (just an Arc clone) and all clones share
129/// the same underlying connection and state.
130impl<T: Transport + 'static> Clone for Client<T> {
131 fn clone(&self) -> Self {
132 Self {
133 inner: Arc::clone(&self.inner),
134 }
135 }
136}
137
138impl<T: Transport + 'static> Drop for ClientInner<T> {
139 fn drop(&mut self) {
140 // Shutdown the dispatcher's background task when the LAST Client reference is dropped
141 // This prevents the background task from running forever after all clients are dropped
142 tracing::debug!("Last Client reference dropped - shutting down message dispatcher");
143 self.protocol.dispatcher().shutdown();
144 }
145}
146
147impl<T: Transport + 'static> Client<T> {
148 /// Create a new client with the specified transport
149 ///
150 /// Creates a new MCP client instance with default capabilities.
151 /// The client must be initialized before use by calling `initialize()`.
152 ///
153 /// # Arguments
154 ///
155 /// * `transport` - The transport implementation to use for communication
156 ///
157 /// # Examples
158 ///
159 /// ```rust,no_run
160 /// use turbomcp_client::Client;
161 /// use turbomcp_transport::stdio::StdioTransport;
162 ///
163 /// let transport = StdioTransport::new();
164 /// let client = Client::new(transport);
165 /// ```
166 pub fn new(transport: T) -> Self {
167 let client = Self {
168 inner: Arc::new(ClientInner {
169 protocol: ProtocolClient::new(transport),
170 capabilities: ClientCapabilities::default(),
171 initialized: AtomicBool::new(false),
172 sampling_handler: Arc::new(StdMutex::new(None)),
173 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
174 plugin_registry: Arc::new(tokio::sync::Mutex::new(
175 crate::plugins::PluginRegistry::new(),
176 )),
177 }),
178 };
179
180 // Register dispatcher handlers for bidirectional communication
181 client.register_dispatcher_handlers();
182
183 client
184 }
185
186 /// Create a new client with custom capabilities
187 ///
188 /// # Arguments
189 ///
190 /// * `transport` - The transport implementation to use
191 /// * `capabilities` - The client capabilities to negotiate
192 ///
193 /// # Examples
194 ///
195 /// ```rust,no_run
196 /// use turbomcp_client::{Client, ClientCapabilities};
197 /// use turbomcp_transport::stdio::StdioTransport;
198 ///
199 /// let capabilities = ClientCapabilities {
200 /// tools: true,
201 /// prompts: true,
202 /// resources: false,
203 /// sampling: false,
204 /// };
205 ///
206 /// let transport = StdioTransport::new();
207 /// let client = Client::with_capabilities(transport, capabilities);
208 /// ```
209 pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
210 let client = Self {
211 inner: Arc::new(ClientInner {
212 protocol: ProtocolClient::new(transport),
213 capabilities,
214 initialized: AtomicBool::new(false),
215 sampling_handler: Arc::new(StdMutex::new(None)),
216 handlers: Arc::new(StdMutex::new(HandlerRegistry::new())),
217 plugin_registry: Arc::new(tokio::sync::Mutex::new(
218 crate::plugins::PluginRegistry::new(),
219 )),
220 }),
221 };
222
223 // Register dispatcher handlers for bidirectional communication
224 client.register_dispatcher_handlers();
225
226 client
227 }
228}
229
230// ============================================================================
231// HTTP-Specific Convenience Constructors (Feature-Gated)
232// ============================================================================
233
234#[cfg(feature = "http")]
235impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
236 /// Connect to an HTTP MCP server (convenience method)
237 ///
238 /// This is a beautiful one-liner alternative to manual configuration.
239 /// Creates an HTTP client, connects, and initializes in one call.
240 ///
241 /// # Arguments
242 ///
243 /// * `url` - The base URL of the MCP server (e.g., "http://localhost:8080")
244 ///
245 /// # Returns
246 ///
247 /// Returns an initialized `Client` ready to use.
248 ///
249 /// # Errors
250 ///
251 /// Returns an error if:
252 /// - The URL is invalid
253 /// - Connection to the server fails
254 /// - Initialization handshake fails
255 ///
256 /// # Examples
257 ///
258 /// ```rust,no_run
259 /// use turbomcp_client::Client;
260 ///
261 /// # async fn example() -> turbomcp_protocol::Result<()> {
262 /// // Beautiful one-liner - balanced with server DX
263 /// let client = Client::connect_http("http://localhost:8080").await?;
264 ///
265 /// // Now use it directly
266 /// let tools = client.list_tools().await?;
267 /// # Ok(())
268 /// # }
269 /// ```
270 ///
271 /// Compare to verbose approach (10+ lines):
272 /// ```rust,no_run
273 /// use turbomcp_client::Client;
274 /// use turbomcp_transport::streamable_http_client::{
275 /// StreamableHttpClientConfig, StreamableHttpClientTransport
276 /// };
277 ///
278 /// # async fn example() -> turbomcp_protocol::Result<()> {
279 /// let config = StreamableHttpClientConfig {
280 /// base_url: "http://localhost:8080".to_string(),
281 /// ..Default::default()
282 /// };
283 /// let transport = StreamableHttpClientTransport::new(config);
284 /// let client = Client::new(transport);
285 /// client.initialize().await?;
286 /// # Ok(())
287 /// # }
288 /// ```
289 pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
290 use turbomcp_transport::streamable_http_client::{
291 StreamableHttpClientConfig, StreamableHttpClientTransport,
292 };
293
294 let config = StreamableHttpClientConfig {
295 base_url: url.into(),
296 ..Default::default()
297 };
298
299 let transport = StreamableHttpClientTransport::new(config);
300 let client = Self::new(transport);
301
302 // Initialize connection immediately
303 client.initialize().await?;
304
305 Ok(client)
306 }
307
308 /// Connect to an HTTP MCP server with custom configuration
309 ///
310 /// Provides more control than `connect_http()` while still being ergonomic.
311 ///
312 /// # Arguments
313 ///
314 /// * `url` - The base URL of the MCP server
315 /// * `config_fn` - Function to customize the configuration
316 ///
317 /// # Examples
318 ///
319 /// ```rust,no_run
320 /// use turbomcp_client::Client;
321 /// use std::time::Duration;
322 ///
323 /// # async fn example() -> turbomcp_protocol::Result<()> {
324 /// let client = Client::connect_http_with("http://localhost:8080", |config| {
325 /// config.timeout = Duration::from_secs(60);
326 /// config.endpoint_path = "/api/mcp".to_string();
327 /// }).await?;
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
332 where
333 F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
334 {
335 use turbomcp_transport::streamable_http_client::{
336 StreamableHttpClientConfig, StreamableHttpClientTransport,
337 };
338
339 let mut config = StreamableHttpClientConfig {
340 base_url: url.into(),
341 ..Default::default()
342 };
343
344 config_fn(&mut config);
345
346 let transport = StreamableHttpClientTransport::new(config);
347 let client = Self::new(transport);
348
349 client.initialize().await?;
350
351 Ok(client)
352 }
353}
354
355// ============================================================================
356// TCP-Specific Convenience Constructors (Feature-Gated)
357// ============================================================================
358
359#[cfg(feature = "tcp")]
360impl Client<turbomcp_transport::tcp::TcpTransport> {
361 /// Connect to a TCP MCP server (convenience method)
362 ///
363 /// Beautiful one-liner for TCP connections - balanced DX.
364 ///
365 /// # Arguments
366 ///
367 /// * `addr` - Server address (e.g., "127.0.0.1:8765" or localhost:8765")
368 ///
369 /// # Returns
370 ///
371 /// Returns an initialized `Client` ready to use.
372 ///
373 /// # Examples
374 ///
375 /// ```rust,no_run
376 /// # #[cfg(feature = "tcp")]
377 /// use turbomcp_client::Client;
378 ///
379 /// # async fn example() -> turbomcp_protocol::Result<()> {
380 /// let client = Client::connect_tcp("127.0.0.1:8765").await?;
381 /// let tools = client.list_tools().await?;
382 /// # Ok(())
383 /// # }
384 /// ```
385 pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
386 use std::net::SocketAddr;
387 use turbomcp_transport::tcp::TcpTransport;
388
389 let server_addr: SocketAddr = addr
390 .as_ref()
391 .parse()
392 .map_err(|e| Error::bad_request(format!("Invalid address: {}", e)))?;
393
394 // Client binds to 0.0.0.0:0 (any available port)
395 let bind_addr: SocketAddr = if server_addr.is_ipv6() {
396 "[::]:0".parse().unwrap()
397 } else {
398 "0.0.0.0:0".parse().unwrap()
399 };
400
401 let transport = TcpTransport::new_client(bind_addr, server_addr);
402 let client = Self::new(transport);
403
404 client.initialize().await?;
405
406 Ok(client)
407 }
408}
409
410// ============================================================================
411// Unix Socket-Specific Convenience Constructors (Feature-Gated)
412// ============================================================================
413
414#[cfg(all(unix, feature = "unix"))]
415impl Client<turbomcp_transport::unix::UnixTransport> {
416 /// Connect to a Unix socket MCP server (convenience method)
417 ///
418 /// Beautiful one-liner for Unix socket IPC - balanced DX.
419 ///
420 /// # Arguments
421 ///
422 /// * `path` - Socket file path (e.g., "/tmp/mcp.sock")
423 ///
424 /// # Returns
425 ///
426 /// Returns an initialized `Client` ready to use.
427 ///
428 /// # Examples
429 ///
430 /// ```rust,no_run
431 /// # #[cfg(all(unix, feature = "unix"))]
432 /// use turbomcp_client::Client;
433 ///
434 /// # async fn example() -> turbomcp_protocol::Result<()> {
435 /// let client = Client::connect_unix("/tmp/mcp.sock").await?;
436 /// let tools = client.list_tools().await?;
437 /// # Ok(())
438 /// # }
439 /// ```
440 pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
441 use turbomcp_transport::unix::UnixTransport;
442
443 let transport = UnixTransport::new_client(path.into());
444 let client = Self::new(transport);
445
446 client.initialize().await?;
447
448 Ok(client)
449 }
450}
451
452impl<T: Transport + 'static> Client<T> {
453 /// Register message handlers with the dispatcher
454 ///
455 /// This method sets up the callbacks that handle server-initiated requests
456 /// and notifications. The dispatcher's background task routes incoming
457 /// messages to these handlers.
458 ///
459 /// This is called automatically during Client construction (in `new()` and
460 /// `with_capabilities()`), so you don't need to call it manually.
461 ///
462 /// ## How It Works
463 ///
464 /// The handlers are synchronous closures that spawn async tasks to do the
465 /// actual work. This allows the dispatcher to continue routing messages
466 /// without blocking on handler execution.
467 fn register_dispatcher_handlers(&self) {
468 let dispatcher = self.inner.protocol.dispatcher();
469 let client_for_requests = self.clone();
470 let client_for_notifications = self.clone();
471
472 // Request handler (elicitation, sampling, etc.)
473 let request_handler = Arc::new(move |request: JsonRpcRequest| {
474 let client = client_for_requests.clone();
475 // Spawn async task to handle the request
476 tokio::spawn(async move {
477 if let Err(e) = client.handle_request(request).await {
478 tracing::error!("Error handling server request: {}", e);
479 }
480 });
481 Ok(())
482 });
483
484 // Notification handler
485 let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
486 let client = client_for_notifications.clone();
487 // Spawn async task to handle the notification
488 tokio::spawn(async move {
489 if let Err(e) = client.handle_notification(notification).await {
490 tracing::error!("Error handling server notification: {}", e);
491 }
492 });
493 Ok(())
494 });
495
496 // Register handlers synchronously - no race condition!
497 // The set_* methods are now synchronous with std::sync::Mutex
498 dispatcher.set_request_handler(request_handler);
499 dispatcher.set_notification_handler(notification_handler);
500 tracing::debug!("Dispatcher handlers registered successfully");
501 }
502
503 /// Handle server-initiated requests (elicitation, sampling, roots)
504 ///
505 /// This method is called by the MessageDispatcher when it receives a request
506 /// from the server. It routes the request to the appropriate handler based on
507 /// the method name.
508 async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
509 match request.method.as_str() {
510 "sampling/createMessage" => {
511 let handler_opt = self
512 .inner
513 .sampling_handler
514 .lock()
515 .expect("sampling_handler mutex poisoned")
516 .clone();
517 if let Some(handler) = handler_opt {
518 let params: CreateMessageRequest =
519 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
520 .map_err(|e| {
521 Error::protocol(format!("Invalid createMessage params: {}", e))
522 })?;
523
524 match handler.handle_create_message(params).await {
525 Ok(result) => {
526 let result_value = serde_json::to_value(result).map_err(|e| {
527 Error::protocol(format!("Failed to serialize response: {}", e))
528 })?;
529 let response = JsonRpcResponse::success(result_value, request.id);
530 self.send_response(response).await?;
531 }
532 Err(e) => {
533 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
534 code: -32603,
535 message: format!("Sampling handler error: {}", e),
536 data: None,
537 };
538 let response = JsonRpcResponse::error_response(error, request.id);
539 self.send_response(response).await?;
540 }
541 }
542 } else {
543 // No handler configured
544 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
545 code: -32601,
546 message: "Sampling not supported".to_string(),
547 data: None,
548 };
549 let response = JsonRpcResponse::error_response(error, request.id);
550 self.send_response(response).await?;
551 }
552 }
553 "roots/list" => {
554 // Handle roots/list request from server
555 // Clone the handler Arc to avoid holding mutex across await
556 let handler_opt = self
557 .inner
558 .handlers
559 .lock()
560 .expect("handlers mutex poisoned")
561 .roots
562 .clone();
563
564 let roots_result = if let Some(handler) = handler_opt {
565 handler.handle_roots_request().await
566 } else {
567 // No handler - return empty list per MCP spec
568 Ok(Vec::new())
569 };
570
571 match roots_result {
572 Ok(roots) => {
573 let result_value =
574 serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
575 roots,
576 _meta: None,
577 })
578 .map_err(|e| {
579 Error::protocol(format!(
580 "Failed to serialize roots response: {}",
581 e
582 ))
583 })?;
584 let response = JsonRpcResponse::success(result_value, request.id);
585 self.send_response(response).await?;
586 }
587 Err(e) => {
588 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
589 code: -32603,
590 message: format!("Roots handler error: {}", e),
591 data: None,
592 };
593 let response = JsonRpcResponse::error_response(error, request.id);
594 self.send_response(response).await?;
595 }
596 }
597 }
598 "elicitation/create" => {
599 // Clone handler Arc before await to avoid holding mutex across await
600 let handler_opt = self
601 .inner
602 .handlers
603 .lock()
604 .expect("handlers mutex poisoned")
605 .elicitation
606 .clone();
607 if let Some(handler) = handler_opt {
608 // Parse elicitation request params as MCP protocol type
609 let proto_request: turbomcp_protocol::types::ElicitRequest =
610 serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
611 .map_err(|e| {
612 Error::protocol(format!("Invalid elicitation params: {}", e))
613 })?;
614
615 // Wrap protocol request with ID for handler (preserves type safety!)
616 let handler_request =
617 crate::handlers::ElicitationRequest::new(request.id.clone(), proto_request);
618
619 // Call the registered elicitation handler
620 match handler.handle_elicitation(handler_request).await {
621 Ok(elicit_response) => {
622 // Convert handler response back to protocol type
623 let proto_result = elicit_response.into_protocol();
624 let result_value = serde_json::to_value(proto_result).map_err(|e| {
625 Error::protocol(format!(
626 "Failed to serialize elicitation response: {}",
627 e
628 ))
629 })?;
630 let response = JsonRpcResponse::success(result_value, request.id);
631 self.send_response(response).await?;
632 }
633 Err(e) => {
634 // Convert handler error to JSON-RPC error using centralized mapping
635 let response =
636 JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
637 self.send_response(response).await?;
638 }
639 }
640 } else {
641 // No handler configured - elicitation not supported
642 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
643 code: -32601,
644 message: "Elicitation not supported - no handler registered".to_string(),
645 data: None,
646 };
647 let response = JsonRpcResponse::error_response(error, request.id);
648 self.send_response(response).await?;
649 }
650 }
651 _ => {
652 // Unknown method
653 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
654 code: -32601,
655 message: format!("Method not found: {}", request.method),
656 data: None,
657 };
658 let response = JsonRpcResponse::error_response(error, request.id);
659 self.send_response(response).await?;
660 }
661 }
662 Ok(())
663 }
664
665 /// Handle server-initiated notifications
666 ///
667 /// Routes notifications to appropriate handlers based on method name.
668 /// MCP defines several notification types that servers can send to clients:
669 ///
670 /// - `notifications/progress` - Progress updates for long-running operations
671 /// - `notifications/message` - Log messages from server
672 /// - `notifications/resources/updated` - Resource content changed
673 /// - `notifications/resources/list_changed` - Resource list changed
674 async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
675 match notification.method.as_str() {
676 "notifications/progress" => {
677 // Route to progress handler
678 let handler_opt = self
679 .inner
680 .handlers
681 .lock()
682 .expect("handlers mutex poisoned")
683 .get_progress_handler();
684
685 if let Some(handler) = handler_opt {
686 // Parse progress notification
687 let progress: crate::handlers::ProgressNotification = serde_json::from_value(
688 notification.params.unwrap_or(serde_json::Value::Null),
689 )
690 .map_err(|e| {
691 Error::protocol(format!("Invalid progress notification: {}", e))
692 })?;
693
694 // Call handler (errors are logged but don't fail the flow)
695 if let Err(e) = handler.handle_progress(progress).await {
696 tracing::error!("Progress handler error: {}", e);
697 }
698 } else {
699 tracing::debug!("Received progress notification but no handler registered");
700 }
701 }
702
703 "notifications/message" => {
704 // Route to log handler
705 let handler_opt = self
706 .inner
707 .handlers
708 .lock()
709 .expect("handlers mutex poisoned")
710 .get_log_handler();
711
712 if let Some(handler) = handler_opt {
713 // Parse log message
714 let log: crate::handlers::LoggingNotification = serde_json::from_value(
715 notification.params.unwrap_or(serde_json::Value::Null),
716 )
717 .map_err(|e| Error::protocol(format!("Invalid log notification: {}", e)))?;
718
719 // Call handler
720 if let Err(e) = handler.handle_log(log).await {
721 tracing::error!("Log handler error: {}", e);
722 }
723 } else {
724 tracing::debug!("Received log notification but no handler registered");
725 }
726 }
727
728 "notifications/resources/updated" => {
729 // Route to resource update handler
730 let handler_opt = self
731 .inner
732 .handlers
733 .lock()
734 .expect("handlers mutex poisoned")
735 .get_resource_update_handler();
736
737 if let Some(handler) = handler_opt {
738 // Parse resource update notification
739 let update: crate::handlers::ResourceUpdatedNotification =
740 serde_json::from_value(
741 notification.params.unwrap_or(serde_json::Value::Null),
742 )
743 .map_err(|e| {
744 Error::protocol(format!("Invalid resource update notification: {}", e))
745 })?;
746
747 // Call handler
748 if let Err(e) = handler.handle_resource_update(update).await {
749 tracing::error!("Resource update handler error: {}", e);
750 }
751 } else {
752 tracing::debug!(
753 "Received resource update notification but no handler registered"
754 );
755 }
756 }
757
758 "notifications/resources/list_changed" => {
759 // Route to resource list changed handler
760 let handler_opt = self
761 .inner
762 .handlers
763 .lock()
764 .expect("handlers mutex poisoned")
765 .get_resource_list_changed_handler();
766
767 if let Some(handler) = handler_opt {
768 if let Err(e) = handler.handle_resource_list_changed().await {
769 tracing::error!("Resource list changed handler error: {}", e);
770 }
771 } else {
772 tracing::debug!(
773 "Resource list changed notification received (no handler registered)"
774 );
775 }
776 }
777
778 "notifications/prompts/list_changed" => {
779 // Route to prompt list changed handler
780 let handler_opt = self
781 .inner
782 .handlers
783 .lock()
784 .expect("handlers mutex poisoned")
785 .get_prompt_list_changed_handler();
786
787 if let Some(handler) = handler_opt {
788 if let Err(e) = handler.handle_prompt_list_changed().await {
789 tracing::error!("Prompt list changed handler error: {}", e);
790 }
791 } else {
792 tracing::debug!(
793 "Prompt list changed notification received (no handler registered)"
794 );
795 }
796 }
797
798 "notifications/tools/list_changed" => {
799 // Route to tool list changed handler
800 let handler_opt = self
801 .inner
802 .handlers
803 .lock()
804 .expect("handlers mutex poisoned")
805 .get_tool_list_changed_handler();
806
807 if let Some(handler) = handler_opt {
808 if let Err(e) = handler.handle_tool_list_changed().await {
809 tracing::error!("Tool list changed handler error: {}", e);
810 }
811 } else {
812 tracing::debug!(
813 "Tool list changed notification received (no handler registered)"
814 );
815 }
816 }
817
818 "notifications/cancelled" => {
819 // Route to cancellation handler
820 let handler_opt = self
821 .inner
822 .handlers
823 .lock()
824 .expect("handlers mutex poisoned")
825 .get_cancellation_handler();
826
827 if let Some(handler) = handler_opt {
828 // Parse cancellation notification
829 let cancellation: crate::handlers::CancelledNotification =
830 serde_json::from_value(
831 notification.params.unwrap_or(serde_json::Value::Null),
832 )
833 .map_err(|e| {
834 Error::protocol(format!("Invalid cancellation notification: {}", e))
835 })?;
836
837 // Call handler
838 if let Err(e) = handler.handle_cancellation(cancellation).await {
839 tracing::error!("Cancellation handler error: {}", e);
840 }
841 } else {
842 tracing::debug!("Cancellation notification received (no handler registered)");
843 }
844 }
845
846 _ => {
847 // Unknown notification type
848 tracing::debug!("Received unknown notification: {}", notification.method);
849 }
850 }
851
852 Ok(())
853 }
854
855 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
856 let payload = serde_json::to_vec(&response)
857 .map_err(|e| Error::protocol(format!("Failed to serialize response: {}", e)))?;
858
859 let message = TransportMessage::new(
860 turbomcp_protocol::MessageId::from("response".to_string()),
861 payload.into(),
862 );
863
864 self.inner
865 .protocol
866 .transport()
867 .send(message)
868 .await
869 .map_err(|e| Error::transport(format!("Failed to send response: {}", e)))?;
870
871 Ok(())
872 }
873
874 /// Initialize the connection with the MCP server
875 ///
876 /// Performs the initialization handshake with the server, negotiating capabilities
877 /// and establishing the protocol version. This method must be called before
878 /// any other operations can be performed.
879 ///
880 /// # Returns
881 ///
882 /// Returns an `InitializeResult` containing server information and negotiated capabilities.
883 ///
884 /// # Errors
885 ///
886 /// Returns an error if:
887 /// - The transport connection fails
888 /// - The server rejects the initialization request
889 /// - Protocol negotiation fails
890 ///
891 /// # Examples
892 ///
893 /// ```rust,no_run
894 /// # use turbomcp_client::Client;
895 /// # use turbomcp_transport::stdio::StdioTransport;
896 /// # async fn example() -> turbomcp_protocol::Result<()> {
897 /// let mut client = Client::new(StdioTransport::new());
898 ///
899 /// let result = client.initialize().await?;
900 /// println!("Server: {} v{}", result.server_info.name, result.server_info.version);
901 /// # Ok(())
902 /// # }
903 /// ```
904 pub async fn initialize(&self) -> Result<InitializeResult> {
905 // Auto-connect transport if not already connected
906 // This provides consistent DX across all transports (Stdio, TCP, HTTP, WebSocket, Unix)
907 let transport = self.inner.protocol.transport();
908 let transport_state = transport.state().await;
909 if !matches!(
910 transport_state,
911 turbomcp_transport::TransportState::Connected
912 ) {
913 tracing::debug!(
914 "Auto-connecting transport (current state: {:?})",
915 transport_state
916 );
917 transport
918 .connect()
919 .await
920 .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
921 tracing::info!("Transport connected successfully");
922 }
923
924 // Build client capabilities based on registered handlers (automatic detection)
925 let mut client_caps = ProtocolClientCapabilities::default();
926
927 // Detect sampling capability from handler
928 if let Some(sampling_caps) = self.get_sampling_capabilities() {
929 client_caps.sampling = Some(sampling_caps);
930 }
931
932 // Detect elicitation capability from handler
933 if let Some(elicitation_caps) = self.get_elicitation_capabilities() {
934 client_caps.elicitation = Some(elicitation_caps);
935 }
936
937 // Detect roots capability from handler
938 if let Some(roots_caps) = self.get_roots_capabilities() {
939 client_caps.roots = Some(roots_caps);
940 }
941
942 // Send MCP initialization request
943 let request = InitializeRequest {
944 protocol_version: PROTOCOL_VERSION.to_string(),
945 capabilities: client_caps,
946 client_info: turbomcp_protocol::types::Implementation {
947 name: "turbomcp-client".to_string(),
948 version: env!("CARGO_PKG_VERSION").to_string(),
949 title: Some("TurboMCP Client".to_string()),
950 },
951 _meta: None,
952 };
953
954 let protocol_response: ProtocolInitializeResult = self
955 .inner
956 .protocol
957 .request("initialize", Some(serde_json::to_value(request)?))
958 .await?;
959
960 // AtomicBool: lock-free store with Ordering::Relaxed
961 self.inner.initialized.store(true, Ordering::Relaxed);
962
963 // Send initialized notification
964 self.inner
965 .protocol
966 .notify("notifications/initialized", None)
967 .await?;
968
969 // Convert protocol response to client response type
970 Ok(InitializeResult {
971 server_info: protocol_response.server_info,
972 server_capabilities: protocol_response.capabilities,
973 })
974 }
975
976 /// Execute a protocol method with plugin middleware
977 ///
978 /// This is a generic helper for wrapping protocol calls with plugin middleware.
979 pub(crate) async fn execute_with_plugins<R>(
980 &self,
981 method_name: &str,
982 params: Option<serde_json::Value>,
983 ) -> Result<R>
984 where
985 R: serde::de::DeserializeOwned + serde::Serialize + Clone,
986 {
987 // Create JSON-RPC request for plugin context
988 let json_rpc_request = turbomcp_protocol::jsonrpc::JsonRpcRequest {
989 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
990 id: turbomcp_protocol::MessageId::Number(1),
991 method: method_name.to_string(),
992 params: params.clone(),
993 };
994
995 // 1. Create request context for plugins
996 let mut req_ctx =
997 crate::plugins::RequestContext::new(json_rpc_request, std::collections::HashMap::new());
998
999 // 2. Execute before_request plugin middleware
1000 if let Err(e) = self
1001 .inner
1002 .plugin_registry
1003 .lock()
1004 .await
1005 .execute_before_request(&mut req_ctx)
1006 .await
1007 {
1008 return Err(Error::bad_request(format!(
1009 "Plugin before_request failed: {}",
1010 e
1011 )));
1012 }
1013
1014 // 3. Execute the actual protocol call
1015 let start_time = std::time::Instant::now();
1016 let protocol_result: Result<R> = self
1017 .inner
1018 .protocol
1019 .request(method_name, req_ctx.params().cloned())
1020 .await;
1021 let duration = start_time.elapsed();
1022
1023 // 4. Prepare response context
1024 let mut resp_ctx = match protocol_result {
1025 Ok(ref response) => {
1026 let response_value = serde_json::to_value(response.clone())?;
1027 crate::plugins::ResponseContext::new(req_ctx, Some(response_value), None, duration)
1028 }
1029 Err(ref e) => {
1030 crate::plugins::ResponseContext::new(req_ctx, None, Some(*e.clone()), duration)
1031 }
1032 };
1033
1034 // 5. Execute after_response plugin middleware
1035 if let Err(e) = self
1036 .inner
1037 .plugin_registry
1038 .lock()
1039 .await
1040 .execute_after_response(&mut resp_ctx)
1041 .await
1042 {
1043 return Err(Error::bad_request(format!(
1044 "Plugin after_response failed: {}",
1045 e
1046 )));
1047 }
1048
1049 // 6. Return the final result, checking for plugin modifications
1050 match protocol_result {
1051 Ok(ref response) => {
1052 // Check if plugins modified the response
1053 if let Some(modified_response) = resp_ctx.response {
1054 // Try to deserialize the modified response
1055 if let Ok(modified_result) =
1056 serde_json::from_value::<R>(modified_response.clone())
1057 {
1058 return Ok(modified_result);
1059 }
1060 }
1061
1062 // No plugin modifications, use original response
1063 Ok(response.clone())
1064 }
1065 Err(e) => {
1066 // Check if plugins provided an error recovery response
1067 if let Some(recovery_response) = resp_ctx.response {
1068 if let Ok(recovery_result) = serde_json::from_value::<R>(recovery_response) {
1069 Ok(recovery_result)
1070 } else {
1071 Err(e)
1072 }
1073 } else {
1074 Err(e)
1075 }
1076 }
1077 }
1078 }
1079
1080 /// Subscribe to resource change notifications
1081 ///
1082 /// Registers interest in receiving notifications when the specified
1083 /// resource changes. The server will send notifications when the
1084 /// resource is modified, created, or deleted.
1085 ///
1086 /// # Arguments
1087 ///
1088 /// * `uri` - The URI of the resource to monitor
1089 ///
1090 /// # Returns
1091 ///
1092 /// Returns `EmptyResult` on successful subscription.
1093 ///
1094 /// # Errors
1095 ///
1096 /// Returns an error if:
1097 /// - The client is not initialized
1098 /// - The URI is invalid or empty
1099 /// - The server doesn't support subscriptions
1100 /// - The request fails
1101 ///
1102 /// # Examples
1103 ///
1104 /// ```rust,no_run
1105 /// # use turbomcp_client::Client;
1106 /// # use turbomcp_transport::stdio::StdioTransport;
1107 /// # async fn example() -> turbomcp_protocol::Result<()> {
1108 /// let mut client = Client::new(StdioTransport::new());
1109 /// client.initialize().await?;
1110 ///
1111 /// // Subscribe to file changes
1112 /// client.subscribe("file:///watch/directory").await?;
1113 /// println!("Subscribed to resource changes");
1114 /// # Ok(())
1115 /// # }
1116 /// ```
1117 pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
1118 if !self.inner.initialized.load(Ordering::Relaxed) {
1119 return Err(Error::bad_request("Client not initialized"));
1120 }
1121
1122 if uri.is_empty() {
1123 return Err(Error::bad_request("Subscription URI cannot be empty"));
1124 }
1125
1126 // Send resources/subscribe request with plugin middleware
1127 let request = SubscribeRequest {
1128 uri: uri.to_string(),
1129 };
1130
1131 self.execute_with_plugins(
1132 "resources/subscribe",
1133 Some(serde_json::to_value(request).map_err(|e| {
1134 Error::protocol(format!("Failed to serialize subscribe request: {}", e))
1135 })?),
1136 )
1137 .await
1138 }
1139
1140 /// Unsubscribe from resource change notifications
1141 ///
1142 /// Cancels a previous subscription to resource changes. After unsubscribing,
1143 /// the client will no longer receive notifications for the specified resource.
1144 ///
1145 /// # Arguments
1146 ///
1147 /// * `uri` - The URI of the resource to stop monitoring
1148 ///
1149 /// # Returns
1150 ///
1151 /// Returns `EmptyResult` on successful unsubscription.
1152 ///
1153 /// # Errors
1154 ///
1155 /// Returns an error if:
1156 /// - The client is not initialized
1157 /// - The URI is invalid or empty
1158 /// - No active subscription exists for the URI
1159 /// - The request fails
1160 ///
1161 /// # Examples
1162 ///
1163 /// ```rust,no_run
1164 /// # use turbomcp_client::Client;
1165 /// # use turbomcp_transport::stdio::StdioTransport;
1166 /// # async fn example() -> turbomcp_protocol::Result<()> {
1167 /// let mut client = Client::new(StdioTransport::new());
1168 /// client.initialize().await?;
1169 ///
1170 /// // Unsubscribe from file changes
1171 /// client.unsubscribe("file:///watch/directory").await?;
1172 /// println!("Unsubscribed from resource changes");
1173 /// # Ok(())
1174 /// # }
1175 /// ```
1176 pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
1177 if !self.inner.initialized.load(Ordering::Relaxed) {
1178 return Err(Error::bad_request("Client not initialized"));
1179 }
1180
1181 if uri.is_empty() {
1182 return Err(Error::bad_request("Unsubscription URI cannot be empty"));
1183 }
1184
1185 // Send resources/unsubscribe request with plugin middleware
1186 let request = UnsubscribeRequest {
1187 uri: uri.to_string(),
1188 };
1189
1190 self.execute_with_plugins(
1191 "resources/unsubscribe",
1192 Some(serde_json::to_value(request).map_err(|e| {
1193 Error::protocol(format!("Failed to serialize unsubscribe request: {}", e))
1194 })?),
1195 )
1196 .await
1197 }
1198
1199 /// Get the client's capabilities configuration
1200 pub fn capabilities(&self) -> &ClientCapabilities {
1201 &self.inner.capabilities
1202 }
1203
1204 /// Initialize all registered plugins
1205 ///
1206 /// This should be called after registration but before using the client.
1207 pub async fn initialize_plugins(&self) -> Result<()> {
1208 // Set up client context for plugins with actual client capabilities
1209 let mut capabilities = std::collections::HashMap::new();
1210 capabilities.insert(
1211 "protocol_version".to_string(),
1212 serde_json::json!("2024-11-05"),
1213 );
1214 capabilities.insert(
1215 "mcp_version".to_string(),
1216 serde_json::json!(env!("CARGO_PKG_VERSION")),
1217 );
1218 capabilities.insert(
1219 "supports_notifications".to_string(),
1220 serde_json::json!(true),
1221 );
1222 capabilities.insert(
1223 "supports_sampling".to_string(),
1224 serde_json::json!(self.has_sampling_handler()),
1225 );
1226 capabilities.insert("supports_progress".to_string(), serde_json::json!(true));
1227 capabilities.insert("supports_roots".to_string(), serde_json::json!(true));
1228
1229 // Extract client configuration
1230 let mut config = std::collections::HashMap::new();
1231 config.insert(
1232 "client_name".to_string(),
1233 serde_json::json!("turbomcp-client"),
1234 );
1235 config.insert(
1236 "initialized".to_string(),
1237 serde_json::json!(self.inner.initialized.load(Ordering::Relaxed)),
1238 );
1239 config.insert(
1240 "plugin_count".to_string(),
1241 serde_json::json!(self.inner.plugin_registry.lock().await.plugin_count()),
1242 );
1243
1244 let context = crate::plugins::PluginContext::new(
1245 "turbomcp-client".to_string(),
1246 env!("CARGO_PKG_VERSION").to_string(),
1247 capabilities,
1248 config,
1249 vec![], // Will be populated by the registry
1250 );
1251
1252 self.inner
1253 .plugin_registry
1254 .lock()
1255 .await
1256 .set_client_context(context);
1257
1258 // Note: Individual plugins are initialized automatically during registration
1259 // via PluginRegistry::register_plugin(). This method ensures the registry
1260 // has proper client context for any future plugin registrations.
1261 Ok(())
1262 }
1263
1264 /// Cleanup all registered plugins
1265 ///
1266 /// This should be called when the client is being shut down.
1267 pub async fn cleanup_plugins(&self) -> Result<()> {
1268 // Clear the plugin registry - plugins will be dropped and cleaned up automatically
1269 // The Rust ownership system ensures proper cleanup when the Arc<dyn ClientPlugin>
1270 // references are dropped.
1271
1272 // Note: The plugin system uses RAII (Resource Acquisition Is Initialization)
1273 // pattern where plugins clean up their resources in their Drop implementation.
1274 // Replace the registry with a fresh one (mutex ensures safe access)
1275 *self.inner.plugin_registry.lock().await = crate::plugins::PluginRegistry::new();
1276 Ok(())
1277 }
1278
1279 // Note: Capability detection methods (has_*_handler, get_*_capabilities)
1280 // are defined in their respective operation modules:
1281 // - sampling.rs: has_sampling_handler, get_sampling_capabilities
1282 // - handlers.rs: has_elicitation_handler, has_roots_handler
1283 //
1284 // Additional capability getters for elicitation and roots added below
1285 // since they're used during initialization
1286
1287 /// Get elicitation capabilities if handler is registered
1288 /// Automatically detects capability based on registered handler
1289 fn get_elicitation_capabilities(
1290 &self,
1291 ) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
1292 if self.has_elicitation_handler() {
1293 // Currently returns default capabilities. In the future, schema_validation support
1294 // could be detected from handler traits by adding a HasSchemaValidation marker trait
1295 // that handlers could implement. For now, handlers validate schemas themselves.
1296 Some(turbomcp_protocol::types::ElicitationCapabilities::default())
1297 } else {
1298 None
1299 }
1300 }
1301
1302 /// Get roots capabilities if handler is registered
1303 fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
1304 if self.has_roots_handler() {
1305 // Roots capabilities indicate whether list can change
1306 Some(turbomcp_protocol::types::RootsCapabilities {
1307 list_changed: Some(true), // Support dynamic roots by default
1308 })
1309 } else {
1310 None
1311 }
1312 }
1313}