use std::process::{Child, ChildStderr, ChildStdout, Command, Stdio};
use std::time::Instant;
use crate::audit::SpawnRecord;
use crate::nonblock_fd::set_nonblocking_fd;
use crate::outstanding_table::InsertError as OutstandingInsertError;
use std::os::unix::io::AsRawFd;
use super::env;
use super::{Recovery, RecoveryMode, RecoveryOutcome};
/// Bookkeeping for one spawned recovery child, as stored in the outstanding
/// table by `Recovery::spawn_exec_child`.
pub(super) struct Outstanding {
/// Handle to the spawned process, as returned by `Command::spawn`.
pub(super) child: Child,
/// The `now` instant the caller passed in when the child was spawned.
pub(super) spawned_at: Instant,
/// Starts out `false` at spawn time.
/// NOTE(review): presumably flipped once a kill has been issued for this
/// child — confirm against the code that reaps/kills entries.
pub(super) killed: bool,
/// The caller-supplied `wallclock_ms` value at spawn time.
pub(super) wallclock_at_spawn_ms: u64,
/// Parent-side end of the child's stdout pipe; `None` when output capture
/// is off (or the child had no piped stdout to take).
pub(super) stdout_handle: Option<ChildStdout>,
/// Parent-side end of the child's stderr pipe; `None` when output capture
/// is off (or the child had no piped stderr to take).
pub(super) stderr_handle: Option<ChildStderr>,
/// Initialised to 0 at spawn.
/// NOTE(review): presumably the number of stdout bytes captured so far — confirm.
pub(super) stdout_len: u32,
/// Initialised to 0 at spawn.
/// NOTE(review): presumably the number of stderr bytes captured so far — confirm.
pub(super) stderr_len: u32,
/// Initialised to `false` at spawn.
/// NOTE(review): presumably set once captured output exceeds the capture
/// cap — confirm against the capture/drain code.
pub(super) truncated: bool,
}
/// Detaches the parent-side stdout/stderr pipe handles from `child`.
///
/// When `capture_on` is `false` the child is left untouched and
/// `(None, None)` is returned. Otherwise each stream is moved out of `child`
/// (leaving `None` in its place); for every handle that was present, its raw
/// file descriptor is passed to `set_nonblocking_fd`, whose result is
/// discarded, so the handle is returned whether or not that call succeeded.
///
/// Returns `(stdout, stderr)`; either element is `None` if the corresponding
/// stream was not available on `child`.
pub(super) fn take_capture_handles(
    child: &mut Child,
    capture_on: bool,
) -> (Option<ChildStdout>, Option<ChildStderr>) {
    if capture_on {
        // stdout first, then stderr — same order as the handles are returned.
        let stdout = child.stdout.take();
        if let Some(pipe) = &stdout {
            let _ = set_nonblocking_fd(pipe.as_raw_fd());
        }

        let stderr = child.stderr.take();
        if let Some(pipe) = &stderr {
            let _ = set_nonblocking_fd(pipe.as_raw_fd());
        }

        (stdout, stderr)
    } else {
        (None, None)
    }
}
impl Recovery {
    /// Spawns the configured recovery program for agent `pid` and registers
    /// the resulting child in `self.outstanding`.
    ///
    /// Every `{pid}` placeholder in the configured arguments is replaced with
    /// the decimal form of `pid`; the program itself is used verbatim. When
    /// `self.capture_cap` is non-zero, the child's stdout and stderr are piped
    /// and the parent-side handles are stored with the table entry.
    ///
    /// A spawn audit record is emitted as soon as the process has been
    /// created, i.e. before the table insert is attempted.
    ///
    /// # Parameters
    /// * `pid` - agent process id; substituted into the arguments and passed
    ///   to `try_insert` together with the new entry.
    /// * `wallclock_ms` - forwarded to the audit record and stored in the
    ///   entry as `wallclock_at_spawn_ms`.
    /// * `now` - stored in the entry as `spawned_at`.
    ///
    /// # Returns
    /// * `RecoveryOutcome::Spawned { child_pid }` when the child was spawned
    ///   and inserted (and, in release builds, also when the table reports
    ///   `AlreadyPresent`; debug builds assert instead).
    /// * `RecoveryOutcome::RefusedOutstandingCapacity { pid }` when the table
    ///   reports `Full`; `refused_outstanding_capacity` is bumped (saturating).
    /// * `RecoveryOutcome::SpawnFailed(e)` when `Command::spawn` fails.
    pub(super) fn spawn_exec_child(
        &mut self,
        pid: u32,
        wallclock_ms: u64,
        now: Instant,
    ) -> RecoveryOutcome {
        let capture_on = self.capture_cap > 0;
        match &self.mode {
            RecoveryMode::Exec { program, args } => {
                let pid_str = pid.to_string();
                // argv[0] is the program verbatim; only the arguments get
                // `{pid}` expanded. Owning the strings here also ends the
                // borrow of `self.mode` before `self` is used mutably below.
                let substituted: Vec<String> = std::iter::once(program.clone())
                    .chain(args.iter().map(|a| a.replace("{pid}", &pid_str)))
                    .collect();

                let mut cmd = Command::new(&substituted[0]);
                env::apply_env(&mut cmd, self.recovery_inherit_env, &self.recovery_env);
                cmd.args(&substituted[1..]);
                if capture_on {
                    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
                }

                // Length of the argv joined by single separators:
                // sum(len + 1) - 1. Saturating throughout so an absurdly
                // large argv clamps to `u32::MAX` instead of panicking (debug)
                // or wrapping (release).
                let template_len: u32 = substituted
                    .iter()
                    .fold(0u32, |acc, a| {
                        // Clamp before narrowing so the cast cannot truncate.
                        let len = a.len().min(u32::MAX as usize) as u32;
                        acc.saturating_add(len).saturating_add(1)
                    })
                    .saturating_sub(1);

                match cmd.spawn() {
                    Ok(mut child) => {
                        let child_pid = child.id();
                        let (out_handle, err_handle) =
                            take_capture_handles(&mut child, capture_on);
                        self.emit_spawn_audit(
                            wallclock_ms,
                            pid,
                            child_pid,
                            "exec",
                            substituted[0].as_str(),
                            template_len,
                        );
                        // NOTE(review): the entry (and the `Child` inside it)
                        // is moved into `try_insert`, and neither error
                        // variant hands it back. Dropping a `Child` does not
                        // kill or reap the process, so on the error paths
                        // below the child keeps running untracked — confirm
                        // that callers check capacity/presence before
                        // reaching this point.
                        match self.outstanding.try_insert(
                            pid,
                            Outstanding {
                                child,
                                spawned_at: now,
                                killed: false,
                                wallclock_at_spawn_ms: wallclock_ms,
                                stdout_handle: out_handle,
                                stderr_handle: err_handle,
                                stdout_len: 0,
                                stderr_len: 0,
                                truncated: false,
                            },
                        ) {
                            Ok(()) => RecoveryOutcome::Spawned { child_pid },
                            Err(OutstandingInsertError::AlreadyPresent) => {
                                // Treated as an invariant violation: loud in
                                // debug builds, reported as a plain spawn in
                                // release builds.
                                debug_assert!(
                                    false,
                                    "OutstandingTable::try_insert returned AlreadyPresent \
                                     after the `contains` guard above",
                                );
                                RecoveryOutcome::Spawned { child_pid }
                            }
                            Err(OutstandingInsertError::Full) => {
                                self.refused_outstanding_capacity =
                                    self.refused_outstanding_capacity.saturating_add(1);
                                RecoveryOutcome::RefusedOutstandingCapacity { pid }
                            }
                        }
                    }
                    Err(e) => RecoveryOutcome::SpawnFailed(e),
                }
            }
        }
    }

