meerkat-mobkit 0.6.52

Companion orchestration platform for the Meerkat multi-agent runtime
Documentation
//! Process boundary types for gateway binary communication.

use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::sync::mpsc;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessBoundaryError {
    SpawnFailed(String),
    MissingStdout,
    Io(String),
    Timeout { timeout_ms: u64 },
    EmptyOutput,
    InvalidJsonLine,
}

impl std::fmt::Display for ProcessBoundaryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SpawnFailed(msg) => write!(f, "spawn failed: {msg}"),
            Self::MissingStdout => write!(f, "missing stdout handle"),
            Self::Io(msg) => write!(f, "I/O error: {msg}"),
            Self::Timeout { timeout_ms } => write!(f, "timed out after {timeout_ms}ms"),
            Self::EmptyOutput => write!(f, "empty output"),
            Self::InvalidJsonLine => write!(f, "invalid JSON line"),
        }
    }
}

impl std::error::Error for ProcessBoundaryError {}

pub fn run_process_json_line(
    command: &str,
    args: &[String],
    env: &[(String, String)],
    timeout: Duration,
) -> Result<String, ProcessBoundaryError> {
    let mut child = Command::new(command)
        .args(args)
        .envs(env.iter().map(|(k, v)| (k, v)))
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .map_err(|err| ProcessBoundaryError::SpawnFailed(err.to_string()))?;

    let stdout = child
        .stdout
        .take()
        .ok_or(ProcessBoundaryError::MissingStdout)?;
    let (tx, rx) = mpsc::channel();

    std::thread::spawn(move || {
        let mut reader = BufReader::new(stdout);
        let mut line = String::new();
        let read = reader.read_line(&mut line).map_err(|err| err.to_string());
        let _ = tx.send((read, line));
    });

    match rx.recv_timeout(timeout) {
        Ok((Ok(0), _)) => {
            wait_with_context(&mut child, "failed to wait for process after empty output")?;
            Err(ProcessBoundaryError::EmptyOutput)
        }
        Ok((Ok(_), mut line)) => {
            wait_with_context(
                &mut child,
                "failed to wait for process after reading output",
            )?;
            if line.ends_with('\n') {
                line.pop();
                if line.ends_with('\r') {
                    line.pop();
                }
            }
            if serde_json::from_str::<serde_json::Value>(&line).is_err() {
                return Err(ProcessBoundaryError::InvalidJsonLine);
            }
            Ok(line)
        }
        Ok((Err(err), _)) => {
            wait_with_context(
                &mut child,
                "failed to wait for process after stdout read failure",
            )?;
            Err(ProcessBoundaryError::Io(err))
        }
        Err(_) => {
            let timeout_ms = timeout.as_millis() as u64;
            cleanup_timeout_with_process(&mut child, timeout_ms)?;
            Err(ProcessBoundaryError::Timeout { timeout_ms })
        }
    }
}

fn wait_with_context(child: &mut Child, context: &str) -> Result<(), ProcessBoundaryError> {
    child
        .wait()
        .map(|_| ())
        .map_err(|err| ProcessBoundaryError::Io(format!("{context}: {err}")))
}

fn cleanup_timeout_with_process(
    child: &mut Child,
    timeout_ms: u64,
) -> Result<(), ProcessBoundaryError> {
    match child.try_wait() {
        Ok(Some(_)) => return Ok(()),
        Ok(None) => {}
        Err(error) => {
            return Err(ProcessBoundaryError::Io(format!(
                "failed to probe process status after timeout({timeout_ms}ms): {error}"
            )));
        }
    }

    if let Err(kill_error) = child.kill() {
        return match child.try_wait() {
            Ok(Some(_)) => Ok(()),
            Ok(None) => Err(ProcessBoundaryError::Io(format!(
                "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
            ))),
            Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
                "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
            ))),
        };
    }

    child.wait().map(|_| ()).map_err(|error| {
        ProcessBoundaryError::Io(format!(
            "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
        ))
    })
}

#[cfg(test)]
fn cleanup_timeout_with_ops<FTryWait, FKill, FWait>(
    timeout_ms: u64,
    mut try_wait: FTryWait,
    mut kill: FKill,
    mut wait: FWait,
) -> Result<(), ProcessBoundaryError>
where
    FTryWait: FnMut() -> std::io::Result<Option<()>>,
    FKill: FnMut() -> std::io::Result<()>,
    FWait: FnMut() -> std::io::Result<()>,
{
    match try_wait() {
        Ok(Some(())) => return Ok(()),
        Ok(None) => {}
        Err(error) => {
            return Err(ProcessBoundaryError::Io(format!(
                "failed to probe process status after timeout({timeout_ms}ms): {error}"
            )));
        }
    }

    if let Err(kill_error) = kill() {
        return match try_wait() {
            Ok(Some(())) => Ok(()),
            Ok(None) => Err(ProcessBoundaryError::Io(format!(
                "failed to kill process after timeout({timeout_ms}ms): {kill_error}"
            ))),
            Err(probe_error) => Err(ProcessBoundaryError::Io(format!(
                "failed to kill process after timeout({timeout_ms}ms): {kill_error}; failed to probe process status: {probe_error}"
            ))),
        };
    }

    wait().map_err(|error| {
        ProcessBoundaryError::Io(format!(
            "failed to wait for process after timeout kill({timeout_ms}ms): {error}"
        ))
    })
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic)]
mod tests {
    use std::io;

    use super::{ProcessBoundaryError, cleanup_timeout_with_ops};

    #[test]
    fn timeout_cleanup_handles_kill_race_without_type_drift() {
        let mut try_wait_results = vec![Ok(None), Ok(Some(()))].into_iter();
        let mut kill_attempts = 0;
        let result = cleanup_timeout_with_ops(
            25,
            || try_wait_results.next().expect("try_wait result"),
            || {
                kill_attempts += 1;
                Err(io::Error::new(io::ErrorKind::NotFound, "already exited"))
            },
            || panic!("wait must not run when process already exited"),
        );

        assert_eq!(kill_attempts, 1);
        assert_eq!(result, Ok(()));
    }

    #[test]
    fn timeout_cleanup_returns_io_on_fatal_kill_failure() {
        let mut try_wait_results = vec![Ok(None), Ok(None)].into_iter();
        let result = cleanup_timeout_with_ops(
            25,
            || try_wait_results.next().expect("try_wait result"),
            || {
                Err(io::Error::new(
                    io::ErrorKind::PermissionDenied,
                    "permission denied",
                ))
            },
            || Ok(()),
        );

        assert!(matches!(
            result,
            Err(ProcessBoundaryError::Io(message))
                if message.contains("failed to kill process after timeout(25ms)")
        ));
    }
}