Skip to main content

meerkat_mobkit/
process.rs

1//! Process boundary types for gateway binary communication.
2
3use std::io::{BufRead, BufReader};
4use std::process::{Child, Command, Stdio};
5use std::sync::mpsc;
6use std::time::Duration;
7
/// Failure modes of running an external process and reading one JSON line from its stdout.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ProcessBoundaryError {
    /// The process could not be started; carries the text of the underlying error.
    SpawnFailed(String),
    /// The spawned child did not expose a stdout handle.
    MissingStdout,
    /// Reading stdout, or probing / killing / waiting for the process, failed; carries a
    /// description of the failure.
    Io(String),
    /// No line was received from the process within the allowed time.
    Timeout {
        /// The timeout that elapsed, in milliseconds.
        timeout_ms: u64,
    },
    /// Stdout reached end-of-file before any data was read.
    EmptyOutput,
    /// The line read from stdout was not valid JSON.
    InvalidJsonLine,
}
17
18impl std::fmt::Display for ProcessBoundaryError {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        match self {
21            Self::SpawnFailed(msg) => write!(f, "spawn failed: {msg}"),
22            Self::MissingStdout => write!(f, "missing stdout handle"),
23            Self::Io(msg) => write!(f, "I/O error: {msg}"),
24            Self::Timeout { timeout_ms } => write!(f, "timed out after {timeout_ms}ms"),
25            Self::EmptyOutput => write!(f, "empty output"),
26            Self::InvalidJsonLine => write!(f, "invalid JSON line"),
27        }
28    }
29}
30
// Marker impl so the type can be used wherever `std::error::Error` is expected; it relies on
// the trait's default methods, so no underlying `source` error is exposed.
impl std::error::Error for ProcessBoundaryError {}
32
33pub fn run_process_json_line(
34    command: &str,
35    args: &[String],
36    env: &[(String, String)],
37    timeout: Duration,
38) -> Result<String, ProcessBoundaryError> {
39    let mut child = Command::new(command)
40        .args(args)
41        .envs(env.iter().map(|(k, v)| (k, v)))
42        .stdout(Stdio::piped())
43        .stderr(Stdio::null())
44        .spawn()
45        .map_err(|err| ProcessBoundaryError::SpawnFailed(err.to_string()))?;
46
47    let stdout = child
48        .stdout
49        .take()
50        .ok_or(ProcessBoundaryError::MissingStdout)?;
51    let (tx, rx) = mpsc::channel();
52
53    std::thread::spawn(move || {
54        let mut reader = BufReader::new(stdout);
55        let mut line = String::new();
56        let read = reader.read_line(&mut line).map_err(|err| err.to_string());
57        let _ = tx.send((read, line));
58    });
59
60    match rx.recv_timeout(timeout) {
61        Ok((Ok(0), _)) => {
62            wait_with_context(&mut child, "failed to wait for process after empty output")?;
63            Err(ProcessBoundaryError::EmptyOutput)
64        }
65        Ok((Ok(_), mut line)) => {
66            wait_with_context(
67                &mut child,
68                "failed to wait for process after reading output",
69            )?;
70            if line.ends_with('\n') {
71                line.pop();
72                if line.ends_with('\r') {
73                    line.pop();
74                }
75            }
76            if serde_json::from_str::<serde_json::Value>(&line).is_err() {
77                return Err(ProcessBoundaryError::InvalidJsonLine);
78            }
79            Ok(line)
80        }
81        Ok((Err(err), _)) => {
82            wait_with_context(
83                &mut child,
84                "failed to wait for process after stdout read failure",
85            )?;
86            Err(ProcessBoundaryError::Io(err))
87        }
88        Err(_) => {
89            let timeout_ms = timeout.as_millis() as u64;
90            cleanup_timeout_with_process(&mut child, timeout_ms)?;
91            Err(ProcessBoundaryError::Timeout { timeout_ms })
92        }
93    }
94}
95
96fn wait_with_context(child: &mut Child, context: &str) -> Result<(), ProcessBoundaryError> {
97    child
98        .wait()
99        .map(|_| ())
100        .map_err(|err| ProcessBoundaryError::Io(format!("{context}: {err}")))
101}
102
103fn cleanup_timeout_with_process(
104    child: &mut Child,
105    timeout_ms: u64,
106) -> Result<(), ProcessBoundaryError> {
107    match child.try_wait() {
108        Ok(Some(_)) => return Ok(()),
109        Ok(None) => {}
110        Err(error) => {
111            return Err(ProcessBoundaryError::Io(format!(
112                "failed to probe process status after timeout({timeout_ms}ms): {error}"
113            )));
114        }
115    }
116
117    if let Err(kill_error) = child.kill() {
118        return match child.try_wait() {
119            Ok(Some(_)) => Ok(()),
120            Ok(None) => Err(ProcessBoundaryError::Io(format!(
121                "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
122            ))),
123            Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
124                "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
125            ))),
126        };
127    }
128
129    child.wait().map(|_| ()).map_err(|error| {
130        ProcessBoundaryError::Io(format!(
131            "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
132        ))
133    })
134}
135
136#[cfg(test)]
137fn cleanup_timeout_with_ops<FTryWait, FKill, FWait>(
138    timeout_ms: u64,
139    mut try_wait: FTryWait,
140    mut kill: FKill,
141    mut wait: FWait,
142) -> Result<(), ProcessBoundaryError>
143where
144    FTryWait: FnMut() -> std::io::Result<Option<()>>,
145    FKill: FnMut() -> std::io::Result<()>,
146    FWait: FnMut() -> std::io::Result<()>,
147{
148    match try_wait() {
149        Ok(Some(())) => return Ok(()),
150        Ok(None) => {}
151        Err(error) => {
152            return Err(ProcessBoundaryError::Io(format!(
153                "failed to probe process status after timeout({timeout_ms}ms): {error}"
154            )));
155        }
156    }
157
158    if let Err(kill_error) = kill() {
159        return match try_wait() {
160            Ok(Some(())) => Ok(()),
161            Ok(None) => Err(ProcessBoundaryError::Io(format!(
162                "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
163            ))),
164            Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
165                "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
166            ))),
167        };
168    }
169
170    wait().map_err(|error| {
171        ProcessBoundaryError::Io(format!(
172            "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
173        ))
174    })
175}
176
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic)]
mod tests {
    use std::io;