    /// Sends a `SpawnRecord` to the audit sink, if one is installed.
    ///
    /// Does nothing when `self.audit_sink` is `None`. The record carries the
    /// arguments unchanged, `self.source`, and an `observer_ns` of 0.
    ///
    /// # Parameters
    /// * `wallclock_ms` - timestamp copied into the record.
    /// * `agent_pid` - pid of the agent the spawn was made for.
    /// * `child_pid` - pid of the freshly spawned child.
    /// * `mode` - mode label for the record (the exec path passes `"exec"`).
    /// * `program` - program that was executed.
    /// * `template_len` - joined length of the substituted argv.
    fn emit_spawn_audit(
        &mut self,
        wallclock_ms: u64,
        agent_pid: u32,
        child_pid: u32,
        mode: &str,
        program: &str,
        template_len: u32,
    ) {
        let Some(sink) = self.audit_sink.as_mut() else {
            return;
        };
        sink.record_spawn(&SpawnRecord {
            wallclock_ms,
            observer_ns: 0,
            agent_pid,
            child_pid,
            mode,
            program,
            // `audit_sink` and `source` are disjoint fields, so `source` can
            // be borrowed in place while the sink is mutably borrowed — no
            // per-spawn clone needed.
            source: &self.source,
            template_len,
        });
    }
}