meerkat_mobkit/
process.rs1use std::io::{BufRead, BufReader};
4use std::process::{Child, Command, Stdio};
5use std::sync::mpsc;
6use std::time::Duration;
7
/// Failures that can occur while running a child process and reading a
/// single JSON line from its standard output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessBoundaryError {
    /// The child process could not be started; carries the spawn error text.
    SpawnFailed(String),
    /// The spawned child exposed no stdout handle to read from.
    MissingStdout,
    /// An I/O-level failure, described by the contained message.
    Io(String),
    /// The operation did not complete within `timeout_ms` milliseconds.
    Timeout { timeout_ms: u64 },
    /// The child closed stdout without writing anything.
    EmptyOutput,
    /// The line that was read is not valid JSON.
    InvalidJsonLine,
}

impl std::fmt::Display for ProcessBoundaryError {
    /// Renders a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without a payload map to fixed text.
            Self::MissingStdout => f.write_str("missing stdout handle"),
            Self::EmptyOutput => f.write_str("empty output"),
            Self::InvalidJsonLine => f.write_str("invalid JSON line"),
            // Variants with a payload interpolate it into the message.
            Self::SpawnFailed(detail) => write!(f, "spawn failed: {detail}"),
            Self::Io(detail) => write!(f, "I/O error: {detail}"),
            Self::Timeout { timeout_ms } => write!(f, "timed out after {timeout_ms}ms"),
        }
    }
}

impl std::error::Error for ProcessBoundaryError {}
32
33pub fn run_process_json_line(
34 command: &str,
35 args: &[String],
36 env: &[(String, String)],
37 timeout: Duration,
38) -> Result<String, ProcessBoundaryError> {
39 let mut child = Command::new(command)
40 .args(args)
41 .envs(env.iter().map(|(k, v)| (k, v)))
42 .stdout(Stdio::piped())
43 .stderr(Stdio::null())
44 .spawn()
45 .map_err(|err| ProcessBoundaryError::SpawnFailed(err.to_string()))?;
46
47 let stdout = child
48 .stdout
49 .take()
50 .ok_or(ProcessBoundaryError::MissingStdout)?;
51 let (tx, rx) = mpsc::channel();
52
53 std::thread::spawn(move || {
54 let mut reader = BufReader::new(stdout);
55 let mut line = String::new();
56 let read = reader.read_line(&mut line).map_err(|err| err.to_string());
57 let _ = tx.send((read, line));
58 });
59
60 match rx.recv_timeout(timeout) {
61 Ok((Ok(0), _)) => {
62 wait_with_context(&mut child, "failed to wait for process after empty output")?;
63 Err(ProcessBoundaryError::EmptyOutput)
64 }
65 Ok((Ok(_), mut line)) => {
66 wait_with_context(
67 &mut child,
68 "failed to wait for process after reading output",
69 )?;
70 if line.ends_with('\n') {
71 line.pop();
72 if line.ends_with('\r') {
73 line.pop();
74 }
75 }
76 if serde_json::from_str::<serde_json::Value>(&line).is_err() {
77 return Err(ProcessBoundaryError::InvalidJsonLine);
78 }
79 Ok(line)
80 }
81 Ok((Err(err), _)) => {
82 wait_with_context(
83 &mut child,
84 "failed to wait for process after stdout read failure",
85 )?;
86 Err(ProcessBoundaryError::Io(err))
87 }
88 Err(_) => {
89 let timeout_ms = timeout.as_millis() as u64;
90 cleanup_timeout_with_process(&mut child, timeout_ms)?;
91 Err(ProcessBoundaryError::Timeout { timeout_ms })
92 }
93 }
94}
95
96fn wait_with_context(child: &mut Child, context: &str) -> Result<(), ProcessBoundaryError> {
97 child
98 .wait()
99 .map(|_| ())
100 .map_err(|err| ProcessBoundaryError::Io(format!("{context}: {err}")))
101}
102
103fn cleanup_timeout_with_process(
104 child: &mut Child,
105 timeout_ms: u64,
106) -> Result<(), ProcessBoundaryError> {
107 match child.try_wait() {
108 Ok(Some(_)) => return Ok(()),
109 Ok(None) => {}
110 Err(error) => {
111 return Err(ProcessBoundaryError::Io(format!(
112 "failed to probe process status after timeout({timeout_ms}ms): {error}"
113 )));
114 }
115 }
116
117 if let Err(kill_error) = child.kill() {
118 return match child.try_wait() {
119 Ok(Some(_)) => Ok(()),
120 Ok(None) => Err(ProcessBoundaryError::Io(format!(
121 "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
122 ))),
123 Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
124 "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
125 ))),
126 };
127 }
128
129 child.wait().map(|_| ()).map_err(|error| {
130 ProcessBoundaryError::Io(format!(
131 "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
132 ))
133 })
134}
135
/// Test-only version of the timeout cleanup sequence in which the process
/// operations are supplied as closures.
///
/// The sequence is: probe with `try_wait`; if the process is still running,
/// call `kill` and then `wait`; if `kill` fails, probe again and treat an
/// exited process as success. `timeout_ms` only annotates error messages.
#[cfg(test)]
fn cleanup_timeout_with_ops<FTryWait, FKill, FWait>(
    timeout_ms: u64,
    mut try_wait: FTryWait,
    mut kill: FKill,
    mut wait: FWait,
) -> Result<(), ProcessBoundaryError>
where
    FTryWait: FnMut() -> std::io::Result<Option<()>>,
    FKill: FnMut() -> std::io::Result<()>,
    FWait: FnMut() -> std::io::Result<()>,
{
    let already_exited = try_wait()
        .map_err(|error| {
            ProcessBoundaryError::Io(format!(
                "failed to probe process status after timeout({timeout_ms}ms): {error}"
            ))
        })?
        .is_some();
    if already_exited {
        return Ok(());
    }

    let kill_error = match kill() {
        // Kill succeeded: all that is left is reaping the process.
        Ok(()) => {
            return wait().map_err(|error| {
                ProcessBoundaryError::Io(format!(
                    "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
                ))
            });
        }
        Err(error) => error,
    };

    // Kill failed; the process may have exited on its own in the meantime.
    match try_wait() {
        Ok(Some(())) => Ok(()),
        Ok(None) => Err(ProcessBoundaryError::Io(format!(
            "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
        ))),
        Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
            "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
        ))),
    }
}
176
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic)]
mod tests {
    use std::io;

