// codetether_agent/mcp/transport.rs
use std::io::{BufRead, Write};

use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tracing::{debug, error, trace, warn};

use super::types::*;

12#[async_trait]
14pub trait Transport: Send + Sync {
15 async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18 async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24 async fn receive(&self) -> Result<Option<McpMessage>>;
26
27 async fn close(&self) -> Result<()>;
29}
30
31#[derive(Debug, Clone)]
33pub enum McpMessage {
34 Request(JsonRpcRequest),
35 Response(JsonRpcResponse),
36 Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40 pub fn from_json(value: Value) -> Result<Self> {
41 if value.get("id").is_some() {
43 if value.get("method").is_some() {
45 let request: JsonRpcRequest = serde_json::from_value(value)?;
47 Ok(McpMessage::Request(request))
48 } else {
49 let response: JsonRpcResponse = serde_json::from_value(value)?;
51 Ok(McpMessage::Response(response))
52 }
53 } else {
54 let notification: JsonRpcNotification = serde_json::from_value(value)?;
56 Ok(McpMessage::Notification(notification))
57 }
58 }
59}
60
61pub struct StdioTransport {
63 tx: mpsc::Sender<String>,
65 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
66}
67
/// Transport that performs no I/O.
///
/// Its `Transport` implementation in this file accepts every outbound message
/// without sending it and reports `Ok(None)` from `receive`.
#[derive(Debug, Default, Clone)]
pub struct NullTransport;

