tower_mcp/client/channel.rs
1//! In-process channel transport for connecting an [`McpClient`] to an [`McpRouter`].
2//!
3//! This transport bridges client and server in the same process without
4//! any network or subprocess overhead. It is primarily useful for testing
5//! (e.g., proxy tests) but can also be used for in-process composition.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use tower_mcp::client::{McpClient, ChannelTransport};
11//! use tower_mcp::McpRouter;
12//!
13//! # async fn example() -> Result<(), tower_mcp::BoxError> {
14//! let router = McpRouter::new().server_info("backend", "1.0.0");
15//! let transport = ChannelTransport::new(router);
16//! let client = McpClient::connect(transport).await?;
17//! client.initialize("my-client", "1.0.0").await?;
18//! # Ok(())
19//! # }
20//! ```
21
22use async_trait::async_trait;
23use tokio::sync::mpsc;
24
25use crate::context::notification_channel;
26use crate::error::Result;
27use crate::jsonrpc::JsonRpcService;
28use crate::protocol::{JsonRpcRequest, JsonRpcResponse, McpNotification};
29use crate::router::McpRouter;
30
31use super::transport::ClientTransport;
32
33/// An in-process [`ClientTransport`] that connects directly to an [`McpRouter`].
34///
35/// Messages are passed through tokio channels — the transport spawns a
36/// background task that feeds incoming JSON-RPC requests to a
37/// [`JsonRpcService<McpRouter>`] and returns responses.
38pub struct ChannelTransport {
39 /// Send raw JSON messages to the server task.
40 request_tx: mpsc::Sender<String>,
41 /// Receive raw JSON responses from the server task.
42 response_rx: mpsc::Receiver<String>,
43 connected: bool,
44}
45
46impl ChannelTransport {
47 /// Create a new channel transport backed by the given router.
48 ///
49 /// Spawns a background tokio task that processes requests.
50 pub fn new(router: McpRouter) -> Self {
51 let (request_tx, mut request_rx) = mpsc::channel::<String>(64);
52 let (response_tx, response_rx) = mpsc::channel::<String>(64);
53
54 let (notification_tx, _notification_rx) = notification_channel(64);
55 let router = router.with_notification_sender(notification_tx);
56 let mut service = JsonRpcService::new(router.clone());
57
58 tokio::spawn(async move {
59 while let Some(raw_request) = request_rx.recv().await {
60 // Parse the incoming JSON
61 let req: JsonRpcRequest = match serde_json::from_str(&raw_request) {
62 Ok(r) => r,
63 Err(e) => {
64 tracing::error!("ChannelTransport: failed to parse request: {}", e);
65 continue;
66 }
67 };
68
69 // Check for initialized notification embedded as a request
70 // (McpClient sends notifications as JSON-RPC messages)
71 if req.method == "notifications/initialized" {
72 router.handle_notification(McpNotification::Initialized);
73 // No response for notifications
74 continue;
75 }
76
77 // Handle other notifications (no response expected)
78 if req.method.starts_with("notifications/") {
79 continue;
80 }
81
82 // Process the request through JsonRpcService
83 let response = service.call_single(req).await;
84
85 let json = match response {
86 Ok(resp) => match serde_json::to_string(&resp) {
87 Ok(j) => j,
88 Err(e) => {
89 tracing::error!(
90 "ChannelTransport: failed to serialize response: {}",
91 e
92 );
93 continue;
94 }
95 },
96 Err(e) => {
97 // Convert error to a JSON-RPC error response
98 let err_resp = JsonRpcResponse::error(
99 None,
100 tower_mcp_types::JsonRpcError::internal_error(e.to_string()),
101 );
102 match serde_json::to_string(&err_resp) {
103 Ok(j) => j,
104 Err(_) => continue,
105 }
106 }
107 };
108
109 if response_tx.send(json).await.is_err() {
110 break; // Client dropped
111 }
112 }
113 });
114
115 Self {
116 request_tx,
117 response_rx,
118 connected: true,
119 }
120 }
121}
122
123#[async_trait]
124impl ClientTransport for ChannelTransport {
125 async fn send(&mut self, message: &str) -> Result<()> {
126 self.request_tx
127 .send(message.to_string())
128 .await
129 .map_err(|_| crate::error::Error::internal("ChannelTransport: server task dropped"))?;
130 Ok(())
131 }
132
133 async fn recv(&mut self) -> Result<Option<String>> {
134 match self.response_rx.recv().await {
135 Some(msg) => Ok(Some(msg)),
136 None => {
137 self.connected = false;
138 Ok(None)
139 }
140 }
141 }
142
143 fn is_connected(&self) -> bool {
144 self.connected
145 }
146
147 async fn close(&mut self) -> Result<()> {
148 self.connected = false;
149 Ok(())
150 }
151}