Skip to main content

codetether_agent/mcp/
transport.rs

//! MCP transport layer - stdio, SSE, and subprocess implementations
2
3use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace};
11
12/// Transport trait for MCP communication
13#[async_trait]
14pub trait Transport: Send + Sync {
15    /// Send a JSON-RPC request
16    async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17    
18    /// Send a JSON-RPC response
19    async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20    
21    /// Send a JSON-RPC notification
22    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23    
24    /// Receive incoming messages
25    async fn receive(&self) -> Result<Option<McpMessage>>;
26    
27    /// Close the transport
28    async fn close(&self) -> Result<()>;
29}
30
31/// Incoming MCP message (can be request, response, or notification)
32#[derive(Debug, Clone)]
33pub enum McpMessage {
34    Request(JsonRpcRequest),
35    Response(JsonRpcResponse),
36    Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40    pub fn from_json(value: Value) -> Result<Self> {
41        // Check if it has an id
42        if value.get("id").is_some() {
43            // Has id - could be request or response
44            if value.get("method").is_some() {
45                // Has method - it's a request
46                let request: JsonRpcRequest = serde_json::from_value(value)?;
47                Ok(McpMessage::Request(request))
48            } else {
49                // No method - it's a response
50                let response: JsonRpcResponse = serde_json::from_value(value)?;
51                Ok(McpMessage::Response(response))
52            }
53        } else {
54            // No id - it's a notification
55            let notification: JsonRpcNotification = serde_json::from_value(value)?;
56            Ok(McpMessage::Notification(notification))
57        }
58    }
59}
60
61/// Stdio transport for MCP (synchronous version for server mode)
62pub struct StdioTransport {
63    /// Sender channel for outgoing messages (kept alive for transport lifetime)
64    #[allow(dead_code)]
65    tx: mpsc::Sender<String>,
66    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
69impl Default for StdioTransport {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl StdioTransport {
76    /// Create a new stdio transport
77    pub fn new() -> Self {
78        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
79        let (read_tx, read_rx) = mpsc::channel::<String>(100);
80        
81        // Spawn writer thread (blocking IO)
82        std::thread::spawn(move || {
83            let mut stdout = std::io::stdout().lock();
84            while let Some(msg) = write_rx.blocking_recv() {
85                trace!("MCP TX: {}", msg);
86                if let Err(e) = writeln!(stdout, "{}", msg) {
87                    error!("Failed to write to stdout: {}", e);
88                    break;
89                }
90                if let Err(e) = stdout.flush() {
91                    error!("Failed to flush stdout: {}", e);
92                    break;
93                }
94            }
95        });
96        
97        // Spawn reader thread (blocking IO)
98        std::thread::spawn(move || {
99            let stdin = std::io::stdin();
100            let reader = stdin.lock();
101            for line in reader.lines() {
102                match line {
103                    Ok(msg) if !msg.is_empty() => {
104                        trace!("MCP RX: {}", msg);
105                        if read_tx.blocking_send(msg).is_err() {
106                            break;
107                        }
108                    }
109                    Ok(_) => continue, // Empty line
110                    Err(e) => {
111                        error!("Failed to read from stdin: {}", e);
112                        break;
113                    }
114                }
115            }
116        });
117        
118        Self {
119            tx: write_tx,
120            rx: tokio::sync::Mutex::new(read_rx),
121        }
122    }
123    
124    async fn send_json(&self, value: Value) -> Result<()> {
125        let json = serde_json::to_string(&value)?;
126        self.tx.send(json).await?;
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl Transport for StdioTransport {
133    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
134        self.send_json(serde_json::to_value(&request)?).await
135    }
136    
137    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
138        self.send_json(serde_json::to_value(&response)?).await
139    }
140    
141    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
142        self.send_json(serde_json::to_value(&notification)?).await
143    }
144    
145    async fn receive(&self) -> Result<Option<McpMessage>> {
146        let mut rx = self.rx.lock().await;
147        match rx.recv().await {
148            Some(line) => {
149                let value: Value = serde_json::from_str(&line)?;
150                let msg = McpMessage::from_json(value)?;
151                Ok(Some(msg))
152            }
153            None => Ok(None),
154        }
155    }
156    
157    async fn close(&self) -> Result<()> {
158        // Channel will close when transport is dropped
159        Ok(())
160    }
161}
162
163/// SSE transport for MCP (HTTP-based)
164pub struct SseTransport {
165    endpoint: String,
166    client: reqwest::Client,
167    _tx: mpsc::Sender<String>,
168    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
169}
170
171impl SseTransport {
172    /// Create a new SSE transport connecting to the given endpoint
173    pub async fn new(endpoint: String) -> Result<Self> {
174        let client = reqwest::Client::new();
175        let (write_tx, _write_rx) = mpsc::channel::<String>(100);
176        let (read_tx, read_rx) = mpsc::channel::<String>(100);
177        
178        // TODO: Start SSE connection for receiving messages
179        // For now, this is a placeholder
180        let endpoint_clone = endpoint.clone();
181        let read_tx_clone = read_tx;
182        
183        tokio::spawn(async move {
184            debug!("SSE transport connecting to: {}", endpoint_clone);
185            // SSE event stream handling would go here
186            let _ = read_tx_clone;
187        });
188        
189        Ok(Self {
190            endpoint,
191            client,
192            _tx: write_tx,
193            rx: tokio::sync::Mutex::new(read_rx),
194        })
195    }
196    
197    async fn send_json(&self, value: Value) -> Result<()> {
198        let json = serde_json::to_string(&value)?;
199        debug!("SSE TX: {}", json);
200        
201        // POST to the endpoint
202        self.client
203            .post(&self.endpoint)
204            .header("Content-Type", "application/json")
205            .body(json)
206            .send()
207            .await?;
208        
209        Ok(())
210    }
211}
212
213#[async_trait]
214impl Transport for SseTransport {
215    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
216        self.send_json(serde_json::to_value(&request)?).await
217    }
218    
219    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
220        self.send_json(serde_json::to_value(&response)?).await
221    }
222    
223    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
224        self.send_json(serde_json::to_value(&notification)?).await
225    }
226    
227    async fn receive(&self) -> Result<Option<McpMessage>> {
228        let mut rx = self.rx.lock().await;
229        match rx.recv().await {
230            Some(line) => {
231                let value: Value = serde_json::from_str(&line)?;
232                let msg = McpMessage::from_json(value)?;
233                Ok(Some(msg))
234            }
235            None => Ok(None),
236        }
237    }
238    
239    async fn close(&self) -> Result<()> {
240        Ok(())
241    }
242}
243
244/// Process transport for connecting to MCP servers via subprocess
245pub struct ProcessTransport {
246    _child: tokio::process::Child,
247    tx: mpsc::Sender<String>,
248    rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
249}
250
251impl ProcessTransport {
252    /// Spawn a subprocess and connect via stdio
253    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
254        use tokio::process::Command;
255        
256        let mut child = Command::new(command)
257            .args(args)
258            .stdin(std::process::Stdio::piped())
259            .stdout(std::process::Stdio::piped())
260            .stderr(std::process::Stdio::inherit())
261            .spawn()?;
262        
263        let stdout = child.stdout.take().ok_or_else(|| anyhow::anyhow!("No stdout"))?;
264        let mut stdin = child.stdin.take().ok_or_else(|| anyhow::anyhow!("No stdin"))?;
265        
266        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
267        let (read_tx, read_rx) = mpsc::channel::<String>(100);
268        
269        // Writer task
270        tokio::spawn(async move {
271            while let Some(msg) = write_rx.recv().await {
272                trace!("Process TX: {}", msg);
273                if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
274                    error!("Failed to write to process stdin: {}", e);
275                    break;
276                }
277                if let Err(e) = stdin.flush().await {
278                    error!("Failed to flush process stdin: {}", e);
279                    break;
280                }
281            }
282        });
283        
284        // Reader task
285        tokio::spawn(async move {
286            let mut reader = BufReader::new(stdout);
287            let mut line = String::new();
288            loop {
289                line.clear();
290                match reader.read_line(&mut line).await {
291                    Ok(0) => break, // EOF
292                    Ok(_) => {
293                        let trimmed = line.trim();
294                        if !trimmed.is_empty() {
295                            trace!("Process RX: {}", trimmed);
296                            if read_tx.send(trimmed.to_string()).await.is_err() {
297                                break;
298                            }
299                        }
300                    }
301                    Err(e) => {
302                        error!("Failed to read from process stdout: {}", e);
303                        break;
304                    }
305                }
306            }
307        });
308        
309        Ok(Self {
310            _child: child,
311            tx: write_tx,
312            rx: tokio::sync::Mutex::new(read_rx),
313        })
314    }
315    
316    async fn send_json(&self, value: Value) -> Result<()> {
317        let json = serde_json::to_string(&value)?;
318        self.tx.send(json).await?;
319        Ok(())
320    }
321}
322
323#[async_trait]
324impl Transport for ProcessTransport {
325    async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
326        self.send_json(serde_json::to_value(&request)?).await
327    }
328    
329    async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
330        self.send_json(serde_json::to_value(&response)?).await
331    }
332    
333    async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
334        self.send_json(serde_json::to_value(&notification)?).await
335    }
336    
337    async fn receive(&self) -> Result<Option<McpMessage>> {
338        let mut rx = self.rx.lock().await;
339        match rx.recv().await {
340            Some(line) => {
341                let value: Value = serde_json::from_str(&line)?;
342                let msg = McpMessage::from_json(value)?;
343                Ok(Some(msg))
344            }
345            None => Ok(None),
346        }
347    }
348    
349    async fn close(&self) -> Result<()> {
350        Ok(())
351    }
352}