77impl NullTransport {
78 pub fn new() -> Self {
79 Self
80 }
81}
82
83impl Default for StdioTransport {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89impl StdioTransport {
90 pub fn new() -> Self {
92 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
93 let (read_tx, read_rx) = mpsc::channel::<String>(100);
94
95 std::thread::spawn(move || {
97 let mut stdout = std::io::stdout().lock();
98 while let Some(msg) = write_rx.blocking_recv() {
99 trace!("MCP TX: {}", msg);
100 if let Err(e) = writeln!(stdout, "{}", msg) {
101 error!("Failed to write to stdout: {}", e);
102 break;
103 }
104 if let Err(e) = stdout.flush() {
105 error!("Failed to flush stdout: {}", e);
106 break;
107 }
108 }
109 });
110
111 std::thread::spawn(move || {
113 let stdin = std::io::stdin();
114 let reader = stdin.lock();
115 for line in reader.lines() {
116 match line {
117 Ok(msg) if !msg.is_empty() => {
118 trace!("MCP RX: {}", msg);
119 if read_tx.blocking_send(msg).is_err() {
120 break;
121 }
122 }
123 Ok(_) => continue, Err(e) => {
125 error!("Failed to read from stdin: {}", e);
126 break;
127 }
128 }
129 }
130 });
131
132 Self {
133 tx: write_tx,
134 rx: tokio::sync::Mutex::new(read_rx),
135 }
136 }
137
138 async fn send_json(&self, value: Value) -> Result<()> {
139 let json = serde_json::to_string(&value)?;
140 self.tx.send(json).await?;
141 Ok(())
142 }
143}
144
145#[async_trait]
146impl Transport for StdioTransport {
147 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
148 self.send_json(serde_json::to_value(&request)?).await
149 }
150
151 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
152 self.send_json(serde_json::to_value(&response)?).await
153 }
154
155 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
156 self.send_json(serde_json::to_value(¬ification)?).await
157 }
158
159 async fn receive(&self) -> Result<Option<McpMessage>> {
160 let mut rx = self.rx.lock().await;
161 match rx.recv().await {
162 Some(line) => {
163 let value: Value = serde_json::from_str(&line)?;
164 let msg = McpMessage::from_json(value)?;
165 Ok(Some(msg))
166 }
167 None => Ok(None),
168 }
169 }
170
171 async fn close(&self) -> Result<()> {
172 Ok(())
174 }
175}
176
177#[async_trait]
178impl Transport for NullTransport {
179 async fn send_request(&self, _request: JsonRpcRequest) -> Result<()> {
180 Ok(())
181 }
182
183 async fn send_response(&self, _response: JsonRpcResponse) -> Result<()> {
184 Ok(())
185 }
186
187 async fn send_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
188 Ok(())
189 }
190
191 async fn receive(&self) -> Result<Option<McpMessage>> {
192 Ok(None)
193 }
194
195 async fn close(&self) -> Result<()> {
196 Ok(())
197 }
198}
199
200pub struct SseTransport {
202 endpoint: String,
203 client: reqwest::Client,
204 _tx: mpsc::Sender<String>,
205 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
206}
207
208impl SseTransport {
209 pub async fn new(endpoint: String) -> Result<Self> {
211 let client = reqwest::Client::new();
212 let (write_tx, _write_rx) = mpsc::channel::<String>(100);
213 let (read_tx, read_rx) = mpsc::channel::<String>(100);
214
215 let endpoint_clone = endpoint.clone();
218 let read_tx_clone = read_tx;
219
220 tokio::spawn(async move {
221 debug!("SSE transport connecting to: {}", endpoint_clone);
222 let _ = read_tx_clone;
224 });
225
226 Ok(Self {
227 endpoint,
228 client,
229 _tx: write_tx,
230 rx: tokio::sync::Mutex::new(read_rx),
231 })
232 }
233
234 async fn send_json(&self, value: Value) -> Result<()> {
235 let json = serde_json::to_string(&value)?;
236 debug!("SSE TX: {}", json);
237
238 self.client
240 .post(&self.endpoint)
241 .header("Content-Type", "application/json")
242 .body(json)
243 .send()
244 .await?;
245
246 Ok(())
247 }
248}
249
250#[async_trait]
251impl Transport for SseTransport {
252 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
253 self.send_json(serde_json::to_value(&request)?).await
254 }
255
256 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
257 self.send_json(serde_json::to_value(&response)?).await
258 }
259
260 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
261 self.send_json(serde_json::to_value(¬ification)?).await
262 }
263
264 async fn receive(&self) -> Result<Option<McpMessage>> {
265 let mut rx = self.rx.lock().await;
266 match rx.recv().await {
267 Some(line) => {
268 let value: Value = serde_json::from_str(&line)?;
269 let msg = McpMessage::from_json(value)?;
270 Ok(Some(msg))
271 }
272 None => Ok(None),
273 }
274 }
275
276 async fn close(&self) -> Result<()> {
277 Ok(())
278 }
279}
280
281pub struct ProcessTransport {
283 _child: tokio::process::Child,
284 tx: mpsc::Sender<String>,
285 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
286}
287
288impl ProcessTransport {
289 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
291 use tokio::process::Command;
292
293 let mut child = Command::new(command)
294 .args(args)
295 .stdin(std::process::Stdio::piped())
296 .stdout(std::process::Stdio::piped())
297 .stderr(std::process::Stdio::piped())
298 .spawn()?;
299
300 let stdout = child
301 .stdout
302 .take()
303 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
304 let mut stdin = child
305 .stdin
306 .take()
307 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
308 let stderr = child
309 .stderr
310 .take()
311 .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
312
313 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
314 let (read_tx, read_rx) = mpsc::channel::<String>(100);
315
316 tokio::spawn(async move {
318 let mut reader = BufReader::new(stderr);
319 let mut line = String::new();
320 loop {
321 line.clear();
322 match reader.read_line(&mut line).await {
323 Ok(0) => break,
324 Ok(_) => {
325 let trimmed = line.trim();
326 if !trimmed.is_empty() {
327 warn!(target: "mcp_subprocess", "{trimmed}");
328 }
329 }
330 Err(_) => break,
331 }
332 }
333 });
334
335 tokio::spawn(async move {
337 while let Some(msg) = write_rx.recv().await {
338 trace!("Process TX: {}", msg);
339 if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
340 error!("Failed to write to process stdin: {}", e);
341 break;
342 }
343 if let Err(e) = stdin.flush().await {
344 error!("Failed to flush process stdin: {}", e);
345 break;
346 }
347 }
348 });
349
350 tokio::spawn(async move {
352 let mut reader = BufReader::new(stdout);
353 let mut line = String::new();
354 loop {
355 line.clear();
356 match reader.read_line(&mut line).await {
357 Ok(0) => break, Ok(_) => {
359 let trimmed = line.trim();
360 if !trimmed.is_empty() {
361 trace!("Process RX: {}", trimmed);
362 if read_tx.send(trimmed.to_string()).await.is_err() {
363 break;
364 }
365 }
366 }
367 Err(e) => {
368 error!("Failed to read from process stdout: {}", e);
369 break;
370 }
371 }
372 }
373 });
374
375 Ok(Self {
376 _child: child,
377 tx: write_tx,
378 rx: tokio::sync::Mutex::new(read_rx),
379 })
380 }
381
382 async fn send_json(&self, value: Value) -> Result<()> {
383 let json = serde_json::to_string(&value)?;
384 self.tx.send(json).await?;
385 Ok(())
386 }
387}
388
389#[async_trait]
390impl Transport for ProcessTransport {
391 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
392 self.send_json(serde_json::to_value(&request)?).await
393 }
394
395 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
396 self.send_json(serde_json::to_value(&response)?).await
397 }
398
399 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
400 self.send_json(serde_json::to_value(¬ification)?).await
401 }
402
403 async fn receive(&self) -> Result<Option<McpMessage>> {
404 let mut rx = self.rx.lock().await;
405 match rx.recv().await {
406 Some(line) => {
407 let value: Value = serde_json::from_str(&line)?;
408 let msg = McpMessage::from_json(value)?;
409 Ok(Some(msg))
410 }
411 None => Ok(None),
412 }
413 }
414
415 async fn close(&self) -> Result<()> {
416 Ok(())
417 }
418}