use std::sync::Arc;
use std::time::Duration;
use kaish_kernel::{Kernel, KernelConfig};
/// Builds the kernel used by the tests in this file.
///
/// The kernel is created from `KernelConfig::isolated()` with validation
/// skipped and is returned wrapped in an `Arc` so it can be shared across
/// spawned tasks.
///
/// # Panics
///
/// Panics with "failed to create kernel" if `Kernel::new` returns an error.
async fn setup() -> Arc<Kernel> {
    let config = KernelConfig::isolated().with_skip_validation(true);
    let kernel = Kernel::new(config).expect("failed to create kernel");
    kernel.into_arc()
}
/// Checks that concurrent `execute` calls on one kernel do not observe each
/// other's working directory.
///
/// Directories `/task-0` through `/task-7` are created first. Eight spawned
/// tasks then each run `cd /task-{i}; pwd` fifty times against the shared
/// kernel and assert that `pwd` reports their own directory every time.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_cwd_no_clobber() {
    let kernel = setup().await;

    for idx in 0..8 {
        let mkdir = format!("mkdir -p /task-{idx}");
        kernel.execute(&mkdir).await.expect("mkdir");
    }

    // Spawn every worker before awaiting any of them so they run concurrently.
    let workers: Vec<_> = (0..8)
        .map(|i| {
            let shared = Arc::clone(&kernel);
            tokio::spawn(async move {
                let script = format!("cd /task-{i}; pwd");
                let expected = format!("/task-{i}");
                for _round in 0..50 {
                    let result = shared.execute(&script).await.expect("execute");
                    let pwd = result.text_out().trim().to_string();
                    assert_eq!(pwd, expected, "task {i} saw another task's cwd: {pwd}");
                }
            })
        })
        .collect();

    for worker in workers {
        worker.await.expect("task panicked");
    }
}
/// Checks that a variable assigned inside one `execute` call is not clobbered
/// by concurrent `execute` calls on the same kernel.
///
/// Eight tasks each assign their own label to `MY_VAR` and echo it from a
/// loop; every output line must equal that task's label.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_var_no_clobber() {
    let kernel = setup().await;
    let mut handles = Vec::new();
    for i in 0..8 {
        let k = Arc::clone(&kernel);
        handles.push(tokio::spawn(async move {
            let label = format!("task-{i}");
            let script = format!(
                "MY_VAR={label}\nfor n in $(seq 1 50); do echo $MY_VAR; done",
            );
            let r = k.execute(&script).await.expect("execute");
            let out = r.text_out();
            let mut checked = 0usize;
            for line in out.lines() {
                assert_eq!(line, label, "task {i} saw wrong value: {line}");
                checked += 1;
            }
            // Without this, empty output would skip the loop above and the
            // test would pass without having verified anything.
            assert!(
                checked > 0,
                "task {i} produced no output, so no value was verified"
            );
        }));
    }
    for h in handles {
        h.await.expect("task panicked");
    }
}
/// Checks that an alias defined inside one `execute` call is not clobbered by
/// concurrent `execute` calls on the same kernel.
///
/// Four tasks each define `greet` as `echo <own label>` and invoke it three
/// times; the output must be exactly three lines, each equal to that label.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_alias_no_clobber() {
    let kernel = setup().await;
    let mut handles = Vec::new();
    for i in 0..4 {
        let k = Arc::clone(&kernel);
        handles.push(tokio::spawn(async move {
            let label = format!("task-{i}");
            let script = format!("alias greet='echo {label}'\ngreet\ngreet\ngreet");
            let r = k.execute(&script).await.expect("execute");
            let out = r.text_out();
            let mut checked = 0usize;
            for line in out.lines() {
                assert_eq!(line, label, "task {i} got {line}");
                checked += 1;
            }
            // The script calls `greet` three times. Checking the count keeps
            // missing output (e.g. an alias that never resolved) from slipping
            // past the per-line loop, which is a no-op on empty output.
            assert_eq!(
                checked, 3,
                "task {i} expected 3 lines from greet, got {checked}"
            );
        }));
    }
    for h in handles {
        h.await.expect("task panicked");
    }
}
/// Checks that stderr captured for one `execute` call does not contain output
/// from concurrent `execute` calls on the same kernel.
///
/// Four tasks each echo their own tag to stderr from a loop; every non-empty
/// line of the result's `err` field must contain that tag.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_stderr_isolation() {
    let kernel = setup().await;
    let mut handles = Vec::new();
    for i in 0..4 {
        let k = Arc::clone(&kernel);
        handles.push(tokio::spawn(async move {
            let tag = format!("tag-{i}");
            let script = format!(
                "for n in $(seq 1 20); do echo {tag} 1>&2; done",
            );
            let r = k.execute(&script).await.expect("execute");
            let mut checked = 0usize;
            for line in r.err.lines().filter(|l| !l.is_empty()) {
                assert!(
                    line.contains(&tag),
                    "task {i} got stderr from another task: {line}"
                );
                checked += 1;
            }
            // If nothing reached `err`, the loop above checks nothing; fail
            // loudly instead of reporting a vacuous pass.
            assert!(
                checked > 0,
                "task {i} captured no stderr lines, so isolation was not verified"
            );
        }));
    }
    for h in handles {
        h.await.expect("task panicked");
    }
}
/// Checks that a running background job does not hold up foreground commands.
///
/// Starts `sleep 0.4 &`, then runs `echo ready`, and requires both calls
/// together to finish in under 300 ms.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn background_job_does_not_block_foreground() {
    let kernel = setup().await;
    // Start timing BEFORE the spawn. If the `&` call itself waited for the
    // sleep to finish, a timer started afterwards would only see a fast echo
    // and the test would pass even though nothing ran in the background.
    let start = std::time::Instant::now();
    kernel.execute("sleep 0.4 &").await.expect("spawn");
    let r = kernel.execute("echo ready").await.expect("echo");
    let elapsed = start.elapsed();
    assert_eq!(r.text_out().trim(), "ready");
    // The threshold sits below the 0.4 s sleep, so any path that waits for the
    // job to finish (in the spawn or in the echo) exceeds it.
    assert!(
        elapsed < Duration::from_millis(300),
        "spawn + foreground echo took {elapsed:?} — background job is blocking the foreground"
    );
}
/// Checks that a user-defined function can be run as a background job.
///
/// Defines `greet`, launches it with `&`, waits for job 1 to reach a terminal
/// status, and asserts that `/v/jobs/1/stdout` contains the function's output.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn background_job_runs_user_function() {
    let kernel = setup().await;

    let define_greet = "greet() { echo hello-from-user-fn; }";
    kernel.execute(define_greet).await.expect("define tool");
    kernel.execute("greet &").await.expect("run in background");

    wait_for_job(&kernel, 1, Duration::from_secs(2)).await;

    let result = kernel
        .execute("cat /v/jobs/1/stdout")
        .await
        .expect("read stdout");
    let stdout = result.text_out();
    assert!(
        stdout.contains("hello-from-user-fn"),
        "background user function didn't run: {stdout}"
    );
}
/// Checks that a background job keeps the variable values it was started with.
///
/// Sets `VAR=before`, launches a background function that sleeps 0.2 s and
/// then echoes `$VAR`, sets `VAR=after` in the foreground, and asserts that
/// the job's stdout is still `before`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn background_job_snapshot_isolation() {
    let kernel = setup().await;

    kernel.execute("VAR=before").await.expect("set");

    let define_snap = "snap() { sleep 0.2; echo $VAR; }";
    kernel.execute(define_snap).await.expect("define");

    kernel.execute("snap &").await.expect("spawn");
    // Mutate the foreground value while the job is still sleeping.
    kernel.execute("VAR=after").await.expect("mutate");

    wait_for_job(&kernel, 1, Duration::from_secs(2)).await;

    let result = kernel
        .execute("cat /v/jobs/1/stdout")
        .await
        .expect("read stdout");
    let observed = result.text_out();
    assert_eq!(
        observed.trim(),
        "before",
        "background job saw parent's post-spawn mutation"
    );
}
/// Checks that a variable assignment made inside a background job does not
/// change the value seen by later foreground commands.
///
/// Sets `VAR=parent-value`, runs a background function that assigns
/// `VAR=set-in-bg`, waits for job 1 to reach a terminal status, and asserts
/// that `echo $VAR` still prints `parent-value`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn parent_does_not_see_background_mutation() {
    let kernel = setup().await;

    kernel.execute("VAR=parent-value").await.expect("set");

    let define_mutator = "mutator() { VAR=set-in-bg; echo done; }";
    kernel.execute(define_mutator).await.expect("define");
    kernel.execute("mutator &").await.expect("spawn");

    wait_for_job(&kernel, 1, Duration::from_secs(2)).await;

    let result = kernel.execute("echo $VAR").await.expect("echo");
    let observed = result.text_out();
    assert_eq!(
        observed.trim(),
        "parent-value",
        "background mutation leaked into parent scope"
    );
}
/// Checks that a user-defined function can be used as a stage between
/// `scatter` and `gather`.
///
/// Defines `shout`, which echoes `loud-${ITEM}`, feeds `split "a b c"` through
/// `scatter | shout | gather`, and asserts the result succeeded and contains
/// `loud-a`, `loud-b` and `loud-c`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn scatter_parallel_runs_user_function() {
    let kernel = setup().await;

    let define_shout = r#"shout() { echo "loud-${ITEM}"; }"#;
    kernel.execute(define_shout).await.expect("define tool");

    let pipeline = r#"split "a b c" | scatter | shout | gather"#;
    let result = kernel.execute(pipeline).await.expect("scatter");
    assert!(result.ok(), "scatter failed: {}", result.err);

    let out = result.text_out();
    assert!(out.contains("loud-a"), "missing loud-a: {out}");
    assert!(out.contains("loud-b"), "missing loud-b: {out}");
    assert!(out.contains("loud-c"), "missing loud-c: {out}");
}
/// Checks that `scatter --as ITEM` treats plain-text stdin as one item per
/// line rather than a single multi-line item.
///
/// Pipes `printf "a\nb\nc\n"` through `scatter --as ITEM | shout | gather`
/// and asserts the output contains `loud-a`, `loud-b` and `loud-c`, and does
/// not contain `loud-a` immediately followed by a newline and `b`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn scatter_plain_text_stdin_fans_out_per_line() {
    let kernel = setup().await;

    let define_shout = r#"shout() { echo "loud-${ITEM}"; }"#;
    kernel.execute(define_shout).await.expect("define tool");

    let pipeline = r#"printf "a\nb\nc\n" | scatter --as ITEM | shout | gather"#;
    let result = kernel.execute(pipeline).await.expect("scatter");
    assert!(result.ok(), "scatter failed: {}", result.err);

    let out = result.text_out();
    assert!(out.contains("loud-a"), "missing loud-a (no fan-out?): {out}");
    assert!(out.contains("loud-b"), "missing loud-b: {out}");
    assert!(out.contains("loud-c"), "missing loud-c: {out}");

    // A single item holding the whole input would echo as "loud-a\nb\nc".
    let bound_as_one_item = out.contains("loud-a\nb");
    assert!(
        !bound_as_one_item,
        "stdin bound as one item instead of fanning out: {out}"
    );
}
/// Polls `/v/jobs/{job_id}/status` until the job reports a terminal status.
///
/// Returns once the trimmed status text starts with `done:` or `failed:`.
/// Between polls it sleeps for 20 ms.
///
/// # Panics
///
/// Panics if the status command cannot be executed, or if more than `timeout`
/// has elapsed after a poll that found the job still unfinished.
async fn wait_for_job(kernel: &Kernel, job_id: u64, timeout: Duration) {
    const POLL_INTERVAL: Duration = Duration::from_millis(20);
    const TERMINAL_PREFIXES: [&str; 2] = ["done:", "failed:"];

    let started = std::time::Instant::now();
    let status_cmd = format!("cat /v/jobs/{job_id}/status");

    loop {
        let result = kernel.execute(&status_cmd).await.expect("status");
        let raw = result.text_out();
        let status = raw.trim();

        let finished = TERMINAL_PREFIXES
            .iter()
            .any(|prefix| status.starts_with(*prefix));
        if finished {
            return;
        }

        // The deadline is only checked after a poll, so a job that finishes
        // right at the limit is still observed.
        assert!(
            started.elapsed() <= timeout,
            "job {job_id} did not complete within {timeout:?}"
        );

        tokio::time::sleep(POLL_INTERVAL).await;
    }
}