agentox_core/client/
stdio.rs1use crate::client::transport::Transport;
4use crate::error::TransportError;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
6use tokio::process::{Child, ChildStdin, ChildStdout};
7
/// Read timeout applied to a freshly spawned transport until
/// `set_read_timeout` overrides it.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
10
11pub struct StdioTransport {
13 child: Child,
14 stdin: Option<BufWriter<ChildStdin>>,
16 stdout: BufReader<ChildStdout>,
17 command: String,
19 read_timeout: std::time::Duration,
21}
22
23impl StdioTransport {
24 pub async fn spawn(command: &str) -> Result<Self, TransportError> {
30 Self::spawn_inner(command, false).await
31 }
32
33 pub async fn spawn_quiet(command: &str) -> Result<Self, TransportError> {
38 Self::spawn_inner(command, true).await
39 }
40
41 async fn spawn_inner(command: &str, quiet: bool) -> Result<Self, TransportError> {
43 let words =
44 shell_words::split(command).map_err(|e| TransportError::CommandParse(e.to_string()))?;
45
46 if words.is_empty() {
47 return Err(TransportError::CommandParse("empty command".to_string()));
48 }
49
50 let program = &words[0];
51 let args = &words[1..];
52
53 tracing::debug!(program = %program, args = ?args, quiet, "spawning MCP server process");
54
55 let stderr_cfg = if quiet {
56 std::process::Stdio::null()
57 } else {
58 std::process::Stdio::inherit()
59 };
60
61 let mut child = tokio::process::Command::new(program)
62 .args(args)
63 .stdin(std::process::Stdio::piped())
64 .stdout(std::process::Stdio::piped())
65 .stderr(stderr_cfg)
66 .spawn()
67 .map_err(TransportError::Io)?;
68
69 let stdin = child
70 .stdin
71 .take()
72 .ok_or_else(|| TransportError::CommandParse("failed to capture stdin".to_string()))?;
73 let stdout = child
74 .stdout
75 .take()
76 .ok_or_else(|| TransportError::CommandParse("failed to capture stdout".to_string()))?;
77
78 Ok(Self {
79 child,
80 stdin: Some(BufWriter::new(stdin)),
81 stdout: BufReader::new(stdout),
82 command: command.to_string(),
83 read_timeout: DEFAULT_READ_TIMEOUT,
84 })
85 }
86
87 pub fn set_read_timeout(&mut self, timeout: std::time::Duration) {
89 self.read_timeout = timeout;
90 }
91
92 pub fn command(&self) -> &str {
94 &self.command
95 }
96
97 async fn write_line(&mut self, message: &str) -> Result<(), TransportError> {
99 let stdin = self
100 .stdin
101 .as_mut()
102 .ok_or_else(|| TransportError::ProcessExit("stdin already closed".to_string()))?;
103
104 stdin
105 .write_all(message.as_bytes())
106 .await
107 .map_err(TransportError::Io)?;
108 stdin.write_all(b"\n").await.map_err(TransportError::Io)?;
109 stdin.flush().await.map_err(TransportError::Io)?;
110
111 Ok(())
112 }
113
114 async fn read_line(&mut self) -> Result<Option<String>, TransportError> {
116 let read_future = async {
117 let mut line = String::new();
118 let bytes_read = self
119 .stdout
120 .read_line(&mut line)
121 .await
122 .map_err(TransportError::Io)?;
123
124 if bytes_read == 0 {
125 return Err(TransportError::ProcessExit(
126 "server closed stdout".to_string(),
127 ));
128 }
129
130 let trimmed = line.trim().to_string();
131 if trimmed.is_empty() {
132 return Ok(None);
133 }
134
135 Ok(Some(trimmed))
136 };
137
138 match tokio::time::timeout(self.read_timeout, read_future).await {
139 Ok(result) => result,
140 Err(_) => Err(TransportError::Timeout(self.read_timeout)),
141 }
142 }
143}
144
145#[async_trait::async_trait]
146impl Transport for StdioTransport {
147 async fn write_raw(&mut self, message: &str) -> Result<(), TransportError> {
148 self.write_line(message).await
149 }
150
151 async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
152 self.write_line(message).await?;
153 self.read_line().await
154 }
155
156 async fn shutdown(&mut self) -> Result<(), TransportError> {
157 drop(self.stdin.take());
159
160 match tokio::time::timeout(std::time::Duration::from_secs(5), self.child.wait()).await {
162 Ok(Ok(status)) => {
163 tracing::debug!(status = %status, "server process exited");
164 Ok(())
165 }
166 Ok(Err(e)) => Err(TransportError::Io(e)),
167 Err(_) => {
168 tracing::warn!("server process did not exit within 5s, sending SIGKILL");
169 self.child.kill().await.map_err(TransportError::Io)?;
170 Ok(())
171 }
172 }
173 }
174}