Skip to main content

codetether_agent/mcp/
transport.rs

1//! MCP transport layer - stdio and SSE implementations
2
3use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace, warn};
11
12/// Transport trait for MCP communication
13#[async_trait]
14pub trait Transport: Send + Sync {
15    /// Send a JSON-RPC request
16    async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18    /// Send a JSON-RPC response
19    async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21    /// Send a JSON-RPC notification
22    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24    /// Receive incoming messages
25    async fn receive(&self) -> Result<Option<McpMessage>>;
26
27    /// Close the transport
28    async fn close(&self) -> Result<()>;
29}
30
31/// Incoming MCP message (can be request, response, or notification)
32#[derive(Debug, Clone)]
33pub enum McpMessage {
34    Request(JsonRpcRequest),
35    Response(JsonRpcResponse),
36    Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40    pub fn from_json(value: Value) -> Result<Self> {
41        // Check if it has an id
42        if value.get("id").is_some() {
43            // Has id - could be request or response
44            if value.get("method").is_some() {
45                // Has method - it's a request
46                let request: JsonRpcRequest = serde_json::from_value(value)?;
47                Ok(McpMessage::Request(request))
48            } else {
49                // No method - it's a response
50                let response: JsonRpcResponse = serde_json::from_value(value)?;
51                Ok(McpMessage::Response(response))
52            }
53        } else {
54            // No id - it's a notification
55            let notification: JsonRpcNotification = serde_json::from_value(value)?;
56            Ok(McpMessage::Notification(notification))
57        }
58    }
59}
60
61/// Stdio transport for MCP (synchronous version for server mode)
62pub struct StdioTransport {
63    /// Sender channel for outgoing messages (kept alive for transport lifetime)
64    #[allow(dead_code)]
65    tx: mpsc::Sender<String>,
66    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
69impl Default for StdioTransport {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl StdioTransport {
76    /// Create a new stdio transport
77    pub fn new() -> Self {
78        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
79        let (read_tx, read_rx) = mpsc::channel::<String>(100);
80
81        // Spawn writer thread (blocking IO)
82        std::thread::spawn(move || {
83            let mut stdout = std::io::stdout().lock();
84            while let Some(msg) = write_rx.blocking_recv() {
85                trace!("MCP TX: {}", msg);
86                if let Err(e) = writeln!(stdout, "{}", msg) {
87                    error!("Failed to write to stdout: {}", e);
88                    break;
89                }
90                if let Err(e) = stdout.flush() {
91                    error!("Failed to flush stdout: {}", e);
92                    break;
93                }
94            }
95        });
96
97        // Spawn reader thread (blocking IO)
98        std::thread::spawn(move || {
99            let stdin = std::io::stdin();
100            let reader = stdin.lock();
101            for line in reader.lines() {
102                match line {
103                    Ok(msg) if !msg.is_empty() => {
104                        trace!("MCP RX: {}", msg);
105                        if read_tx.blocking_send(msg).is_err() {
106                            break;
107                        }
108                    }
109                    Ok(_) => continue, // Empty line
110                    Err(e) => {
111                        error!("Failed to read from stdin: {}", e);
112                        break;
113                    }
114                }
115            }
116        });
117
118        Self {
119            tx: write_tx,
120            rx: tokio::sync::Mutex::new(read_rx),
121        }
122    }
123
124    async fn send_json(&self, value: Value) -> Result<()> {
125        let json = serde_json::to_string(&value)?;
126        self.tx.send(json).await?;
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl Transport for StdioTransport {
133    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
134        self.send_json(serde_json::to_value(&request)?).await
135    }
136
137    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
138        self.send_json(serde_json::to_value(&response)?).await
139    }
140
141    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
142        self.send_json(serde_json::to_value(&notification)?).await
143    }
144
145    async fn receive(&self) -> Result<Option<McpMessage>> {
146        let mut rx = self.rx.lock().await;
147        match rx.recv().await {
148            Some(line) => {
149                let value: Value = serde_json::from_str(&line)?;
150                let msg = McpMessage::from_json(value)?;
151                Ok(Some(msg))
152            }
153            None => Ok(None),
154        }
155    }
156
157    async fn close(&self) -> Result<()> {
158        // Channel will close when transport is dropped
159        Ok(())
160    }
161}
162
163/// SSE transport for MCP (HTTP-based)
164pub struct SseTransport {
165    endpoint: String,
166    client: reqwest::Client,
167    _tx: mpsc::Sender<String>,
168    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
169}
170
171impl SseTransport {
172    /// Create a new SSE transport connecting to the given endpoint
173    pub async fn new(endpoint: String) -> Result<Self> {
174        let client = reqwest::Client::new();
175        let (write_tx, _write_rx) = mpsc::channel::<String>(100);
176        let (read_tx, read_rx) = mpsc::channel::<String>(100);
177
178        // TODO: Start SSE connection for receiving messages
179        // For now, this is a placeholder
180        let endpoint_clone = endpoint.clone();
181        let read_tx_clone = read_tx;
182
183        tokio::spawn(async move {
184            debug!("SSE transport connecting to: {}", endpoint_clone);
185            // SSE event stream handling would go here
186            let _ = read_tx_clone;
187        });
188
189        Ok(Self {
190            endpoint,
191            client,
192            _tx: write_tx,
193            rx: tokio::sync::Mutex::new(read_rx),
194        })
195    }
196
197    async fn send_json(&self, value: Value) -> Result<()> {
198        let json = serde_json::to_string(&value)?;
199        debug!("SSE TX: {}", json);
200
201        // POST to the endpoint
202        self.client
203            .post(&self.endpoint)
204            .header("Content-Type", "application/json")
205            .body(json)
206            .send()
207            .await?;
208
209        Ok(())
210    }
211}
212
213#[async_trait]
214impl Transport for SseTransport {
215    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
216        self.send_json(serde_json::to_value(&request)?).await
217    }
218
219    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
220        self.send_json(serde_json::to_value(&response)?).await
221    }
222
223    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
224        self.send_json(serde_json::to_value(&notification)?).await
225    }
226
227    async fn receive(&self) -> Result<Option<McpMessage>> {
228        let mut rx = self.rx.lock().await;
229        match rx.recv().await {
230            Some(line) => {
231                let value: Value = serde_json::from_str(&line)?;
232                let msg = McpMessage::from_json(value)?;
233                Ok(Some(msg))
234            }
235            None => Ok(None),
236        }
237    }
238
239    async fn close(&self) -> Result<()> {
240        Ok(())
241    }
242}
243
244/// Process transport for connecting to MCP servers via subprocess
245pub struct ProcessTransport {
246    _child: tokio::process::Child,
247    tx: mpsc::Sender<String>,
248    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
249}
250
251impl ProcessTransport {
252    /// Spawn a subprocess and connect via stdio
253    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
254        use tokio::process::Command;
255
256        let mut child = Command::new(command)
257            .args(args)
258            .stdin(std::process::Stdio::piped())
259            .stdout(std::process::Stdio::piped())
260            .stderr(std::process::Stdio::piped())
261            .spawn()?;
262
263        let stdout = child
264            .stdout
265            .take()
266            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
267        let mut stdin = child
268            .stdin
269            .take()
270            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
271        let stderr = child
272            .stderr
273            .take()
274            .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
275
276        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
277        let (read_tx, read_rx) = mpsc::channel::<String>(100);
278
279        // Stderr drain task — capture instead of inheriting so it doesn't corrupt the TUI
280        tokio::spawn(async move {
281            let mut reader = BufReader::new(stderr);
282            let mut line = String::new();
283            loop {
284                line.clear();
285                match reader.read_line(&mut line).await {
286                    Ok(0) => break,
287                    Ok(_) => {
288                        let trimmed = line.trim();
289                        if !trimmed.is_empty() {
290                            warn!(target: "mcp_subprocess", "{trimmed}");
291                        }
292                    }
293                    Err(_) => break,
294                }
295            }
296        });
297
298        // Writer task
299        tokio::spawn(async move {
300            while let Some(msg) = write_rx.recv().await {
301                trace!("Process TX: {}", msg);
302                if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
303                    error!("Failed to write to process stdin: {}", e);
304                    break;
305                }
306                if let Err(e) = stdin.flush().await {
307                    error!("Failed to flush process stdin: {}", e);
308                    break;
309                }
310            }
311        });
312
313        // Reader task
314        tokio::spawn(async move {
315            let mut reader = BufReader::new(stdout);
316            let mut line = String::new();
317            loop {
318                line.clear();
319                match reader.read_line(&mut line).await {
320                    Ok(0) => break, // EOF
321                    Ok(_) => {
322                        let trimmed = line.trim();
323                        if !trimmed.is_empty() {
324                            trace!("Process RX: {}", trimmed);
325                            if read_tx.send(trimmed.to_string()).await.is_err() {
326                                break;
327                            }
328                        }
329                    }
330                    Err(e) => {
331                        error!("Failed to read from process stdout: {}", e);
332                        break;
333                    }
334                }
335            }
336        });
337
338        Ok(Self {
339            _child: child,
340            tx: write_tx,
341            rx: tokio::sync::Mutex::new(read_rx),
342        })
343    }
344
345    async fn send_json(&self, value: Value) -> Result<()> {
346        let json = serde_json::to_string(&value)?;
347        self.tx.send(json).await?;
348        Ok(())
349    }
350}
351
352#[async_trait]
353impl Transport for ProcessTransport {
354    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
355        self.send_json(serde_json::to_value(&request)?).await
356    }
357
358    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
359        self.send_json(serde_json::to_value(&response)?).await
360    }
361
362    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
363        self.send_json(serde_json::to_value(&notification)?).await
364    }
365
366    async fn receive(&self) -> Result<Option<McpMessage>> {
367        let mut rx = self.rx.lock().await;
368        match rx.recv().await {
369            Some(line) => {
370                let value: Value = serde_json::from_str(&line)?;
371                let msg = McpMessage::from_json(value)?;
372                Ok(Some(msg))
373            }
374            None => Ok(None),
375        }
376    }
377
378    async fn close(&self) -> Result<()> {
379        Ok(())
380    }
381}