Skip to main content

codetether_agent/mcp/
transport.rs

//! MCP transport layer - stdio, SSE, subprocess, and null (in-process) implementations
2
3use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace, warn};
11
12/// Transport trait for MCP communication
13#[async_trait]
14pub trait Transport: Send + Sync {
15    /// Send a JSON-RPC request
16    async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18    /// Send a JSON-RPC response
19    async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21    /// Send a JSON-RPC notification
22    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24    /// Receive incoming messages
25    async fn receive(&self) -> Result<Option<McpMessage>>;
26
27    /// Close the transport
28    async fn close(&self) -> Result<()>;
29}
30
31/// Incoming MCP message (can be request, response, or notification)
32#[derive(Debug, Clone)]
33pub enum McpMessage {
34    Request(JsonRpcRequest),
35    Response(JsonRpcResponse),
36    Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40    pub fn from_json(value: Value) -> Result<Self> {
41        // Check if it has an id
42        if value.get("id").is_some() {
43            // Has id - could be request or response
44            if value.get("method").is_some() {
45                // Has method - it's a request
46                let request: JsonRpcRequest = serde_json::from_value(value)?;
47                Ok(McpMessage::Request(request))
48            } else {
49                // No method - it's a response
50                let response: JsonRpcResponse = serde_json::from_value(value)?;
51                Ok(McpMessage::Response(response))
52            }
53        } else {
54            // No id - it's a notification
55            let notification: JsonRpcNotification = serde_json::from_value(value)?;
56            Ok(McpMessage::Notification(notification))
57        }
58    }
59}
60
61/// Stdio transport for MCP (synchronous version for server mode)
62pub struct StdioTransport {
63    /// Sender channel for outgoing messages (kept alive for transport lifetime)
64    tx: mpsc::Sender<String>,
65    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
66}
67
/// No-op transport for local / in-process MCP usage.
///
/// Unlike the stdio transport, this type holds no state: it spawns no
/// reader/writer threads and never locks stdout. That makes it suitable for CLI
/// commands that only want to reuse the MCP tool registry (e.g.
/// `codetether mcp list-tools` and `codetether mcp call`) without running a
/// stdio server.
#[derive(Debug, Default, Clone)]
pub struct NullTransport;

