// codetether_agent/mcp/transport.rs
use super::types::*;
use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use std::io::{BufRead, Write};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tracing::{debug, error, trace};

12#[async_trait]
14pub trait Transport: Send + Sync {
15 async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18 async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24 async fn receive(&self) -> Result<Option<McpMessage>>;
26
27 async fn close(&self) -> Result<()>;
29}
30
31#[derive(Debug, Clone)]
33pub enum McpMessage {
34 Request(JsonRpcRequest),
35 Response(JsonRpcResponse),
36 Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40 pub fn from_json(value: Value) -> Result<Self> {
41 if value.get("id").is_some() {
43 if value.get("method").is_some() {
45 let request: JsonRpcRequest = serde_json::from_value(value)?;
47 Ok(McpMessage::Request(request))
48 } else {
49 let response: JsonRpcResponse = serde_json::from_value(value)?;
51 Ok(McpMessage::Response(response))
52 }
53 } else {
54 let notification: JsonRpcNotification = serde_json::from_value(value)?;
56 Ok(McpMessage::Notification(notification))
57 }
58 }
59}
60
61pub struct StdioTransport {
63 #[allow(dead_code)]
65 tx: mpsc::Sender<String>,
66 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
69impl Default for StdioTransport {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl StdioTransport {
76 pub fn new() -> Self {
78 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
79 let (read_tx, read_rx) = mpsc::channel::<String>(100);
80
81 std::thread::spawn(move || {
83 let mut stdout = std::io::stdout().lock();
84 while let Some(msg) = write_rx.blocking_recv() {
85 trace!("MCP TX: {}", msg);
86 if let Err(e) = writeln!(stdout, "{}", msg) {
87 error!("Failed to write to stdout: {}", e);
88 break;
89 }
90 if let Err(e) = stdout.flush() {
91 error!("Failed to flush stdout: {}", e);
92 break;
93 }
94 }
95 });
96
97 std::thread::spawn(move || {
99 let stdin = std::io::stdin();
100 let reader = stdin.lock();
101 for line in reader.lines() {
102 match line {
103 Ok(msg) if !msg.is_empty() => {
104 trace!("MCP RX: {}", msg);
105 if read_tx.blocking_send(msg).is_err() {
106 break;
107 }
108 }
109 Ok(_) => continue, Err(e) => {
111 error!("Failed to read from stdin: {}", e);
112 break;
113 }
114 }
115 }
116 });
117
118 Self {
119 tx: write_tx,
120 rx: tokio::sync::Mutex::new(read_rx),
121 }
122 }
123
124 async fn send_json(&self, value: Value) -> Result<()> {
125 let json = serde_json::to_string(&value)?;
126 self.tx.send(json).await?;
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl Transport for StdioTransport {
133 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
134 self.send_json(serde_json::to_value(&request)?).await
135 }
136
137 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
138 self.send_json(serde_json::to_value(&response)?).await
139 }
140
141 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
142 self.send_json(serde_json::to_value(¬ification)?).await
143 }
144
145 async fn receive(&self) -> Result<Option<McpMessage>> {
146 let mut rx = self.rx.lock().await;
147 match rx.recv().await {
148 Some(line) => {
149 let value: Value = serde_json::from_str(&line)?;
150 let msg = McpMessage::from_json(value)?;
151 Ok(Some(msg))
152 }
153 None => Ok(None),
154 }
155 }
156
157 async fn close(&self) -> Result<()> {
158 Ok(())
160 }
161}
162
163pub struct SseTransport {
165 endpoint: String,
166 client: reqwest::Client,
167 _tx: mpsc::Sender<String>,
168 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
169}
170
171impl SseTransport {
172 pub async fn new(endpoint: String) -> Result<Self> {
174 let client = reqwest::Client::new();
175 let (write_tx, _write_rx) = mpsc::channel::<String>(100);
176 let (read_tx, read_rx) = mpsc::channel::<String>(100);
177
178 let endpoint_clone = endpoint.clone();
181 let read_tx_clone = read_tx;
182
183 tokio::spawn(async move {
184 debug!("SSE transport connecting to: {}", endpoint_clone);
185 let _ = read_tx_clone;
187 });
188
189 Ok(Self {
190 endpoint,
191 client,
192 _tx: write_tx,
193 rx: tokio::sync::Mutex::new(read_rx),
194 })
195 }
196
197 async fn send_json(&self, value: Value) -> Result<()> {
198 let json = serde_json::to_string(&value)?;
199 debug!("SSE TX: {}", json);
200
201 self.client
203 .post(&self.endpoint)
204 .header("Content-Type", "application/json")
205 .body(json)
206 .send()
207 .await?;
208
209 Ok(())
210 }
211}
212
213#[async_trait]
214impl Transport for SseTransport {
215 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
216 self.send_json(serde_json::to_value(&request)?).await
217 }
218
219 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
220 self.send_json(serde_json::to_value(&response)?).await
221 }
222
223 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
224 self.send_json(serde_json::to_value(¬ification)?).await
225 }
226
227 async fn receive(&self) -> Result<Option<McpMessage>> {
228 let mut rx = self.rx.lock().await;
229 match rx.recv().await {
230 Some(line) => {
231 let value: Value = serde_json::from_str(&line)?;
232 let msg = McpMessage::from_json(value)?;
233 Ok(Some(msg))
234 }
235 None => Ok(None),
236 }
237 }
238
239 async fn close(&self) -> Result<()> {
240 Ok(())
241 }
242}
243
244pub struct ProcessTransport {
246 _child: tokio::process::Child,
247 tx: mpsc::Sender<String>,
248 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
249}
250
251impl ProcessTransport {
252 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
254 use tokio::process::Command;
255
256 let mut child = Command::new(command)
257 .args(args)
258 .stdin(std::process::Stdio::piped())
259 .stdout(std::process::Stdio::piped())
260 .stderr(std::process::Stdio::inherit())
261 .spawn()?;
262
263 let stdout = child
264 .stdout
265 .take()
266 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
267 let mut stdin = child
268 .stdin
269 .take()
270 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
271
272 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
273 let (read_tx, read_rx) = mpsc::channel::<String>(100);
274
275 tokio::spawn(async move {
277 while let Some(msg) = write_rx.recv().await {
278 trace!("Process TX: {}", msg);
279 if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
280 error!("Failed to write to process stdin: {}", e);
281 break;
282 }
283 if let Err(e) = stdin.flush().await {
284 error!("Failed to flush process stdin: {}", e);
285 break;
286 }
287 }
288 });
289
290 tokio::spawn(async move {
292 let mut reader = BufReader::new(stdout);
293 let mut line = String::new();
294 loop {
295 line.clear();
296 match reader.read_line(&mut line).await {
297 Ok(0) => break, Ok(_) => {
299 let trimmed = line.trim();
300 if !trimmed.is_empty() {
301 trace!("Process RX: {}", trimmed);
302 if read_tx.send(trimmed.to_string()).await.is_err() {
303 break;
304 }
305 }
306 }
307 Err(e) => {
308 error!("Failed to read from process stdout: {}", e);
309 break;
310 }
311 }
312 }
313 });
314
315 Ok(Self {
316 _child: child,
317 tx: write_tx,
318 rx: tokio::sync::Mutex::new(read_rx),
319 })
320 }
321
322 async fn send_json(&self, value: Value) -> Result<()> {
323 let json = serde_json::to_string(&value)?;
324 self.tx.send(json).await?;
325 Ok(())
326 }
327}
328
329#[async_trait]
330impl Transport for ProcessTransport {
331 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
332 self.send_json(serde_json::to_value(&request)?).await
333 }
334
335 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
336 self.send_json(serde_json::to_value(&response)?).await
337 }
338
339 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
340 self.send_json(serde_json::to_value(¬ification)?).await
341 }
342
343 async fn receive(&self) -> Result<Option<McpMessage>> {
344 let mut rx = self.rx.lock().await;
345 match rx.recv().await {
346 Some(line) => {
347 let value: Value = serde_json::from_str(&line)?;
348 let msg = McpMessage::from_json(value)?;
349 Ok(Some(msg))
350 }
351 None => Ok(None),
352 }
353 }
354
355 async fn close(&self) -> Result<()> {
356 Ok(())
357 }
358}