use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use astrid_core::principal::PrincipalId;
use super::{PersistentProcessRegistry, SpawnParams};
/// Launches `sh -c <script>` as a test child with stdout and stderr piped and
/// stdin connected to the null device.
///
/// On Unix the child is started with `process_group(0)`. The tokio command is
/// configured with `kill_on_drop(true)`.
///
/// Returns `(child, stdout pipe, stderr pipe, os pid)`.
///
/// # Panics
/// Panics if the process cannot be spawned, reports no pid, or is missing
/// either output pipe.
fn spawn_raw(
    script: &str,
) -> (
    tokio::process::Child,
    tokio::process::ChildStdout,
    tokio::process::ChildStderr,
    u32,
) {
    // Build the blocking-std command first so the Unix-only extension trait
    // can be applied before handing it to tokio.
    let mut shell = std::process::Command::new("sh");
    shell.args(["-c", script]);
    shell.stdin(Stdio::null());
    shell.stdout(Stdio::piped());
    shell.stderr(Stdio::piped());
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt as _;
        shell.process_group(0);
    }

    let mut async_cmd = tokio::process::Command::from(shell);
    async_cmd.kill_on_drop(true);

    let mut proc = async_cmd.spawn().expect("spawn test child");
    let os_pid = proc.id().expect("child pid");
    let out_pipe = proc.stdout.take().expect("stdout pipe");
    let err_pipe = proc.stderr.take().expect("stderr pipe");
    (proc, out_pipe, err_pipe, os_pid)
}
/// Assembles a [`SpawnParams`] for an already-spawned test child.
///
/// The caller supplies the owning principal, capsule id, child handle, both
/// output pipes, the OS pid and the concurrency cap. The command string is the
/// fixed placeholder `"sh -c <test>"`, stdin is absent, and every optional
/// tuning field is left as `None`.
#[allow(clippy::too_many_arguments)]
fn params(
    creator: &PrincipalId,
    capsule: &str,
    child: tokio::process::Child,
    stdout: tokio::process::ChildStdout,
    stderr: tokio::process::ChildStderr,
    os_pid: u32,
    concurrent_cap: usize,
) -> SpawnParams {
    let creator = creator.clone();
    let command = String::from("sh -c <test>");

    SpawnParams {
        // Identity of the owner and the process.
        creator,
        capsule_id: Arc::from(capsule),
        command,
        os_pid,
        // Process handle and its pipes; these helpers never attach stdin.
        child,
        stdout,
        stderr,
        stdin: None,
        // Limits: only the concurrency cap is set explicitly.
        concurrent_cap,
        label: None,
        overflow: None,
        log_ring_bytes: None,
        max_lifetime_ms: None,
        idle_timeout_ms: None,
        exit_retention_ms: None,
    }
}
/// Launches `sh -c <script>` as a test child with stdin, stdout and stderr all
/// piped.
///
/// On Unix the child is started with `process_group(0)`. The tokio command is
/// configured with `kill_on_drop(true)`.
///
/// Returns `(child, stdout pipe, stderr pipe, stdin pipe, os pid)`.
///
/// # Panics
/// Panics if the process cannot be spawned, reports no pid, or is missing any
/// of the three pipes.
fn spawn_raw_stdin(
    script: &str,
) -> (
    tokio::process::Child,
    tokio::process::ChildStdout,
    tokio::process::ChildStderr,
    tokio::process::ChildStdin,
    u32,
) {
    // Build the blocking-std command first so the Unix-only extension trait
    // can be applied before handing it to tokio.
    let mut shell = std::process::Command::new("sh");
    shell.args(["-c", script]);
    shell.stdin(Stdio::piped());
    shell.stdout(Stdio::piped());
    shell.stderr(Stdio::piped());
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt as _;
        shell.process_group(0);
    }

    let mut async_cmd = tokio::process::Command::from(shell);
    async_cmd.kill_on_drop(true);

    let mut proc = async_cmd.spawn().expect("spawn test child");
    let os_pid = proc.id().expect("child pid");
    let out_pipe = proc.stdout.take().expect("stdout pipe");
    let err_pipe = proc.stderr.take().expect("stderr pipe");
    let in_pipe = proc.stdin.take().expect("stdin pipe");
    (proc, out_pipe, err_pipe, in_pipe, os_pid)
}
/// Polls `read_logs` until the accumulated stdout contains `needle`.
///
/// Makes at most 200 attempts, sleeping 10 ms after each attempt that does not
/// find the needle. The stdout of every successful read is appended to one
/// buffer; failed reads are ignored and retried. Returns the buffer as soon as
/// it contains `needle`, or whatever was collected once the attempts run out
/// (the caller is expected to assert on the content).
async fn drain_stdout_until(
    reg: &PersistentProcessRegistry,
    id: &str,
    principal: &PrincipalId,
    capsule: &str,
    needle: &str,
) -> String {
    let poll_interval = Duration::from_millis(10);
    let mut collected = String::new();
    let mut attempts_left: u32 = 200;

    while attempts_left > 0 {
        attempts_left -= 1;
        match reg.read_logs(id, principal, capsule) {
            Ok(logs) => {
                collected.push_str(&logs.stdout);
                if collected.contains(needle) {
                    return collected;
                }
            }
            // A failed read is not fatal here; try again on the next tick.
            Err(_) => {}
        }
        tokio::time::sleep(poll_interval).await;
    }

    collected
}
/// Registers a short-lived child for alice under capsule `"cap-a"` and checks
/// that status lookups succeed only for the matching (id, principal, capsule)
/// triple, that `wait` reports exit code 0, that both output streams are
/// readable afterwards, and that `release` removes the entry.
#[tokio::test]
async fn spawn_wait_read_and_owner_isolation() {
    let registry = PersistentProcessRegistry::new(tokio::runtime::Handle::current());
    let owner = PrincipalId::new("alice").unwrap();
    let stranger = PrincipalId::new("bob").unwrap();

    let (child, out_pipe, err_pipe, os_pid) = spawn_raw("echo hello; echo oops 1>&2; exit 0");
    let handle = registry
        .spawn(params(&owner, "cap-a", child, out_pipe, err_pipe, os_pid, 8))
        .expect("spawn-persistent");

    // Visible to the owner under the right capsule...
    assert!(registry.status(&handle, &owner, "cap-a").is_ok());
    // ...but not to another principal, another capsule, or under a bogus id.
    assert!(registry.status(&handle, &stranger, "cap-a").is_err());
    assert!(registry.status(&handle, &owner, "cap-b").is_err());
    assert!(registry.status("not-a-real-id", &owner, "cap-a").is_err());

    let snapshot = registry.status(&handle, &owner, "cap-a").unwrap();
    assert_eq!(snapshot.id, handle);

    let listed = registry.list(&owner, "cap-a", None);
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].id, handle);

    let exit = registry
        .wait(&handle, &owner, "cap-a", Duration::from_secs(5))
        .await
        .expect("wait");
    assert_eq!(exit.exit_code, Some(0));

    // NOTE(review): presumably gives the registry time to finish copying the
    // child's pipes into its log buffers after exit; confirm.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let logs = registry
        .read_logs(&handle, &owner, "cap-a")
        .expect("read-logs");
    assert!(logs.stdout.contains("hello"), "stdout: {:?}", logs.stdout);
    assert!(logs.stderr.contains("oops"), "stderr: {:?}", logs.stderr);
    assert!(!logs.running);

    // The exited entry stays queryable until it is explicitly released.
    assert!(registry.status(&handle, &owner, "cap-a").is_ok());
    registry
        .release(&handle, &owner, "cap-a")
        .expect("release");
    assert!(registry.status(&handle, &owner, "cap-a").is_err());
}
/// With `concurrent_cap` set to 1, checks that a second spawn is rejected with
/// `ErrorCode::Quota`, that `stop` ends the first process with a non-zero exit
/// and removes its entry, and that a new spawn is then accepted.
#[tokio::test]
async fn concurrent_cap_enforced_and_stop_reaps() {
    use crate::engine::wasm::bindings::astrid::process::host::ErrorCode;

    let registry = PersistentProcessRegistry::new(tokio::runtime::Handle::current());
    let owner = PrincipalId::new("alice").unwrap();

    // First long-running child takes the only slot.
    let (child_a, out_a, err_a, pid_a) = spawn_raw("sleep 30");
    let first = registry
        .spawn(params(&owner, "cap", child_a, out_a, err_a, pid_a, 1))
        .expect("first spawn");

    // Second one must be refused while the first is still registered.
    let (child_b, out_b, err_b, pid_b) = spawn_raw("sleep 30");
    let err = registry
        .spawn(params(&owner, "cap", child_b, out_b, err_b, pid_b, 1))
        .expect_err("cap should reject");
    assert!(
        matches!(err, ErrorCode::Quota),
        "expected Quota, got {err:?}"
    );

    // Stopping the first process terminates it and drops its entry.
    let stopped = registry
        .stop(&first, &owner, "cap", None)
        .await
        .expect("stop");
    assert_ne!(stopped.exit_code, Some(0));
    assert!(registry.status(&first, &owner, "cap").is_err());

    // The slot is free again, so a new spawn goes through.
    let (child_c, out_c, err_c, pid_c) = spawn_raw("sleep 30");
    let third = registry
        .spawn(params(&owner, "cap", child_c, out_c, err_c, pid_c, 1))
        .expect("third spawn after slot freed");

    registry
        .stop(&third, &owner, "cap", None)
        .await
        .expect("cleanup stop");
}
/// Checks cursor-based reads of stdout: a read capped at 3 bytes from an empty
/// cursor yields `abc`, continuing from the returned cursor yields `XYZ` with
/// `drained_eof` set, and a fresh empty cursor still yields the full `abcXYZ`
/// (earlier reads did not consume the data).
#[tokio::test]
async fn read_since_is_non_draining_with_cursor() {
    use crate::engine::wasm::bindings::astrid::process::host::{LogCursor, LogStream};

    let registry = PersistentProcessRegistry::new(tokio::runtime::Handle::current());
    let owner = PrincipalId::new("alice").unwrap();

    let (child, out_pipe, err_pipe, os_pid) = spawn_raw("printf 'abcXYZ'; exit 0");
    let handle = registry
        .spawn(params(&owner, "cap", child, out_pipe, err_pipe, os_pid, 8))
        .expect("spawn");

    registry
        .wait(&handle, &owner, "cap", Duration::from_secs(5))
        .await
        .expect("wait");
    // NOTE(review): presumably lets the registry finish buffering the child's
    // output after exit; confirm.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // First read: start of stream, limited to three bytes.
    let start = LogCursor { token: None };
    let head = registry
        .read_since(&handle, &owner, "cap", LogStream::Stdout, &start, 3)
        .expect("read-since");
    assert_eq!(head.data, b"abc");
    assert_eq!(head.bytes_dropped, 0);

    // Second read: resume from the cursor handed back by the first.
    let tail = registry
        .read_since(&handle, &owner, "cap", LogStream::Stdout, &head.next, 100)
        .expect("read-since 2");
    assert_eq!(tail.data, b"XYZ");
    assert!(tail.drained_eof);

    // Third read: a brand-new empty cursor sees everything again.
    let restart = LogCursor { token: None };
    let replay = registry
        .read_since(&handle, &owner, "cap", LogStream::Stdout, &restart, 100)
        .expect("read-since from start again");
    assert_eq!(replay.data, b"abcXYZ");

    registry.release(&handle, &owner, "cap").expect("release");
}
/// Exercises the stdin path of a registered process running a line-echo shell
/// loop:
///
/// * a write larger than `MAX_STDIN_WRITE` is rejected with `TooLarge`;
/// * two consecutive writes are each delivered and echoed back on stdout;
/// * a write from a different principal fails with `NoSuchProcess`;
/// * `close_stdin` makes the child see EOF and exit with code 0;
/// * a write after the close fails with `Closed`.
#[tokio::test]
async fn write_stdin_delivers_survives_reset_and_close_eofs() {
    use crate::engine::wasm::bindings::astrid::process::host::ErrorCode;
    let reg = PersistentProcessRegistry::new(tokio::runtime::Handle::current());
    let alice = PrincipalId::new("alice").unwrap();
    let bob = PrincipalId::new("bob").unwrap();
    let (child, so, se, stdin, pid) =
        spawn_raw_stdin("while IFS= read -r line; do echo \"got:$line\"; done");
    // Built by hand rather than via `params` because this test needs a stdin pipe.
    let spawn_params = SpawnParams {
        creator: alice.clone(),
        capsule_id: Arc::from("cap"),
        command: "sh -c <stdin-echo>".to_string(),
        os_pid: pid,
        child,
        stdout: so,
        stderr: se,
        stdin: Some(stdin),
        concurrent_cap: 8,
        label: None,
        overflow: None,
        log_ring_bytes: None,
        max_lifetime_ms: None,
        idle_timeout_ms: None,
        exit_retention_ms: None,
    };
    let id = reg
        .spawn(spawn_params)
        .expect("spawn-persistent with stdin");

    // One byte over the per-write limit must be refused.
    assert!(matches!(
        reg.write_stdin(&id, &alice, "cap", &vec![0u8; super::MAX_STDIN_WRITE + 1])
            .await,
        Err(ErrorCode::TooLarge)
    ));

    // First write: all six bytes accepted and echoed back by the shell loop.
    let n = reg
        .write_stdin(&id, &alice, "cap", b"hello\n")
        .await
        .expect("write 1");
    assert_eq!(n, 6);
    let stdout = drain_stdout_until(&reg, &id, &alice, "cap", "got:hello").await;
    assert!(stdout.contains("got:hello"), "stdout: {stdout:?}");

    // Second write must work just like the first.
    // NOTE(review): "post-reset" presumably refers to some internal state the
    // registry resets between writes; confirm against the implementation.
    let n2 = reg
        .write_stdin(&id, &alice, "cap", b"world\n")
        .await
        .expect("write 2 (post-reset)");
    assert_eq!(n2, 6);
    let stdout = drain_stdout_until(&reg, &id, &alice, "cap", "got:world").await;
    assert!(stdout.contains("got:world"), "stdout: {stdout:?}");

    // A principal that does not own the process cannot write to it.
    assert!(matches!(
        reg.write_stdin(&id, &bob, "cap", b"x\n").await,
        Err(ErrorCode::NoSuchProcess)
    ));

    // Closing stdin ends the `read` loop, so the shell exits normally.
    reg.close_stdin(&id, &alice, "cap").expect("close-stdin");
    let exit = reg
        .wait(&id, &alice, "cap", Duration::from_secs(5))
        .await
        .expect("wait after close-stdin");
    assert_eq!(exit.exit_code, Some(0), "child exits cleanly on stdin EOF");

    // Writes after the close are reported as `Closed`.
    assert!(matches!(
        reg.write_stdin(&id, &alice, "cap", b"late\n").await,
        Err(ErrorCode::Closed)
    ));
    reg.release(&id, &alice, "cap").expect("release");
}