codetether_agent/mcp/
transport.rs1use super::types::*;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::io::{BufRead, Write};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::sync::mpsc;
10use tracing::{debug, error, trace};
11
12#[async_trait]
14pub trait Transport: Send + Sync {
15 async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
17
18 async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
20
21 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
23
24 async fn receive(&self) -> Result<Option<McpMessage>>;
26
27 async fn close(&self) -> Result<()>;
29}
30
31#[derive(Debug, Clone)]
33pub enum McpMessage {
34 Request(JsonRpcRequest),
35 Response(JsonRpcResponse),
36 Notification(JsonRpcNotification),
37}
38
39impl McpMessage {
40 pub fn from_json(value: Value) -> Result<Self> {
41 if value.get("id").is_some() {
43 if value.get("method").is_some() {
45 let request: JsonRpcRequest = serde_json::from_value(value)?;
47 Ok(McpMessage::Request(request))
48 } else {
49 let response: JsonRpcResponse = serde_json::from_value(value)?;
51 Ok(McpMessage::Response(response))
52 }
53 } else {
54 let notification: JsonRpcNotification = serde_json::from_value(value)?;
56 Ok(McpMessage::Notification(notification))
57 }
58 }
59}
60
61pub struct StdioTransport {
63 #[allow(dead_code)]
65 tx: mpsc::Sender<String>,
66 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
67}
68
69impl Default for StdioTransport {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl StdioTransport {
76 pub fn new() -> Self {
78 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
79 let (read_tx, read_rx) = mpsc::channel::<String>(100);
80
81 std::thread::spawn(move || {
83 let mut stdout = std::io::stdout().lock();
84 while let Some(msg) = write_rx.blocking_recv() {
85 trace!("MCP TX: {}", msg);
86 if let Err(e) = writeln!(stdout, "{}", msg) {
87 error!("Failed to write to stdout: {}", e);
88 break;
89 }
90 if let Err(e) = stdout.flush() {
91 error!("Failed to flush stdout: {}", e);
92 break;
93 }
94 }
95 });
96
97 std::thread::spawn(move || {
99 let stdin = std::io::stdin();
100 let reader = stdin.lock();
101 for line in reader.lines() {
102 match line {
103 Ok(msg) if !msg.is_empty() => {
104 trace!("MCP RX: {}", msg);
105 if read_tx.blocking_send(msg).is_err() {
106 break;
107 }
108 }
109 Ok(_) => continue, Err(e) => {
111 error!("Failed to read from stdin: {}", e);
112 break;
113 }
114 }
115 }
116 });
117
118 Self {
119 tx: write_tx,
120 rx: tokio::sync::Mutex::new(read_rx),
121 }
122 }
123
124 async fn send_json(&self, value: Value) -> Result<()> {
125 let json = serde_json::to_string(&value)?;
126 self.tx.send(json).await?;
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl Transport for StdioTransport {
133 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
134 self.send_json(serde_json::to_value(&request)?).await
135 }
136
137 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
138 self.send_json(serde_json::to_value(&response)?).await
139 }
140
141 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
142 self.send_json(serde_json::to_value(¬ification)?).await
143 }
144
145 async fn receive(&self) -> Result<Option<McpMessage>> {
146 let mut rx = self.rx.lock().await;
147 match rx.recv().await {
148 Some(line) => {
149 let value: Value = serde_json::from_str(&line)?;
150 let msg = McpMessage::from_json(value)?;
151 Ok(Some(msg))
152 }
153 None => Ok(None),
154 }
155 }
156
157 async fn close(&self) -> Result<()> {
158 Ok(())
160 }
161}
162
163pub struct SseTransport {
165 endpoint: String,
166 client: reqwest::Client,
167 _tx: mpsc::Sender<String>,
168 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
169}
170
171impl SseTransport {
172 pub async fn new(endpoint: String) -> Result<Self> {
174 let client = reqwest::Client::new();
175 let (write_tx, _write_rx) = mpsc::channel::<String>(100);
176 let (read_tx, read_rx) = mpsc::channel::<String>(100);
177
178 let endpoint_clone = endpoint.clone();
181 let read_tx_clone = read_tx;
182
183 tokio::spawn(async move {
184 debug!("SSE transport connecting to: {}", endpoint_clone);
185 let _ = read_tx_clone;
187 });
188
189 Ok(Self {
190 endpoint,
191 client,
192 _tx: write_tx,
193 rx: tokio::sync::Mutex::new(read_rx),
194 })
195 }
196
197 async fn send_json(&self, value: Value) -> Result<()> {
198 let json = serde_json::to_string(&value)?;
199 debug!("SSE TX: {}", json);
200
201 self.client
203 .post(&self.endpoint)
204 .header("Content-Type", "application/json")
205 .body(json)
206 .send()
207 .await?;
208
209 Ok(())
210 }
211}
212
213#[async_trait]
214impl Transport for SseTransport {
215 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
216 self.send_json(serde_json::to_value(&request)?).await
217 }
218
219 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
220 self.send_json(serde_json::to_value(&response)?).await
221 }
222
223 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
224 self.send_json(serde_json::to_value(¬ification)?).await
225 }
226
227 async fn receive(&self) -> Result<Option<McpMessage>> {
228 let mut rx = self.rx.lock().await;
229 match rx.recv().await {
230 Some(line) => {
231 let value: Value = serde_json::from_str(&line)?;
232 let msg = McpMessage::from_json(value)?;
233 Ok(Some(msg))
234 }
235 None => Ok(None),
236 }
237 }
238
239 async fn close(&self) -> Result<()> {
240 Ok(())
241 }
242}
243
244pub struct ProcessTransport {
246 _child: tokio::process::Child,
247 tx: mpsc::Sender<String>,
248 rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
249}
250
251impl ProcessTransport {
252 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
254 use tokio::process::Command;
255
256 let mut child = Command::new(command)
257 .args(args)
258 .stdin(std::process::Stdio::piped())
259 .stdout(std::process::Stdio::piped())
260 .stderr(std::process::Stdio::inherit())
261 .spawn()?;
262
263 let stdout = child.stdout.take().ok_or_else(|| anyhow::anyhow!("No stdout"))?;
264 let mut stdin = child.stdin.take().ok_or_else(|| anyhow::anyhow!("No stdin"))?;
265
266 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
267 let (read_tx, read_rx) = mpsc::channel::<String>(100);
268
269 tokio::spawn(async move {
271 while let Some(msg) = write_rx.recv().await {
272 trace!("Process TX: {}", msg);
273 if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
274 error!("Failed to write to process stdin: {}", e);
275 break;
276 }
277 if let Err(e) = stdin.flush().await {
278 error!("Failed to flush process stdin: {}", e);
279 break;
280 }
281 }
282 });
283
284 tokio::spawn(async move {
286 let mut reader = BufReader::new(stdout);
287 let mut line = String::new();
288 loop {
289 line.clear();
290 match reader.read_line(&mut line).await {
291 Ok(0) => break, Ok(_) => {
293 let trimmed = line.trim();
294 if !trimmed.is_empty() {
295 trace!("Process RX: {}", trimmed);
296 if read_tx.send(trimmed.to_string()).await.is_err() {
297 break;
298 }
299 }
300 }
301 Err(e) => {
302 error!("Failed to read from process stdout: {}", e);
303 break;
304 }
305 }
306 }
307 });
308
309 Ok(Self {
310 _child: child,
311 tx: write_tx,
312 rx: tokio::sync::Mutex::new(read_rx),
313 })
314 }
315
316 async fn send_json(&self, value: Value) -> Result<()> {
317 let json = serde_json::to_string(&value)?;
318 self.tx.send(json).await?;
319 Ok(())
320 }
321}
322
323#[async_trait]
324impl Transport for ProcessTransport {
325 async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
326 self.send_json(serde_json::to_value(&request)?).await
327 }
328
329 async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
330 self.send_json(serde_json::to_value(&response)?).await
331 }
332
333 async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
334 self.send_json(serde_json::to_value(¬ification)?).await
335 }
336
337 async fn receive(&self) -> Result<Option<McpMessage>> {
338 let mut rx = self.rx.lock().await;
339 match rx.recv().await {
340 Some(line) => {
341 let value: Value = serde_json::from_str(&line)?;
342 let msg = McpMessage::from_json(value)?;
343 Ok(Some(msg))
344 }
345 None => Ok(None),
346 }
347 }
348
349 async fn close(&self) -> Result<()> {
350 Ok(())
351 }
352}