impl NullTransport {
    /// Returns the (stateless) null transport.
    pub fn new() -> Self {
        NullTransport
    }
}
82
83impl Default for StdioTransport {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl StdioTransport {
90    /// Create a new stdio transport
91    pub fn new() -> Self {
92        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
93        let (read_tx, read_rx) = mpsc::channel::<String>(100);
94
95        // Spawn writer thread (blocking IO)
96        std::thread::spawn(move || {
97            let mut stdout = std::io::stdout().lock();
98            while let Some(msg) = write_rx.blocking_recv() {
99                trace!("MCP TX: {}", msg);
100                if let Err(e) = writeln!(stdout, "{}", msg) {
101                    error!("Failed to write to stdout: {}", e);
102                    break;
103                }
104                if let Err(e) = stdout.flush() {
105                    error!("Failed to flush stdout: {}", e);
106                    break;
107                }
108            }
109        });
110
111        // Spawn reader thread (blocking IO)
112        std::thread::spawn(move || {
113            let stdin = std::io::stdin();
114            let reader = stdin.lock();
115            for line in reader.lines() {
116                match line {
117                    Ok(msg) if !msg.is_empty() => {
118                        trace!("MCP RX: {}", msg);
119                        if read_tx.blocking_send(msg).is_err() {
120                            break;
121                        }
122                    }
123                    Ok(_) => continue, // Empty line
124                    Err(e) => {
125                        error!("Failed to read from stdin: {}", e);
126                        break;
127                    }
128                }
129            }
130        });
131
132        Self {
133            tx: write_tx,
134            rx: tokio::sync::Mutex::new(read_rx),
135        }
136    }
137
138    async fn send_json(&self, value: Value) -> Result<()> {
139        let json = serde_json::to_string(&value)?;
140        self.tx.send(json).await?;
141        Ok(())
142    }
143}
144
145#[async_trait]
146impl Transport for StdioTransport {
147    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
148        self.send_json(serde_json::to_value(&request)?).await
149    }
150
151    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
152        self.send_json(serde_json::to_value(&response)?).await
153    }
154
155    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
156        self.send_json(serde_json::to_value(&notification)?).await
157    }
158
159    async fn receive(&self) -> Result<Option<McpMessage>> {
160        let mut rx = self.rx.lock().await;
161        match rx.recv().await {
162            Some(line) => {
163                let value: Value = serde_json::from_str(&line)?;
164                let msg = McpMessage::from_json(value)?;
165                Ok(Some(msg))
166            }
167            None => Ok(None),
168        }
169    }
170
171    async fn close(&self) -> Result<()> {
172        // Channel will close when transport is dropped
173        Ok(())
174    }
175}
176
177#[async_trait]
178impl Transport for NullTransport {
179    async fn send_request(&self, _request: JsonRpcRequest) -> Result<()> {
180        Ok(())
181    }
182
183    async fn send_response(&self, _response: JsonRpcResponse) -> Result<()> {
184        Ok(())
185    }
186
187    async fn send_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
188        Ok(())
189    }
190
191    async fn receive(&self) -> Result<Option<McpMessage>> {
192        Ok(None)
193    }
194
195    async fn close(&self) -> Result<()> {
196        Ok(())
197    }
198}
199
200/// SSE transport for MCP (HTTP-based)
201pub struct SseTransport {
202    endpoint: String,
203    client: reqwest::Client,
204    _tx: mpsc::Sender<String>,
205    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
206}
207
208impl SseTransport {
209    /// Create a new SSE transport connecting to the given endpoint
210    pub async fn new(endpoint: String) -> Result<Self> {
211        let client = reqwest::Client::new();
212        let (write_tx, _write_rx) = mpsc::channel::<String>(100);
213        let (read_tx, read_rx) = mpsc::channel::<String>(100);
214
215        // TODO: Start SSE connection for receiving messages
216        // For now, this is a placeholder
217        let endpoint_clone = endpoint.clone();
218        let read_tx_clone = read_tx;
219
220        tokio::spawn(async move {
221            debug!("SSE transport connecting to: {}", endpoint_clone);
222            // SSE event stream handling would go here
223            let _ = read_tx_clone;
224        });
225
226        Ok(Self {
227            endpoint,
228            client,
229            _tx: write_tx,
230            rx: tokio::sync::Mutex::new(read_rx),
231        })
232    }
233
234    async fn send_json(&self, value: Value) -> Result<()> {
235        let json = serde_json::to_string(&value)?;
236        debug!("SSE TX: {}", json);
237
238        // POST to the endpoint
239        self.client
240            .post(&self.endpoint)
241            .header("Content-Type", "application/json")
242            .body(json)
243            .send()
244            .await?;
245
246        Ok(())
247    }
248}
249
250#[async_trait]
251impl Transport for SseTransport {
252    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
253        self.send_json(serde_json::to_value(&request)?).await
254    }
255
256    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
257        self.send_json(serde_json::to_value(&response)?).await
258    }
259
260    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
261        self.send_json(serde_json::to_value(&notification)?).await
262    }
263
264    async fn receive(&self) -> Result<Option<McpMessage>> {
265        let mut rx = self.rx.lock().await;
266        match rx.recv().await {
267            Some(line) => {
268                let value: Value = serde_json::from_str(&line)?;
269                let msg = McpMessage::from_json(value)?;
270                Ok(Some(msg))
271            }
272            None => Ok(None),
273        }
274    }
275
276    async fn close(&self) -> Result<()> {
277        Ok(())
278    }
279}
280
281/// Process transport for connecting to MCP servers via subprocess
282pub struct ProcessTransport {
283    _child: tokio::process::Child,
284    tx: mpsc::Sender<String>,
285    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
286}
287
288impl ProcessTransport {
289    /// Spawn a subprocess and connect via stdio
290    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
291        use tokio::process::Command;
292
293        let mut child = Command::new(command)
294            .args(args)
295            .stdin(std::process::Stdio::piped())
296            .stdout(std::process::Stdio::piped())
297            .stderr(std::process::Stdio::piped())
298            .spawn()?;
299
300        let stdout = child
301            .stdout
302            .take()
303            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
304        let mut stdin = child
305            .stdin
306            .take()
307            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
308        let stderr = child
309            .stderr
310            .take()
311            .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
312
313        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
314        let (read_tx, read_rx) = mpsc::channel::<String>(100);
315
316        // Stderr drain task — capture instead of inheriting so it doesn't corrupt the TUI
317        tokio::spawn(async move {
318            let mut reader = BufReader::new(stderr);
319            let mut line = String::new();
320            loop {
321                line.clear();
322                match reader.read_line(&mut line).await {
323                    Ok(0) => break,
324                    Ok(_) => {
325                        let trimmed = line.trim();
326                        if !trimmed.is_empty() {
327                            warn!(target: "mcp_subprocess", "{trimmed}");
328                        }
329                    }
330                    Err(_) => break,
331                }
332            }
333        });
334
335        // Writer task
336        tokio::spawn(async move {
337            while let Some(msg) = write_rx.recv().await {
338                trace!("Process TX: {}", msg);
339                if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
340                    error!("Failed to write to process stdin: {}", e);
341                    break;
342                }
343                if let Err(e) = stdin.flush().await {
344                    error!("Failed to flush process stdin: {}", e);
345                    break;
346                }
347            }
348        });
349
350        // Reader task
351        tokio::spawn(async move {
352            let mut reader = BufReader::new(stdout);
353            let mut line = String::new();
354            loop {
355                line.clear();
356                match reader.read_line(&mut line).await {
357                    Ok(0) => break, // EOF
358                    Ok(_) => {
359                        let trimmed = line.trim();
360                        if !trimmed.is_empty() {
361                            trace!("Process RX: {}", trimmed);
362                            if read_tx.send(trimmed.to_string()).await.is_err() {
363                                break;
364                            }
365                        }
366                    }
367                    Err(e) => {
368                        error!("Failed to read from process stdout: {}", e);
369                        break;
370                    }
371                }
372            }
373        });
374
375        Ok(Self {
376            _child: child,
377            tx: write_tx,
378            rx: tokio::sync::Mutex::new(read_rx),
379        })
380    }
381
382    async fn send_json(&self, value: Value) -> Result<()> {
383        let json = serde_json::to_string(&value)?;
384        self.tx.send(json).await?;
385        Ok(())
386    }
387}
388
389#[async_trait]
390impl Transport for ProcessTransport {
391    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
392        self.send_json(serde_json::to_value(&request)?).await
393    }
394
395    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
396        self.send_json(serde_json::to_value(&response)?).await
397    }
398
399    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
400        self.send_json(serde_json::to_value(&notification)?).await
401    }
402
403    async fn receive(&self) -> Result<Option<McpMessage>> {
404        let mut rx = self.rx.lock().await;
405        match rx.recv().await {
406            Some(line) => {
407                let value: Value = serde_json::from_str(&line)?;
408                let msg = McpMessage::from_json(value)?;
409                Ok(Some(msg))
410            }
411            None => Ok(None),
412        }
413    }
414
415    async fn close(&self) -> Result<()> {
416        Ok(())
417    }
418}