    use super::{ProcessBoundaryError, cleanup_timeout_with_ops};

    /// A failed kill is tolerated when the follow-up probe shows the process already exited.
    #[test]
    fn timeout_cleanup_handles_kill_race_without_type_drift() {
        // First probe: still running. Second probe (after the failed kill): exited.
        let mut probes = vec![Ok(None), Ok(Some(()))].into_iter();
        let mut kill_calls = 0;

        let outcome = cleanup_timeout_with_ops(
            25,
            || probes.next().expect("try_wait result"),
            || {
                kill_calls += 1;
                Err(io::Error::new(io::ErrorKind::NotFound, "already exited"))
            },
            || panic!("wait must not run when process already exited"),
        );

        assert_eq!(kill_calls, 1);
        assert_eq!(outcome, Ok(()));
    }

    /// A failed kill is an error when the process is still running afterwards.
    #[test]
    fn timeout_cleanup_returns_io_on_fatal_kill_failure() {
        // Both probes report a process that is still running.
        let mut probes = vec![Ok(None), Ok(None)].into_iter();
        let denied = || {
            Err(io::Error::new(
                io::ErrorKind::PermissionDenied,
                "permission denied",
            ))
        };

        let outcome = cleanup_timeout_with_ops(
            25,
            || probes.next().expect("try_wait result"),
            denied,
            || Ok(()),
        );

        assert!(matches!(
            outcome,
            Err(ProcessBoundaryError::Io(message))
                if message.contains("failed to kill process after timeout(25ms)")
        ));
    }
}