sovran_mcp/transport/
stdio.rs1use crate::messaging::JsonRpcMessage;
2use crate::transport::Transport;
3use crate::McpError;
4use std::io::{self, BufRead, Read, Write};
5use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
6use std::sync::{Arc, Mutex};
7use tracing::debug;
8
9pub struct TimeoutBufReader<R> {
10 inner: io::BufReader<R>,
11}
12
13impl<R: Read> TimeoutBufReader<R> {
14 pub fn new(inner: R) -> Self {
15 Self {
16 inner: io::BufReader::new(inner),
17 }
18 }
19
20 pub fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
21 self.inner.read_line(buf)
22 }
23}
24
25pub struct StdioTransport {
27 stdin: Arc<Mutex<Option<ChildStdin>>>,
28 stdout: Arc<Mutex<Option<TimeoutBufReader<ChildStdout>>>>,
29 child: Arc<Mutex<Option<Child>>>,
30 program: String,
31 args: Vec<String>,
32}
33
34impl StdioTransport {
35 pub fn new(program: &str, args: &[&str]) -> Result<Self, McpError> {
36 debug!(
37 "Creating StdioTransport for {} with args: {:?}",
38 program, args
39 );
40 Ok(StdioTransport {
41 stdin: Arc::new(Mutex::new(None)),
42 stdout: Arc::new(Mutex::new(None)),
43 child: Arc::new(Mutex::new(None)),
44 program: program.to_string(),
45 args: args.iter().map(|&s| s.to_string()).collect(),
46 })
47 }
48}
49
50impl Transport for StdioTransport {
51 fn send(&self, message: &JsonRpcMessage) -> Result<(), McpError> {
52 let mut stdin_guard = self.stdin.lock().unwrap();
53 let stdin = stdin_guard
54 .as_mut()
55 .ok_or_else(|| McpError::TransportNotOpen)?;
56
57 let serialized = serde_json::to_string(message)?;
58 stdin.write_all(serialized.as_bytes())?;
59 stdin.write_all(b"\n")?;
60 stdin.flush()?; Ok(())
63 }
64
65 fn receive(&self) -> Result<JsonRpcMessage, McpError> {
66 let mut stdout_guard = self.stdout.lock().unwrap();
67 let stdout = stdout_guard
68 .as_mut()
69 .ok_or_else(|| McpError::TransportNotOpen)?;
70
71 let mut line = String::new();
72 debug!("stdio: waiting on message");
73 stdout.read_line(&mut line)?;
74 debug!("stdio: Received message: {:?}", line);
75
76 let message: JsonRpcMessage = serde_json::from_str(&line)?;
77 Ok(message)
78 }
79
80 fn open(&self) -> Result<(), McpError> {
81 debug!("StdioTransport: Opening transport");
82 let mut child = Command::new(&self.program)
83 .args(&self.args)
84 .stdin(Stdio::piped())
85 .stdout(Stdio::piped())
86 .stderr(Stdio::piped()) .spawn()?;
88
89 let pid = child.id();
90 debug!("StdioTransport: Started child process with PID {}", pid);
91
92 let stdin = child
93 .stdin
94 .take()
95 .ok_or_else(|| McpError::StdinNotAvailable)?;
96 let stdout = child
97 .stdout
98 .take()
99 .ok_or_else(|| McpError::StdoutNotAvailable)?;
100 let stderr = child
101 .stderr
102 .take()
103 .ok_or_else(|| McpError::StderrNotAvailable)?;
104
105 *self.stdin.lock().unwrap() = Some(stdin);
106 *self.stdout.lock().unwrap() = Some(TimeoutBufReader::new(stdout));
107
108 let stderr_reader = io::BufReader::new(stderr);
110 std::thread::spawn(move || {
111 for line in stderr_reader.lines() {
112 if let Ok(line) = line {
113 println!("MCP Server Stderr: {}", line);
114 }
115 }
116 });
117
118 *self.child.lock().unwrap() = Some(child);
119
120 debug!("StdioTransport: Transport opened successfully");
121 Ok(())
122 }
123
124 fn close(&self) -> Result<(), McpError> {
125 debug!("StdioTransport: Starting close");
126 if let Some(mut child) = self.child.lock().unwrap().take() {
127 let pid = child.id();
128 debug!("StdioTransport: Killing child process {}", pid);
129 let _ = child.kill();
130 let _ = child.wait();
131 debug!("StdioTransport: Child process {} terminated", pid);
132 }
133
134 *self.stdin.lock().unwrap() = None;
136 *self.stdout.lock().unwrap() = None;
137
138 debug!("StdioTransport: Close completed");
139 Ok(())
140 }
141}