codetether_agent/mcp/
transport.rs1use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace, warn};
11
12#[async_trait]
14pub trait Transport: Send + Sync {
15 async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18 async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24 async fn receive(&self) -> Result<Option<McpMessage>>;
26
27 async fn close(&self) -> Result<()>;
29}
30
31#[derive(Debug, Clone)]
33pub enum McpMessage {
34 Request(JsonRpcRequest),
35 Response(JsonRpcResponse),
36 Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40 pub fn from_json(value: Value) -> Result<Self> {
41 if value.get("id").is_some() {
43 if value.get("method").is_some() {
45 let request: JsonRpcRequest = serde_json::from_value(value)?;
47 Ok(McpMessage::Request(request))
48 } else {
49 let response: JsonRpcResponse = serde_json::from_value(value)?;
51 Ok(McpMessage::Response(response))
52 }
53 } else {
54 let notification: JsonRpcNotification = serde_json::from_value(value)?;
56 Ok(McpMessage::Notification(notification))
57 }
58 }
59}
60
61pub struct StdioTransport {
63 #[allow(dead_code)]
65 tx: mpsc::Sender<String>,
66 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
69#[derive(Debug, Default, Clone)]
76pub struct NullTransport;
77
78impl NullTransport {
79 pub fn new() -> Self {
80 Self
81 }
82}
83
84impl Default for StdioTransport {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl StdioTransport {
91 pub fn new() -> Self {
93 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
94 let (read_tx, read_rx) = mpsc::channel::<String>(100);
95
96 std::thread::spawn(move || {
98 let mut stdout = std::io::stdout().lock();
99 while let Some(msg) = write_rx.blocking_recv() {
100 trace!("MCP TX: {}", msg);
101 if let Err(e) = writeln!(stdout, "{}", msg) {
102 error!("Failed to write to stdout: {}", e);
103 break;
104 }
105 if let Err(e) = stdout.flush() {
106 error!("Failed to flush stdout: {}", e);
107 break;
108 }
109 }
110 });
111
112 std::thread::spawn(move || {
114 let stdin = std::io::stdin();
115 let reader = stdin.lock();
116 for line in reader.lines() {
117 match line {
118 Ok(msg) if !msg.is_empty() => {
119 trace!("MCP RX: {}", msg);
120 if read_tx.blocking_send(msg).is_err() {
121 break;
122 }
123 }
124 Ok(_) => continue, Err(e) => {
126 error!("Failed to read from stdin: {}", e);
127 break;
128 }
129 }
130 }
131 });
132
133 Self {
134 tx: write_tx,
135 rx: tokio::sync::Mutex::new(read_rx),
136 }
137 }
138
139 async fn send_json(&self, value: Value) -> Result<()> {
140 let json = serde_json::to_string(&value)?;
141 self.tx.send(json).await?;
142 Ok(())
143 }
144}
145
146#[async_trait]
147impl Transport for StdioTransport {
148 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
149 self.send_json(serde_json::to_value(&request)?).await
150 }
151
152 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
153 self.send_json(serde_json::to_value(&response)?).await
154 }
155
156 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
157 self.send_json(serde_json::to_value(¬ification)?).await
158 }
159
160 async fn receive(&self) -> Result<Option<McpMessage>> {
161 let mut rx = self.rx.lock().await;
162 match rx.recv().await {
163 Some(line) => {
164 let value: Value = serde_json::from_str(&line)?;
165 let msg = McpMessage::from_json(value)?;
166 Ok(Some(msg))
167 }
168 None => Ok(None),
169 }
170 }
171
172 async fn close(&self) -> Result<()> {
173 Ok(())
175 }
176}
177
178#[async_trait]
179impl Transport for NullTransport {
180 async fn send_request(&self, _request: JsonRpcRequest) -> Result<()> {
181 Ok(())
182 }
183
184 async fn send_response(&self, _response: JsonRpcResponse) -> Result<()> {
185 Ok(())
186 }
187
188 async fn send_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
189 Ok(())
190 }
191
192 async fn receive(&self) -> Result<Option<McpMessage>> {
193 Ok(None)
194 }
195
196 async fn close(&self) -> Result<()> {
197 Ok(())
198 }
199}
200
201pub struct SseTransport {
203 endpoint: String,
204 client: reqwest::Client,
205 _tx: mpsc::Sender<String>,
206 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
207}
208
209impl SseTransport {
210 pub async fn new(endpoint: String) -> Result<Self> {
212 let client = reqwest::Client::new();
213 let (write_tx, _write_rx) = mpsc::channel::<String>(100);
214 let (read_tx, read_rx) = mpsc::channel::<String>(100);
215
216 let endpoint_clone = endpoint.clone();
219 let read_tx_clone = read_tx;
220
221 tokio::spawn(async move {
222 debug!("SSE transport connecting to: {}", endpoint_clone);
223 let _ = read_tx_clone;
225 });
226
227 Ok(Self {
228 endpoint,
229 client,
230 _tx: write_tx,
231 rx: tokio::sync::Mutex::new(read_rx),
232 })
233 }
234
235 async fn send_json(&self, value: Value) -> Result<()> {
236 let json = serde_json::to_string(&value)?;
237 debug!("SSE TX: {}", json);
238
239 self.client
241 .post(&self.endpoint)
242 .header("Content-Type", "application/json")
243 .body(json)
244 .send()
245 .await?;
246
247 Ok(())
248 }
249}
250
251#[async_trait]
252impl Transport for SseTransport {
253 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
254 self.send_json(serde_json::to_value(&request)?).await
255 }
256
257 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
258 self.send_json(serde_json::to_value(&response)?).await
259 }
260
261 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
262 self.send_json(serde_json::to_value(¬ification)?).await
263 }
264
265 async fn receive(&self) -> Result<Option<McpMessage>> {
266 let mut rx = self.rx.lock().await;
267 match rx.recv().await {
268 Some(line) => {
269 let value: Value = serde_json::from_str(&line)?;
270 let msg = McpMessage::from_json(value)?;
271 Ok(Some(msg))
272 }
273 None => Ok(None),
274 }
275 }
276
277 async fn close(&self) -> Result<()> {
278 Ok(())
279 }
280}
281
282pub struct ProcessTransport {
284 _child: tokio::process::Child,
285 tx: mpsc::Sender<String>,
286 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
287}
288
289impl ProcessTransport {
290 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
292 use tokio::process::Command;
293
294 let mut child = Command::new(command)
295 .args(args)
296 .stdin(std::process::Stdio::piped())
297 .stdout(std::process::Stdio::piped())
298 .stderr(std::process::Stdio::piped())
299 .spawn()?;
300
301 let stdout = child
302 .stdout
303 .take()
304 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
305 let mut stdin = child
306 .stdin
307 .take()
308 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
309 let stderr = child
310 .stderr
311 .take()
312 .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
313
314 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
315 let (read_tx, read_rx) = mpsc::channel::<String>(100);
316
317 tokio::spawn(async move {
319 let mut reader = BufReader::new(stderr);
320 let mut line = String::new();
321 loop {
322 line.clear();
323 match reader.read_line(&mut line).await {
324 Ok(0) => break,
325 Ok(_) => {
326 let trimmed = line.trim();
327 if !trimmed.is_empty() {
328 warn!(target: "mcp_subprocess", "{trimmed}");
329 }
330 }
331 Err(_) => break,
332 }
333 }
334 });
335
336 tokio::spawn(async move {
338 while let Some(msg) = write_rx.recv().await {
339 trace!("Process TX: {}", msg);
340 if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
341 error!("Failed to write to process stdin: {}", e);
342 break;
343 }
344 if let Err(e) = stdin.flush().await {
345 error!("Failed to flush process stdin: {}", e);
346 break;
347 }
348 }
349 });
350
351 tokio::spawn(async move {
353 let mut reader = BufReader::new(stdout);
354 let mut line = String::new();
355 loop {
356 line.clear();
357 match reader.read_line(&mut line).await {
358 Ok(0) => break, Ok(_) => {
360 let trimmed = line.trim();
361 if !trimmed.is_empty() {
362 trace!("Process RX: {}", trimmed);
363 if read_tx.send(trimmed.to_string()).await.is_err() {
364 break;
365 }
366 }
367 }
368 Err(e) => {
369 error!("Failed to read from process stdout: {}", e);
370 break;
371 }
372 }
373 }
374 });
375
376 Ok(Self {
377 _child: child,
378 tx: write_tx,
379 rx: tokio::sync::Mutex::new(read_rx),
380 })
381 }
382
383 async fn send_json(&self, value: Value) -> Result<()> {
384 let json = serde_json::to_string(&value)?;
385 self.tx.send(json).await?;
386 Ok(())
387 }
388}
389
390#[async_trait]
391impl Transport for ProcessTransport {
392 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
393 self.send_json(serde_json::to_value(&request)?).await
394 }
395
396 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
397 self.send_json(serde_json::to_value(&response)?).await
398 }
399
400 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
401 self.send_json(serde_json::to_value(¬ification)?).await
402 }
403
404 async fn receive(&self) -> Result<Option<McpMessage>> {
405 let mut rx = self.rx.lock().await;
406 match rx.recv().await {
407 Some(line) => {
408 let value: Value = serde_json::from_str(&line)?;
409 let msg = McpMessage::from_json(value)?;
410 Ok(Some(msg))
411 }
412 None => Ok(None),
413 }
414 }
415
416 async fn close(&self) -> Result<()> {
417 Ok(())
418 }
419}