use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use astrid_core::principal::PrincipalId;
use tokio::sync::watch;
use crate::engine::wasm::bindings::astrid::process::host::{
ErrorCode, ExitInfo, ProcessInfo, ProcessPhase, ProcessSignal,
};
use super::ring::{LogRing, Stream};
#[derive(Clone, Copy)]
pub(super) struct ExitRecord {
pub(super) exit_code: Option<i32>,
pub(super) signal: Option<i32>,
}
impl From<ExitRecord> for ExitInfo {
fn from(e: ExitRecord) -> Self {
ExitInfo {
exit_code: e.exit_code,
signal: e.signal,
}
}
}
#[derive(Clone, Copy, PartialEq, Eq)]
pub(super) enum Phase {
Running,
Exited,
}
impl From<Phase> for ProcessPhase {
fn from(p: Phase) -> Self {
match p {
Phase::Running => ProcessPhase::Running,
Phase::Exited => ProcessPhase::Exited,
}
}
}
pub(super) struct ProcessCore {
pub(super) phase: Phase,
pub(super) exit: Option<ExitRecord>,
pub(super) exited_at: Option<Instant>,
pub(super) stdout: LogRing,
pub(super) stderr: LogRing,
pub(super) stdin: Option<tokio::process::ChildStdin>,
pub(super) stdin_open: bool,
pub(super) last_touch: Instant,
}
pub(super) struct PersistentEntry {
pub(super) id: String,
pub(super) creator: PrincipalId,
pub(super) capsule_id: Arc<str>,
pub(super) label: String,
pub(super) command: String,
pub(super) os_pid: u32,
pub(super) spawned_at: Instant,
pub(super) max_lifetime: Duration,
pub(super) idle_timeout: Duration,
pub(super) exit_retention: Duration,
pub(super) core: Arc<Mutex<ProcessCore>>,
pub(super) exit_rx: watch::Receiver<Option<ExitRecord>>,
pub(super) monitor: tokio::task::JoinHandle<()>,
}
impl PersistentEntry {
pub(super) fn is_live(&self) -> bool {
self.core
.lock()
.map(|c| c.phase != Phase::Exited)
.unwrap_or(false)
}
pub(super) fn info(&self) -> ProcessInfo {
let c = self.core.lock().unwrap_or_else(|e| e.into_inner());
let now = Instant::now();
let running = c.phase != Phase::Exited;
ProcessInfo {
id: self.id.clone(),
label: self.label.clone(),
command: self.command.clone(),
os_pid: running.then_some(self.os_pid),
phase: c.phase.into(),
exit: c.exit.map(Into::into),
age_ms: now.saturating_duration_since(self.spawned_at).as_millis() as u64,
idle_ms: now.saturating_duration_since(c.last_touch).as_millis() as u64,
buffered_bytes: (c.stdout.len() + c.stderr.len()) as u64,
bytes_dropped: c.stdout.overflow_dropped + c.stderr.overflow_dropped,
stdin_open: c.stdin_open,
cpu_ms: None, mem_bytes_peak: None, }
}
}
pub(super) struct Resolved {
pub(super) key: [u8; 32],
pub(super) core: Arc<Mutex<ProcessCore>>,
pub(super) exit_rx: watch::Receiver<Option<ExitRecord>>,
pub(super) os_pid: u32,
}
pub(super) fn current_exit(core: &Arc<Mutex<ProcessCore>>) -> Option<ExitRecord> {
core.lock().ok().and_then(|c| c.exit)
}
pub(super) async fn wait_for_exit(
rx: &mut watch::Receiver<Option<ExitRecord>>,
) -> Option<ExitRecord> {
if let Some(e) = *rx.borrow() {
return Some(e);
}
loop {
if rx.changed().await.is_err() {
return None;
}
if let Some(e) = *rx.borrow() {
return Some(e);
}
}
}
pub(super) fn spawn_monitor(
runtime: &tokio::runtime::Handle,
mut child: tokio::process::Child,
core: Arc<Mutex<ProcessCore>>,
exit_tx: watch::Sender<Option<ExitRecord>>,
) -> tokio::task::JoinHandle<()> {
runtime.spawn(async move {
let status = child.wait().await;
let record = match status {
Ok(st) => ExitRecord {
exit_code: st.code(),
signal: exit_signal(&st),
},
Err(_) => ExitRecord {
exit_code: Some(-1),
signal: None,
},
};
if let Ok(mut c) = core.lock() {
c.phase = Phase::Exited;
c.exit = Some(record);
c.exited_at = Some(Instant::now());
c.stdin = None;
c.stdin_open = false;
}
let _ = exit_tx.send(Some(record));
})
}
#[cfg(unix)]
fn exit_signal(st: &std::process::ExitStatus) -> Option<i32> {
use std::os::unix::process::ExitStatusExt as _;
st.signal()
}
#[cfg(not(unix))]
fn exit_signal(_st: &std::process::ExitStatus) -> Option<i32> {
None
}
pub(super) const READER_CHUNK_BYTES: usize = 4096;
pub(super) fn spawn_ring_reader<R>(
runtime: &tokio::runtime::Handle,
mut pipe: R,
core: Arc<Mutex<ProcessCore>>,
which: Stream,
) where
R: tokio::io::AsyncReadExt + Unpin + Send + 'static,
{
runtime.spawn(async move {
let mut chunk = vec![0u8; READER_CHUNK_BYTES];
loop {
match pipe.read(&mut chunk).await {
Ok(0) => break,
Ok(n) => {
let mut accepted = false;
while !accepted {
{
let mut c = core.lock().unwrap_or_else(|e| e.into_inner());
let ring = match which {
Stream::Out => &mut c.stdout,
Stream::Err => &mut c.stderr,
};
accepted = ring.push(&chunk[..n]);
}
if !accepted {
tokio::time::sleep(Duration::from_millis(25)).await;
}
}
},
Err(_) => break,
}
}
});
}
pub(super) fn reap_entry(entry: PersistentEntry) {
if entry.is_live() {
let _ = send_signal(entry.os_pid, nix::sys::signal::Signal::SIGKILL);
}
entry.monitor.abort();
}
pub(super) fn map_signal(sig: ProcessSignal) -> nix::sys::signal::Signal {
use nix::sys::signal::Signal;
match sig {
ProcessSignal::Term => Signal::SIGTERM,
ProcessSignal::Hup => Signal::SIGHUP,
ProcessSignal::Usr1 => Signal::SIGUSR1,
ProcessSignal::Usr2 => Signal::SIGUSR2,
ProcessSignal::Int => Signal::SIGINT,
ProcessSignal::Stop => Signal::SIGSTOP,
ProcessSignal::Cont => Signal::SIGCONT,
}
}
pub(super) fn send_signal(pid: u32, sig: nix::sys::signal::Signal) -> Result<(), ErrorCode> {
if pid == 0 {
return Err(ErrorCode::Closed);
}
#[cfg(unix)]
{
let raw = i32::try_from(pid).map_err(|_| ErrorCode::InvalidInput)?;
let target = nix::unistd::Pid::from_raw(raw);
if nix::sys::signal::killpg(target, sig).is_err() {
nix::sys::signal::kill(target, sig)
.map_err(|e| ErrorCode::Unknown(format!("signal {sig:?}: {e}")))?;
}
Ok(())
}
#[cfg(not(unix))]
{
let _ = (pid, sig);
Err(ErrorCode::Unknown(
"process signals unsupported on this platform".to_string(),
))
}
}