mod common;
use std::sync::{Arc, Mutex};
use agent_os_client::{AgentOs, ClientError, ExecOptions, SpawnOptions, StdinInput};
use futures::StreamExt;
/// Probes whether guest commands can be executed inside `os`.
///
/// Runs `echo` with default [`ExecOptions`]. Returns the captured stdout when the
/// call succeeds with exit code 0; an `exec` error or a non-zero exit code both
/// collapse to `None`.
///
// NOTE(review): the probe only exercises `echo`; callers that go on to run other
// commands presumably rely on them shipping alongside it — confirm.
async fn commands_available(os: &AgentOs) -> Option<String> {
    os.exec("echo", ExecOptions::default())
        .await
        .ok()
        .filter(|probe| probe.exit_code == 0)
        .map(|probe| probe.stdout)
}
#[tokio::test]
async fn process_surface_exec_spawn_and_snapshot() {
if !common::sidecar_available() {
eprintln!("skipping process_surface_exec_spawn_and_snapshot: sidecar binary not built");
return;
}
let os = common::new_vm().await;
const MISSING_PID: u32 = 999_999;
assert!(
os.list_processes().is_empty(),
"a fresh VM has no SDK-spawned processes"
);
assert!(
matches!(os.get_process(MISSING_PID), Err(ClientError::ProcessNotFound(p)) if p == MISSING_PID),
"get_process(unknown) must return ProcessNotFound"
);
assert!(
matches!(
os.write_process_stdin(MISSING_PID, StdinInput::Text("x".to_string())),
Err(ClientError::ProcessNotFound(_))
),
"write_process_stdin(unknown) must return ProcessNotFound"
);
assert!(
matches!(os.close_process_stdin(MISSING_PID), Err(ClientError::ProcessNotFound(_))),
"close_process_stdin(unknown) must return ProcessNotFound"
);
assert!(
matches!(os.stop_process(MISSING_PID), Err(ClientError::ProcessNotFound(_))),
"stop_process(unknown) must return ProcessNotFound"
);
assert!(
matches!(os.kill_process(MISSING_PID), Err(ClientError::ProcessNotFound(_))),
"kill_process(unknown) must return ProcessNotFound"
);
assert!(
matches!(os.on_process_stdout(MISSING_PID), Err(ClientError::ProcessNotFound(_))),
"on_process_stdout(unknown) must return ProcessNotFound"
);
assert!(
matches!(os.wait_process(MISSING_PID).await, Err(ClientError::ProcessNotFound(_))),
"wait_process(unknown) must return ProcessNotFound"
);
let all = os.all_processes().await.expect("all_processes snapshot");
let tree = os.process_tree().await.expect("process_tree snapshot");
assert!(
all.len() >= tree.len(),
"the process forest cannot have more roots than total processes"
);
if commands_available(&os).await.is_none() {
eprintln!(
"skipping process_surface_exec_spawn_and_snapshot: WASM command packages (sh/echo) \
not present in this environment"
);
os.shutdown().await.expect("shutdown");
return;
}
let echoed = os
.exec(
"cat",
ExecOptions {
stdin: Some(StdinInput::Text("hello-stdout".to_string())),
..Default::default()
},
)
.await
.expect("exec cat");
assert_eq!(echoed.exit_code, 0, "cat should exit 0");
assert_eq!(
echoed.stdout, "hello-stdout",
"cat must echo its stdin verbatim to stdout"
);
assert!(echoed.stderr.is_empty(), "cat should not write stderr");
let streamed = Arc::new(Mutex::new(Vec::<u8>::new()));
let streamed_cb = Arc::clone(&streamed);
let res = os
.exec(
"cat",
ExecOptions {
stdin: Some(StdinInput::Text("stream-me".to_string())),
on_stdout: Some(Box::new(move |chunk: &[u8]| {
streamed_cb.lock().unwrap().extend_from_slice(chunk);
})),
..Default::default()
},
)
.await
.expect("exec cat with on_stdout");
assert_eq!(res.exit_code, 0);
assert_eq!(
&*streamed.lock().unwrap(),
b"stream-me",
"on_stdout must receive the streamed bytes during the exec"
);
let binary: Vec<u8> = vec![0, 159, 146, 150, 255, 254, 1, 2, 3];
let captured = Arc::new(Mutex::new(Vec::<u8>::new()));
let captured_cb = Arc::clone(&captured);
let bin_input = binary.clone();
let res = os
.exec(
"cat",
ExecOptions {
stdin: Some(StdinInput::Bytes(bin_input)),
on_stdout: Some(Box::new(move |chunk: &[u8]| {
captured_cb.lock().unwrap().extend_from_slice(chunk);
})),
..Default::default()
},
)
.await
.expect("exec cat binary");
assert_eq!(res.exit_code, 0);
assert_eq!(
&*captured.lock().unwrap(),
&binary,
"binary stdout must round-trip byte-for-byte through on_stdout"
);
let handle = os
.spawn("cat", Vec::new(), SpawnOptions::default())
.expect("spawn cat");
assert!(
handle.pid >= 1_000_000,
"spawn pid is drawn from the synthetic pid space (>= SYNTHETIC_PID_BASE), got {}",
handle.pid
);
let mut stdout = os
.on_process_stdout(handle.pid)
.expect("subscribe spawn stdout");
let info = os.get_process(handle.pid).expect("get_process");
assert_eq!(info.pid, handle.pid);
assert_eq!(info.command, "cat");
assert!(info.running, "freshly spawned process should be running");
assert!(
os.list_processes().iter().any(|p| p.pid == handle.pid),
"spawned process must appear in list_processes"
);
os.write_process_stdin(handle.pid, StdinInput::Text("spawned-input".to_string()))
.expect("write stdin");
os.close_process_stdin(handle.pid).expect("close stdin");
let collected = tokio::time::timeout(std::time::Duration::from_secs(10), async {
let mut buf = Vec::<u8>::new();
while let Some(chunk) = stdout.next().await {
buf.extend_from_slice(&chunk);
}
buf
})
.await
.expect("spawn stdout did not close within timeout");
assert_eq!(
collected, b"spawned-input",
"spawned cat must echo the written stdin to its stdout stream"
);
let exit_code = tokio::time::timeout(
std::time::Duration::from_secs(10),
os.wait_process(handle.pid),
)
.await
.expect("wait_process timed out")
.expect("wait_process");
assert_eq!(exit_code, 0, "cat should exit 0 after EOF");
let all = os.all_processes().await.expect("all_processes");
let tree = os.process_tree().await.expect("process_tree");
assert!(
all.len() >= tree.len(),
"the process forest cannot contain more roots than total processes"
);
let flat_pids: std::collections::BTreeSet<u32> = all.iter().map(|p| p.pid).collect();
for root in &tree {
assert!(
flat_pids.contains(&root.info.pid),
"every process_tree root must exist in all_processes"
);
}
os.shutdown().await.expect("shutdown");
}