use std::{collections::BTreeMap, path::PathBuf, process::Stdio};
use anyhow::{Context, Result, bail};
use tokio::{io::AsyncReadExt, process::Command};
use crate::{
daemon_state::{DaemonState, RuntimeProcessMetadata},
protocol::{OutputStream, ProcessSummary, RunSpec},
store::Store,
};
/// Spawns child processes and keeps the daemon's runtime state in sync with them.
#[derive(Debug, Clone)]
pub struct Supervisor {
    /// Daemon state that is told when a spawned process starts (`insert_process`)
    /// and when it exits (`finish_process`).
    state: DaemonState,
}
impl Supervisor {
pub fn new(state: DaemonState) -> Self {
Self { state }
}
pub fn spawn(&self, store: Store, spec: RunSpec) -> Result<ProcessSummary> {
let (program, args) = spec
.command
.split_first()
.context("missing command to run")?;
if program.is_empty() {
bail!("missing command to run");
}
let cwd = PathBuf::from(&spec.cwd);
let env = effective_env(&spec)?;
let env_keys = spec
.env
.iter()
.map(|env| env.key.clone())
.collect::<Vec<_>>();
let mut command = Command::new(program);
command
.args(args)
.current_dir(&cwd)
.env_clear()
.envs(env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.process_group(0);
let spawn_result = command.spawn();
let mut child = match spawn_result {
Ok(child) => child,
Err(error) => {
let message = format!("failed to spawn {}: {error}", spec.command.join(" "));
let _ = store.insert_failed_process(
spec.name.as_deref(),
&spec.command,
&cwd,
&message,
spec.inherit_env,
&spec.env_files,
&env_keys,
);
bail!(message);
}
};
let pid = child.id().context("spawned process did not expose a pid")?;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let process = store.insert_process_with_timeout(
spec.name.as_deref(),
&spec.command,
&cwd,
pid,
pid,
spec.inherit_env,
&spec.env_files,
&env_keys,
None,
)?;
let process_id = process.id;
let state = self.state.clone();
state.insert_process(RuntimeProcessMetadata {
id: process_id,
pid,
pgid: pid,
});
if let Some(stdout) = stdout {
tokio::spawn(capture_output(
store.clone(),
process_id,
OutputStream::Stdout,
stdout,
));
}
if let Some(stderr) = stderr {
tokio::spawn(capture_output(
store.clone(),
process_id,
OutputStream::Stderr,
stderr,
));
}
tokio::spawn(async move {
let exit_code = match child.wait().await {
Ok(status) => status.code(),
Err(_) => None,
};
let _ = store.mark_process_finished(process_id, exit_code);
state.finish_process(process_id, exit_code);
});
Ok(process)
}
}
/// Builds the environment a process described by `spec` should run with.
///
/// Sources are applied in order, later ones overriding earlier ones:
/// 1. the current process environment, only when `spec.inherit_env` is set;
/// 2. each file in `spec.env_files`, in the order listed;
/// 3. the explicit `spec.env` entries.
///
/// Inherited variables whose name or value is not valid Unicode are skipped,
/// because they cannot be represented in the returned `String` map
/// (`std::env::vars` would panic on them instead).
///
/// # Errors
///
/// Returns an error if any env file cannot be read or parsed.
fn effective_env(spec: &RunSpec) -> Result<BTreeMap<String, String>> {
    let mut env = BTreeMap::new();
    if spec.inherit_env {
        env.extend(std::env::vars_os().filter_map(|(key, value)| {
            Some((key.into_string().ok()?, value.into_string().ok()?))
        }));
    }
    for env_file in &spec.env_files {
        env.extend(read_env_file(env_file)?);
    }
    env.extend(
        spec.env
            .iter()
            .map(|env_var| (env_var.key.clone(), env_var.value.clone())),
    );
    Ok(env)
}
fn read_env_file(path: &str) -> Result<Vec<(String, String)>> {
let contents =
std::fs::read_to_string(path).with_context(|| format!("failed to read env file {path}"))?;
let mut values = Vec::new();
for (index, line) in contents.lines().enumerate() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
let Some((key, value)) = line.split_once('=') else {
bail!(
"invalid env file line {} in {path}: expected KEY=VALUE",
index + 1
);
};
if key.is_empty() || key.contains('\0') {
bail!("invalid env key on line {} in {path}", index + 1);
}
values.push((key.to_owned(), value.to_owned()));
}
Ok(values)
}
/// Drains `reader` and stores what it yields as output chunks of `process_id`.
///
/// Data is read in pieces of at most 8192 bytes and each piece is handed to
/// `store.insert_output_chunk` tagged with `stream`. The function returns when
/// the reader reports end of stream or the first read error. Failures to store
/// a chunk are ignored and do not stop the draining.
async fn capture_output<R>(store: Store, process_id: i64, stream: OutputStream, mut reader: R)
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut chunk = [0u8; 8192];
    // A read error ends the loop just like end of stream does.
    while let Ok(len) = reader.read(&mut chunk).await {
        if len == 0 {
            break;
        }
        let _ = store.insert_output_chunk(process_id, stream, &chunk[..len]);
    }
}