    use super::{ProcessBoundaryError, cleanup_timeout_with_ops};

    /// The kill fails because the process exited between the first probe and
    /// the kill; the second probe sees the exit, so cleanup succeeds and
    /// `wait` is never called.
    #[test]
    fn timeout_cleanup_handles_kill_race_without_type_drift() {
        let mut probes = vec![Ok(None), Ok(Some(()))].into_iter();
        let mut kill_calls = 0;

        let outcome = cleanup_timeout_with_ops(
            25,
            || probes.next().expect("try_wait result"),
            || {
                kill_calls += 1;
                Err(io::Error::new(io::ErrorKind::NotFound, "already exited"))
            },
            || panic!("wait must not run when process already exited"),
        );

        assert_eq!(kill_calls, 1);
        assert_eq!(outcome, Ok(()));
    }

    /// The kill fails and the process is still running on the second probe,
    /// so the kill failure is reported as an I/O error.
    #[test]
    fn timeout_cleanup_returns_io_on_fatal_kill_failure() {
        let mut probes = vec![Ok(None), Ok(None)].into_iter();

        let outcome = cleanup_timeout_with_ops(
            25,
            || probes.next().expect("try_wait result"),
            || {
                Err(io::Error::new(
                    io::ErrorKind::PermissionDenied,
                    "permission denied",
                ))
            },
            || Ok(()),
        );

        let reports_kill_failure = matches!(
            &outcome,
            Err(ProcessBoundaryError::Io(message))
                if message.contains("failed to kill process after timeout(25ms)")
        );
        assert!(reports_kill_failure);
    }
}