Skip to main content

codetether_agent/mcp/
transport.rs

1//! MCP transport layer - stdio and SSE implementations
2
3use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace, warn};
11
12/// Transport trait for MCP communication
13#[async_trait]
14pub trait Transport: Send + Sync {
15    /// Send a JSON-RPC request
16    async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18    /// Send a JSON-RPC response
19    async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21    /// Send a JSON-RPC notification
22    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24    /// Receive incoming messages
25    async fn receive(&self) -> Result<Option<McpMessage>>;
26
27    /// Close the transport
28    async fn close(&self) -> Result<()>;
29}
30
31/// Incoming MCP message (can be request, response, or notification)
32#[derive(Debug, Clone)]
33pub enum McpMessage {
34    Request(JsonRpcRequest),
35    Response(JsonRpcResponse),
36    Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40    pub fn from_json(value: Value) -> Result<Self> {
41        // Check if it has an id
42        if value.get("id").is_some() {
43            // Has id - could be request or response
44            if value.get("method").is_some() {
45                // Has method - it's a request
46                let request: JsonRpcRequest = serde_json::from_value(value)?;
47                Ok(McpMessage::Request(request))
48            } else {
49                // No method - it's a response
50                let response: JsonRpcResponse = serde_json::from_value(value)?;
51                Ok(McpMessage::Response(response))
52            }
53        } else {
54            // No id - it's a notification
55            let notification: JsonRpcNotification = serde_json::from_value(value)?;
56            Ok(McpMessage::Notification(notification))
57        }
58    }
59}
60
61/// Stdio transport for MCP (synchronous version for server mode)
62pub struct StdioTransport {
63    /// Sender channel for outgoing messages (kept alive for transport lifetime)
64    #[allow(dead_code)]
65    tx: mpsc::Sender<String>,
66    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
/// A transport that does nothing, for local / in-process MCP usage.
///
/// No stdio reader or writer threads are spawned and stdout is never locked,
/// which makes this suitable for CLI commands that only want to reuse the MCP
/// tool registry (e.g. `codetether mcp list-tools` and `codetether mcp call`)
/// without running a stdio server.
#[derive(Debug, Default, Clone)]
pub struct NullTransport;

impl NullTransport {
    /// Creates the (stateless) null transport.
    pub fn new() -> Self {
        NullTransport
    }
}
83
84impl Default for StdioTransport {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl StdioTransport {
91    /// Create a new stdio transport
92    pub fn new() -> Self {
93        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
94        let (read_tx, read_rx) = mpsc::channel::<String>(100);
95
96        // Spawn writer thread (blocking IO)
97        std::thread::spawn(move || {
98            let mut stdout = std::io::stdout().lock();
99            while let Some(msg) = write_rx.blocking_recv() {
100                trace!("MCP TX: {}", msg);
101                if let Err(e) = writeln!(stdout, "{}", msg) {
102                    error!("Failed to write to stdout: {}", e);
103                    break;
104                }
105                if let Err(e) = stdout.flush() {
106                    error!("Failed to flush stdout: {}", e);
107                    break;
108                }
109            }
110        });
111
112        // Spawn reader thread (blocking IO)
113        std::thread::spawn(move || {
114            let stdin = std::io::stdin();
115            let reader = stdin.lock();
116            for line in reader.lines() {
117                match line {
118                    Ok(msg) if !msg.is_empty() => {
119                        trace!("MCP RX: {}", msg);
120                        if read_tx.blocking_send(msg).is_err() {
121                            break;
122                        }
123                    }
124                    Ok(_) => continue, // Empty line
125                    Err(e) => {
126                        error!("Failed to read from stdin: {}", e);
127                        break;
128                    }
129                }
130            }
131        });
132
133        Self {
134            tx: write_tx,
135            rx: tokio::sync::Mutex::new(read_rx),
136        }
137    }
138
139    async fn send_json(&self, value: Value) -> Result<()> {
140        let json = serde_json::to_string(&value)?;
141        self.tx.send(json).await?;
142        Ok(())
143    }
144}
145
146#[async_trait]
147impl Transport for StdioTransport {
148    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
149        self.send_json(serde_json::to_value(&request)?).await
150    }
151
152    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
153        self.send_json(serde_json::to_value(&response)?).await
154    }
155
156    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
157        self.send_json(serde_json::to_value(&notification)?).await
158    }
159
160    async fn receive(&self) -> Result<Option<McpMessage>> {
161        let mut rx = self.rx.lock().await;
162        match rx.recv().await {
163            Some(line) => {
164                let value: Value = serde_json::from_str(&line)?;
165                let msg = McpMessage::from_json(value)?;
166                Ok(Some(msg))
167            }
168            None => Ok(None),
169        }
170    }
171
172    async fn close(&self) -> Result<()> {
173        // Channel will close when transport is dropped
174        Ok(())
175    }
176}
177
178#[async_trait]
179impl Transport for NullTransport {
180    async fn send_request(&self, _request: JsonRpcRequest) -> Result<()> {
181        Ok(())
182    }
183
184    async fn send_response(&self, _response: JsonRpcResponse) -> Result<()> {
185        Ok(())
186    }
187
188    async fn send_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
189        Ok(())
190    }
191
192    async fn receive(&self) -> Result<Option<McpMessage>> {
193        Ok(None)
194    }
195
196    async fn close(&self) -> Result<()> {
197        Ok(())
198    }
199}
200
201/// SSE transport for MCP (HTTP-based)
202pub struct SseTransport {
203    endpoint: String,
204    client: reqwest::Client,
205    _tx: mpsc::Sender<String>,
206    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
207}
208
209impl SseTransport {
210    /// Create a new SSE transport connecting to the given endpoint
211    pub async fn new(endpoint: String) -> Result<Self> {
212        let client = reqwest::Client::new();
213        let (write_tx, _write_rx) = mpsc::channel::<String>(100);
214        let (read_tx, read_rx) = mpsc::channel::<String>(100);
215
216        // TODO: Start SSE connection for receiving messages
217        // For now, this is a placeholder
218        let endpoint_clone = endpoint.clone();
219        let read_tx_clone = read_tx;
220
221        tokio::spawn(async move {
222            debug!("SSE transport connecting to: {}", endpoint_clone);
223            // SSE event stream handling would go here
224            let _ = read_tx_clone;
225        });
226
227        Ok(Self {
228            endpoint,
229            client,
230            _tx: write_tx,
231            rx: tokio::sync::Mutex::new(read_rx),
232        })
233    }
234
235    async fn send_json(&self, value: Value) -> Result<()> {
236        let json = serde_json::to_string(&value)?;
237        debug!("SSE TX: {}", json);
238
239        // POST to the endpoint
240        self.client
241            .post(&self.endpoint)
242            .header("Content-Type", "application/json")
243            .body(json)
244            .send()
245            .await?;
246
247        Ok(())
248    }
249}
250
251#[async_trait]
252impl Transport for SseTransport {
253    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
254        self.send_json(serde_json::to_value(&request)?).await
255    }
256
257    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
258        self.send_json(serde_json::to_value(&response)?).await
259    }
260
261    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
262        self.send_json(serde_json::to_value(&notification)?).await
263    }
264
265    async fn receive(&self) -> Result<Option<McpMessage>> {
266        let mut rx = self.rx.lock().await;
267        match rx.recv().await {
268            Some(line) => {
269                let value: Value = serde_json::from_str(&line)?;
270                let msg = McpMessage::from_json(value)?;
271                Ok(Some(msg))
272            }
273            None => Ok(None),
274        }
275    }
276
277    async fn close(&self) -> Result<()> {
278        Ok(())
279    }
280}
281
282/// Process transport for connecting to MCP servers via subprocess
283pub struct ProcessTransport {
284    _child: tokio::process::Child,
285    tx: mpsc::Sender<String>,
286    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
287}
288
289impl ProcessTransport {
290    /// Spawn a subprocess and connect via stdio
291    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
292        use tokio::process::Command;
293
294        let mut child = Command::new(command)
295            .args(args)
296            .stdin(std::process::Stdio::piped())
297            .stdout(std::process::Stdio::piped())
298            .stderr(std::process::Stdio::piped())
299            .spawn()?;
300
301        let stdout = child
302            .stdout
303            .take()
304            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
305        let mut stdin = child
306            .stdin
307            .take()
308            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
309        let stderr = child
310            .stderr
311            .take()
312            .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
313
314        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
315        let (read_tx, read_rx) = mpsc::channel::<String>(100);
316
317        // Stderr drain task — capture instead of inheriting so it doesn't corrupt the TUI
318        tokio::spawn(async move {
319            let mut reader = BufReader::new(stderr);
320            let mut line = String::new();
321            loop {
322                line.clear();
323                match reader.read_line(&mut line).await {
324                    Ok(0) => break,
325                    Ok(_) => {
326                        let trimmed = line.trim();
327                        if !trimmed.is_empty() {
328                            warn!(target: "mcp_subprocess", "{trimmed}");
329                        }
330                    }
331                    Err(_) => break,
332                }
333            }
334        });
335
336        // Writer task
337        tokio::spawn(async move {
338            while let Some(msg) = write_rx.recv().await {
339                trace!("Process TX: {}", msg);
340                if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
341                    error!("Failed to write to process stdin: {}", e);
342                    break;
343                }
344                if let Err(e) = stdin.flush().await {
345                    error!("Failed to flush process stdin: {}", e);
346                    break;
347                }
348            }
349        });
350
351        // Reader task
352        tokio::spawn(async move {
353            let mut reader = BufReader::new(stdout);
354            let mut line = String::new();
355            loop {
356                line.clear();
357                match reader.read_line(&mut line).await {
358                    Ok(0) => break, // EOF
359                    Ok(_) => {
360                        let trimmed = line.trim();
361                        if !trimmed.is_empty() {
362                            trace!("Process RX: {}", trimmed);
363                            if read_tx.send(trimmed.to_string()).await.is_err() {
364                                break;
365                            }
366                        }
367                    }
368                    Err(e) => {
369                        error!("Failed to read from process stdout: {}", e);
370                        break;
371                    }
372                }
373            }
374        });
375
376        Ok(Self {
377            _child: child,
378            tx: write_tx,
379            rx: tokio::sync::Mutex::new(read_rx),
380        })
381    }
382
383    async fn send_json(&self, value: Value) -> Result<()> {
384        let json = serde_json::to_string(&value)?;
385        self.tx.send(json).await?;
386        Ok(())
387    }
388}
389
390#[async_trait]
391impl Transport for ProcessTransport {
392    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
393        self.send_json(serde_json::to_value(&request)?).await
394    }
395
396    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
397        self.send_json(serde_json::to_value(&response)?).await
398    }
399
400    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
401        self.send_json(serde_json::to_value(&notification)?).await
402    }
403
404    async fn receive(&self) -> Result<Option<McpMessage>> {
405        let mut rx = self.rx.lock().await;
406        match rx.recv().await {
407            Some(line) => {
408                let value: Value = serde_json::from_str(&line)?;
409                let msg = McpMessage::from_json(value)?;
410                Ok(Some(msg))
411            }
412            None => Ok(None),
413        }
414    }
415
416    async fn close(&self) -> Result<()> {
417        Ok(())
418    }
419}