Skip to main content

tower_mcp/client/
channel.rs

1//! In-process channel transport for connecting an [`McpClient`] to an [`McpRouter`].
2//!
3//! This transport bridges client and server in the same process without
4//! any network or subprocess overhead. It is primarily useful for testing
5//! (e.g., proxy tests) but can also be used for in-process composition.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use tower_mcp::client::{McpClient, ChannelTransport};
11//! use tower_mcp::McpRouter;
12//!
13//! # async fn example() -> Result<(), tower_mcp::BoxError> {
14//! let router = McpRouter::new().server_info("backend", "1.0.0");
15//! let transport = ChannelTransport::new(router);
16//! let client = McpClient::connect(transport).await?;
17//! client.initialize("my-client", "1.0.0").await?;
18//! # Ok(())
19//! # }
20//! ```
21
22use async_trait::async_trait;
23use tokio::sync::mpsc;
24
25use crate::context::notification_channel;
26use crate::error::Result;
27use crate::jsonrpc::JsonRpcService;
28use crate::protocol::{JsonRpcRequest, JsonRpcResponse, McpNotification};
29use crate::router::McpRouter;
30
31use super::transport::ClientTransport;
32
33/// An in-process [`ClientTransport`] that connects directly to an [`McpRouter`].
34///
35/// Messages are passed through tokio channels — the transport spawns a
36/// background task that feeds incoming JSON-RPC requests to a
37/// [`JsonRpcService<McpRouter>`] and returns responses.
38pub struct ChannelTransport {
39    /// Send raw JSON messages to the server task.
40    request_tx: mpsc::Sender<String>,
41    /// Receive raw JSON responses from the server task.
42    response_rx: mpsc::Receiver<String>,
43    connected: bool,
44}
45
46impl ChannelTransport {
47    /// Create a new channel transport backed by the given router.
48    ///
49    /// Spawns a background tokio task that processes requests.
50    pub fn new(router: McpRouter) -> Self {
51        let (request_tx, mut request_rx) = mpsc::channel::<String>(64);
52        let (response_tx, response_rx) = mpsc::channel::<String>(64);
53
54        let (notification_tx, _notification_rx) = notification_channel(64);
55        let router = router.with_notification_sender(notification_tx);
56        let mut service = JsonRpcService::new(router.clone());
57
58        tokio::spawn(async move {
59            while let Some(raw_request) = request_rx.recv().await {
60                // Parse the incoming JSON
61                let req: JsonRpcRequest = match serde_json::from_str(&raw_request) {
62                    Ok(r) => r,
63                    Err(e) => {
64                        tracing::error!("ChannelTransport: failed to parse request: {}", e);
65                        continue;
66                    }
67                };
68
69                // Check for initialized notification embedded as a request
70                // (McpClient sends notifications as JSON-RPC messages)
71                if req.method == "notifications/initialized" {
72                    router.handle_notification(McpNotification::Initialized);
73                    // No response for notifications
74                    continue;
75                }
76
77                // Handle other notifications (no response expected)
78                if req.method.starts_with("notifications/") {
79                    continue;
80                }
81
82                // Process the request through JsonRpcService
83                let response = service.call_single(req).await;
84
85                let json = match response {
86                    Ok(resp) => match serde_json::to_string(&resp) {
87                        Ok(j) => j,
88                        Err(e) => {
89                            tracing::error!(
90                                "ChannelTransport: failed to serialize response: {}",
91                                e
92                            );
93                            continue;
94                        }
95                    },
96                    Err(e) => {
97                        // Convert error to a JSON-RPC error response
98                        let err_resp = JsonRpcResponse::error(
99                            None,
100                            tower_mcp_types::JsonRpcError::internal_error(e.to_string()),
101                        );
102                        match serde_json::to_string(&err_resp) {
103                            Ok(j) => j,
104                            Err(_) => continue,
105                        }
106                    }
107                };
108
109                if response_tx.send(json).await.is_err() {
110                    break; // Client dropped
111                }
112            }
113        });
114
115        Self {
116            request_tx,
117            response_rx,
118            connected: true,
119        }
120    }
121}
122
123#[async_trait]
124impl ClientTransport for ChannelTransport {
125    async fn send(&mut self, message: &str) -> Result<()> {
126        self.request_tx
127            .send(message.to_string())
128            .await
129            .map_err(|_| crate::error::Error::internal("ChannelTransport: server task dropped"))?;
130        Ok(())
131    }
132
133    async fn recv(&mut self) -> Result<Option<String>> {
134        match self.response_rx.recv().await {
135            Some(msg) => Ok(Some(msg)),
136            None => {
137                self.connected = false;
138                Ok(None)
139            }
140        }
141    }
142
143    fn is_connected(&self) -> bool {
144        self.connected
145    }
146
147    async fn close(&mut self) -> Result<()> {
148        self.connected = false;
149        Ok(())
150    }
151}