Skip to main content

agentox_core/client/
stdio.rs

1//! Stdio transport — spawns a child process and communicates via stdin/stdout.
2
3use crate::client::transport::Transport;
4use crate::error::TransportError;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
6use tokio::process::{Child, ChildStdin, ChildStdout};
7
/// Default read timeout for waiting on server responses.
///
/// Every newly spawned `StdioTransport` starts with this value as its
/// `read_timeout`; it can be changed per transport with `set_read_timeout`.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
10
11/// MCP transport over stdio — spawns a subprocess and pipes JSON-RPC messages.
12pub struct StdioTransport {
13    child: Child,
14    /// Wrapped in Option so we can take() it during shutdown to explicitly close stdin.
15    stdin: Option<BufWriter<ChildStdin>>,
16    stdout: BufReader<ChildStdout>,
17    /// The original command string, stored for reconnection.
18    command: String,
19    /// Maximum time to wait for a response line from the server.
20    read_timeout: std::time::Duration,
21}
22
23impl StdioTransport {
24    /// Spawn a child process from a shell command string.
25    ///
26    /// The command is split using shell word-splitting rules (handles quotes).
27    /// stdin and stdout are piped for JSON-RPC communication; stderr is inherited
28    /// so the user can see server logs.
29    pub async fn spawn(command: &str) -> Result<Self, TransportError> {
30        Self::spawn_inner(command, false).await
31    }
32
33    /// Spawn a child process with stderr suppressed.
34    ///
35    /// Used for disposable sessions where server startup messages would clutter
36    /// the terminal output.
37    pub async fn spawn_quiet(command: &str) -> Result<Self, TransportError> {
38        Self::spawn_inner(command, true).await
39    }
40
41    /// Internal spawn implementation.
42    async fn spawn_inner(command: &str, quiet: bool) -> Result<Self, TransportError> {
43        let words =
44            shell_words::split(command).map_err(|e| TransportError::CommandParse(e.to_string()))?;
45
46        if words.is_empty() {
47            return Err(TransportError::CommandParse("empty command".to_string()));
48        }
49
50        let program = &words[0];
51        let args = &words[1..];
52
53        tracing::debug!(program = %program, args = ?args, quiet, "spawning MCP server process");
54
55        let stderr_cfg = if quiet {
56            std::process::Stdio::null()
57        } else {
58            std::process::Stdio::inherit()
59        };
60
61        let mut child = tokio::process::Command::new(program)
62            .args(args)
63            .stdin(std::process::Stdio::piped())
64            .stdout(std::process::Stdio::piped())
65            .stderr(stderr_cfg)
66            .spawn()
67            .map_err(TransportError::Io)?;
68
69        let stdin = child
70            .stdin
71            .take()
72            .ok_or_else(|| TransportError::CommandParse("failed to capture stdin".to_string()))?;
73        let stdout = child
74            .stdout
75            .take()
76            .ok_or_else(|| TransportError::CommandParse("failed to capture stdout".to_string()))?;
77
78        Ok(Self {
79            child,
80            stdin: Some(BufWriter::new(stdin)),
81            stdout: BufReader::new(stdout),
82            command: command.to_string(),
83            read_timeout: DEFAULT_READ_TIMEOUT,
84        })
85    }
86
87    /// Set the read timeout for this transport.
88    pub fn set_read_timeout(&mut self, timeout: std::time::Duration) {
89        self.read_timeout = timeout;
90    }
91
92    /// Get the original command string (for reconnection).
93    pub fn command(&self) -> &str {
94        &self.command
95    }
96
97    /// Internal helper — write a line to the child's stdin.
98    async fn write_line(&mut self, message: &str) -> Result<(), TransportError> {
99        let stdin = self
100            .stdin
101            .as_mut()
102            .ok_or_else(|| TransportError::ProcessExit("stdin already closed".to_string()))?;
103
104        stdin
105            .write_all(message.as_bytes())
106            .await
107            .map_err(TransportError::Io)?;
108        stdin.write_all(b"\n").await.map_err(TransportError::Io)?;
109        stdin.flush().await.map_err(TransportError::Io)?;
110
111        Ok(())
112    }
113
114    /// Internal helper — read one line from the child's stdout, with timeout.
115    async fn read_line(&mut self) -> Result<Option<String>, TransportError> {
116        let read_future = async {
117            let mut line = String::new();
118            let bytes_read = self
119                .stdout
120                .read_line(&mut line)
121                .await
122                .map_err(TransportError::Io)?;
123
124            if bytes_read == 0 {
125                return Err(TransportError::ProcessExit(
126                    "server closed stdout".to_string(),
127                ));
128            }
129
130            let trimmed = line.trim().to_string();
131            if trimmed.is_empty() {
132                return Ok(None);
133            }
134
135            Ok(Some(trimmed))
136        };
137
138        match tokio::time::timeout(self.read_timeout, read_future).await {
139            Ok(result) => result,
140            Err(_) => Err(TransportError::Timeout(self.read_timeout)),
141        }
142    }
143}
144
145#[async_trait::async_trait]
146impl Transport for StdioTransport {
147    async fn write_raw(&mut self, message: &str) -> Result<(), TransportError> {
148        self.write_line(message).await
149    }
150
151    async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
152        self.write_line(message).await?;
153        self.read_line().await
154    }
155
156    async fn shutdown(&mut self) -> Result<(), TransportError> {
157        // Explicitly drop stdin to close the pipe and signal EOF to the child
158        drop(self.stdin.take());
159
160        // Give the process time to exit gracefully, then force-kill
161        match tokio::time::timeout(std::time::Duration::from_secs(5), self.child.wait()).await {
162            Ok(Ok(status)) => {
163                tracing::debug!(status = %status, "server process exited");
164                Ok(())
165            }
166            Ok(Err(e)) => Err(TransportError::Io(e)),
167            Err(_) => {
168                tracing::warn!("server process did not exit within 5s, sending SIGKILL");
169                self.child.kill().await.map_err(TransportError::Io)?;
170                Ok(())
171            }
172        }
173    }
174}