Skip to main content

sovran_mcp/transport/
stdio.rs

1use crate::messaging::JsonRpcMessage;
2use crate::transport::Transport;
3use crate::McpError;
4use std::io::{self, BufRead, Read, Write};
5use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
6use std::sync::{Arc, Mutex};
7use tracing::debug;
8
/// Line-oriented reader built on top of an [`io::BufReader`].
///
/// Despite the name, nothing in this type applies a timeout: `read_line`
/// forwards straight to the buffered reader and blocks for as long as the
/// underlying reader does.
pub struct TimeoutBufReader<R> {
    /// Buffered view over the wrapped reader.
    inner: io::BufReader<R>,
}

impl<R: Read> TimeoutBufReader<R> {
    /// Wraps `inner` in a buffered reader.
    pub fn new(inner: R) -> Self {
        let inner = io::BufReader::new(inner);
        Self { inner }
    }

    /// Reads the next line and appends it to `buf`, keeping any trailing
    /// newline.
    ///
    /// Returns the number of bytes appended; `Ok(0)` means the underlying
    /// reader has reached end-of-file.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the buffered reader reports.
    pub fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
        BufRead::read_line(&mut self.inner, buf)
    }
}
24
25/// ClientStdioTransport launches a child process and communicates with it via stdio
26pub struct StdioTransport {
27    stdin: Arc<Mutex<Option<ChildStdin>>>,
28    stdout: Arc<Mutex<Option<TimeoutBufReader<ChildStdout>>>>,
29    child: Arc<Mutex<Option<Child>>>,
30    program: String,
31    args: Vec<String>,
32}
33
34impl StdioTransport {
35    pub fn new(program: &str, args: &[&str]) -> Result<Self, McpError> {
36        debug!(
37            "Creating StdioTransport for {} with args: {:?}",
38            program, args
39        );
40        Ok(StdioTransport {
41            stdin: Arc::new(Mutex::new(None)),
42            stdout: Arc::new(Mutex::new(None)),
43            child: Arc::new(Mutex::new(None)),
44            program: program.to_string(),
45            args: args.iter().map(|&s| s.to_string()).collect(),
46        })
47    }
48}
49
50impl Transport for StdioTransport {
51    fn send(&self, message: &JsonRpcMessage) -> Result<(), McpError> {
52        let mut stdin_guard = self.stdin.lock().unwrap();
53        let stdin = stdin_guard
54            .as_mut()
55            .ok_or_else(|| McpError::TransportNotOpen)?;
56
57        let serialized = serde_json::to_string(message)?;
58        stdin.write_all(serialized.as_bytes())?;
59        stdin.write_all(b"\n")?;
60        stdin.flush()?; // Ensure the data is flushed
61
62        Ok(())
63    }
64
65    fn receive(&self) -> Result<JsonRpcMessage, McpError> {
66        let mut stdout_guard = self.stdout.lock().unwrap();
67        let stdout = stdout_guard
68            .as_mut()
69            .ok_or_else(|| McpError::TransportNotOpen)?;
70
71        let mut line = String::new();
72        debug!("stdio: waiting on message");
73        stdout.read_line(&mut line)?;
74        debug!("stdio: Received message: {:?}", line);
75
76        let message: JsonRpcMessage = serde_json::from_str(&line)?;
77        Ok(message)
78    }
79
80    fn open(&self) -> Result<(), McpError> {
81        debug!("StdioTransport: Opening transport");
82        let mut child = Command::new(&self.program)
83            .args(&self.args)
84            .stdin(Stdio::piped())
85            .stdout(Stdio::piped())
86            .stderr(Stdio::piped()) // Capture stderr
87            .spawn()?;
88
89        let pid = child.id();
90        debug!("StdioTransport: Started child process with PID {}", pid);
91
92        let stdin = child
93            .stdin
94            .take()
95            .ok_or_else(|| McpError::StdinNotAvailable)?;
96        let stdout = child
97            .stdout
98            .take()
99            .ok_or_else(|| McpError::StdoutNotAvailable)?;
100        let stderr = child
101            .stderr
102            .take()
103            .ok_or_else(|| McpError::StderrNotAvailable)?;
104
105        *self.stdin.lock().unwrap() = Some(stdin);
106        *self.stdout.lock().unwrap() = Some(TimeoutBufReader::new(stdout));
107
108        // Intercept stderr
109        let stderr_reader = io::BufReader::new(stderr);
110        std::thread::spawn(move || {
111            for line in stderr_reader.lines() {
112                if let Ok(line) = line {
113                    println!("MCP Server Stderr: {}", line);
114                }
115            }
116        });
117
118        *self.child.lock().unwrap() = Some(child);
119
120        debug!("StdioTransport: Transport opened successfully");
121        Ok(())
122    }
123
124    fn close(&self) -> Result<(), McpError> {
125        debug!("StdioTransport: Starting close");
126        if let Some(mut child) = self.child.lock().unwrap().take() {
127            let pid = child.id();
128            debug!("StdioTransport: Killing child process {}", pid);
129            let _ = child.kill();
130            let _ = child.wait();
131            debug!("StdioTransport: Child process {} terminated", pid);
132        }
133
134        // Drop stdin and stdout to unblock any pending operations
135        *self.stdin.lock().unwrap() = None;
136        *self.stdout.lock().unwrap() = None;
137
138        debug!("StdioTransport: Close completed");
139        Ok(())
140    }
141}