#![cfg(all(unix, feature = "subprocess"))]
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use kaish_kernel::{ExecuteOptions, Kernel, KernelConfig};
use kaish_kernel::ast::Value;
/// Shared helpers for the cancellation and timeout tests in this file.
mod common_cancel {
    use super::*;
    use std::fs;

    /// Reports whether the process `pid` can currently be signalled.
    ///
    /// Sends the null signal (`kill(pid, None)`), which performs the
    /// existence/permission check without delivering anything. Any error
    /// (including a permission error) is reported as "not alive".
    ///
    /// A `pid` of 0 or one that does not fit in an `i32` is reported as not
    /// alive: passing such a value to `kill` would address a process group
    /// rather than a single process.
    pub fn child_alive(pid: u32) -> bool {
        match i32::try_from(pid) {
            Ok(raw) if raw > 0 => {
                nix::sys::signal::kill(nix::unistd::Pid::from_raw(raw), None).is_ok()
            }
            _ => false,
        }
    }

    /// Polls `child_alive(pid)` every 10 ms until it reports the process gone
    /// or `max` has elapsed.
    ///
    /// Returns `true` if the process was observed dead, including on one final
    /// check made after the deadline; `false` otherwise.
    pub async fn wait_for_dead(pid: u32, max: Duration) -> bool {
        let deadline = Instant::now() + max;
        while Instant::now() < deadline {
            if !child_alive(pid) {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        !child_alive(pid)
    }

    /// Wraps `raw` in single quotes for POSIX shells, escaping embedded single
    /// quotes, so the result is taken literally as one word.
    fn shell_single_quote(raw: &str) -> String {
        format!("'{}'", raw.replace('\'', r"'\''"))
    }

    /// Writes an executable bash script into `tmp_dir` that records its own
    /// pid in `pid_file` and then `exec`s `inner`.
    ///
    /// Because of the `exec`, the recorded pid is also the pid of the process
    /// running `inner`. The script is named `pid_writer_<stem>.sh`, where
    /// `<stem>` is the file stem of `pid_file` (falling back to `pid`), so
    /// several scripts can share one directory.
    ///
    /// The pid-file path is single-quoted inside the script so that temp
    /// directories containing whitespace or shell metacharacters still work.
    /// `inner` is inserted verbatim because it is a shell command line.
    ///
    /// Returns the path of the script.
    ///
    /// # Panics
    /// Panics if the script cannot be written, stat'ed, or chmod'ed.
    pub fn pid_writer(tmp_dir: &Path, pid_file: &Path, inner: &str) -> std::path::PathBuf {
        use std::os::unix::fs::PermissionsExt;

        let stem = pid_file
            .file_stem()
            .map(|s| s.to_string_lossy().into_owned())
            .unwrap_or_else(|| "pid".to_string());
        let script_path = tmp_dir.join(format!("pid_writer_{}.sh", stem));
        let script = format!(
            "#!/bin/bash\necho $$ > {pf}\nexec {inner}\n",
            pf = shell_single_quote(&pid_file.to_string_lossy()),
            inner = inner,
        );
        fs::write(&script_path, script).expect("write pid_writer script");
        let mut perms = fs::metadata(&script_path).expect("stat script").permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&script_path, perms).expect("chmod script");
        script_path
    }

    /// Reads `pid_file` and parses its trimmed contents as a pid.
    ///
    /// Returns `None` if the file cannot be read or does not hold a valid
    /// `u32` (for example because it is still empty).
    pub fn read_pid(pid_file: &Path) -> Option<u32> {
        let s = fs::read_to_string(pid_file).ok()?;
        s.trim().parse().ok()
    }

    /// Builds the kernel most tests use: `KernelConfig::repl()` with
    /// `with_skip_validation(true)` and a 500 ms kill grace, wrapped in an `Arc`.
    ///
    /// # Panics
    /// Panics if `Kernel::new` fails.
    pub fn kernel_for_test() -> Arc<Kernel> {
        Kernel::new(
            KernelConfig::repl()
                .with_skip_validation(true)
                .with_kill_grace(Duration::from_millis(500)),
        )
        .expect("kernel")
        .into_arc()
    }
}
use common_cancel::{child_alive, kernel_for_test, pid_writer, read_pid, wait_for_dead};
/// A per-call timeout on an external command yields exit code 124 and the
/// spawned child is gone shortly afterwards.
#[tokio::test]
async fn request_timeout_kills_external_child() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let command = format!("bash {}", launcher.display());
    let options = ExecuteOptions::new().with_timeout(Duration::from_millis(150));

    let outcome = kernel_for_test()
        .execute_with_options(&command, options)
        .await
        .expect("execute");
    assert_eq!(
        outcome.code, 124,
        "expected timeout exit 124, got {} err={}",
        outcome.code, outcome.err
    );

    // The script recorded its pid before exec'ing `sleep`, so this is the child to check.
    let child = read_pid(&pid_path).expect("pid_file");
    let gone = wait_for_dead(child, Duration::from_secs(2)).await;
    assert!(gone, "child pid {} still alive after timeout + grace", child);
}
/// A 150 ms per-call timeout takes effect even though the kernel was
/// configured with a 60 s request timeout.
#[tokio::test]
async fn per_call_timeout_overrides_config_default() {
    let config = KernelConfig::repl()
        .with_skip_validation(true)
        .with_request_timeout(Duration::from_secs(60))
        .with_kill_grace(Duration::from_millis(300));
    let kernel = Kernel::new(config).expect("kernel").into_arc();

    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let command = format!("bash {}", launcher.display());

    let outcome = kernel
        .execute_with_options(
            &command,
            ExecuteOptions::new().with_timeout(Duration::from_millis(150)),
        )
        .await
        .expect("execute");
    assert_eq!(outcome.code, 124);

    let child = read_pid(&pid_path).expect("pid_file");
    let gone = wait_for_dead(child, Duration::from_secs(2)).await;
    assert!(gone, "per-call timeout did not kill child {}", child);
}
/// A zero timeout reports exit code 124 and the script never gets to write
/// its pid file.
#[tokio::test]
async fn zero_duration_timeout_returns_124_without_spawn() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let command = format!("bash {}", launcher.display());
    let options = ExecuteOptions::new().with_timeout(Duration::ZERO);

    let outcome = kernel_for_test()
        .execute_with_options(&command, options)
        .await
        .expect("execute");

    assert_eq!(outcome.code, 124);
    // The script's first action is to write the pid file, so its absence
    // means the script did not start running.
    let spawned = pid_path.exists();
    assert!(!spawned, "pid_file exists; bash was spawned despite ZERO timeout");
}
#[tokio::test]
async fn kernel_cancel_kills_running_external() {
let tmp = tempfile::tempdir().expect("tempdir");
let pid_file = tmp.path().join("pid");
let script = pid_writer(tmp.path(), &pid_file, "sleep 60");
let kernel = kernel_for_test();
let kernel_clone = kernel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
kernel_clone.cancel();
});
let result = kernel
.execute(&format!("bash {}", script.display()))
.await
.expect("execute");
assert!(
result.code == 130 || result.code == 143,
"expected 130 or 143, got {}",
result.code,
);
let pid = read_pid(&pid_file).expect("pid_file");
assert!(
wait_for_dead(pid, Duration::from_secs(2)).await,
"Kernel::cancel did not kill external pid {}",
pid,
);
}
/// Running a long external command under `timeout 1` yields exit code 124
/// and the wrapped child does not survive.
#[tokio::test]
async fn timeout_builtin_kills_inner_external() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let command = format!("timeout 1 bash {}", launcher.display());

    let outcome = kernel_for_test()
        .execute(&command)
        .await
        .expect("execute");
    assert_eq!(
        outcome.code, 124,
        "expected 124, got code={} err={}",
        outcome.code, outcome.err
    );

    let child = read_pid(&pid_path).expect("pid_file");
    let gone = wait_for_dead(child, Duration::from_secs(3)).await;
    assert!(gone, "timeout builtin left pid {} alive", child);
}
/// A timeout on a two-stage external pipeline reports 124 and leaves neither
/// stage's process running.
#[tokio::test]
async fn pipeline_cascade_kills_both_stages() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let dir = workdir.path();
    let first_pid_path = dir.join("pid1");
    let second_pid_path = dir.join("pid2");
    let producer = pid_writer(dir, &first_pid_path, "sleep 60");
    let consumer = pid_writer(dir, &second_pid_path, "cat");
    let pipeline = format!("bash {} | bash {}", producer.display(), consumer.display());

    let outcome = kernel_for_test()
        .execute_with_options(
            &pipeline,
            ExecuteOptions::new().with_timeout(Duration::from_millis(200)),
        )
        .await
        .expect("execute");
    assert_eq!(outcome.code, 124);

    // Both pid files are read before waiting, so a missing one fails fast.
    let pid1 = read_pid(&first_pid_path).expect("pid1");
    let pid2 = read_pid(&second_pid_path).expect("pid2");
    let first_gone = wait_for_dead(pid1, Duration::from_secs(2)).await;
    assert!(first_gone, "pid1 {} alive", pid1);
    let second_gone = wait_for_dead(pid2, Duration::from_secs(2)).await;
    assert!(second_gone, "pid2 {} alive", pid2);
}
/// A child that ignores TERM is still gone after a timed-out call, and the
/// call itself returns well within five seconds.
#[tokio::test]
async fn grace_escalation_sigkills_term_trapping_child() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    // The inner shell installs an empty TERM trap before sleeping.
    let launcher = pid_writer(
        workdir.path(),
        &pid_path,
        "bash -c 'trap \"\" TERM; sleep 60'",
    );

    let config = KernelConfig::repl()
        .with_skip_validation(true)
        .with_kill_grace(Duration::from_millis(200));
    let kernel = Kernel::new(config).expect("kernel").into_arc();

    let command = format!("bash {}", launcher.display());
    let options = ExecuteOptions::new().with_timeout(Duration::from_millis(100));
    let clock = Instant::now();
    let outcome = kernel
        .execute_with_options(&command, options)
        .await
        .expect("execute");
    let elapsed = clock.elapsed();
    assert_eq!(outcome.code, 124);

    let child = read_pid(&pid_path).expect("pid");
    let gone = wait_for_dead(child, Duration::from_secs(3)).await;
    assert!(gone, "TERM-trapping pid {} survived SIGKILL escalation", child);

    let within_budget = elapsed < Duration::from_secs(5);
    assert!(
        within_budget,
        "took too long ({:?}) — escalation may not have fired",
        elapsed
    );
}
/// A job started with a trailing `&` is still alive 300 ms after
/// `Kernel::cancel` is called.
#[tokio::test]
async fn background_amp_job_survives_parent_cancel() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let pid_file = tmp.path().join("pid");
    let script = pid_writer(tmp.path(), &pid_file, "sleep 5");
    let kernel = kernel_for_test();
    let _ = kernel
        .execute(&format!("bash {} &", script.display()))
        .await
        .expect("execute");

    // Wait until the pid file holds a parseable pid, not merely until it
    // exists: the shell redirection creates the file before `echo` writes to
    // it, so an existence check alone can observe an empty file.
    let deadline = Instant::now() + Duration::from_secs(1);
    let pid = loop {
        if let Some(pid) = read_pid(&pid_file) {
            break pid;
        }
        assert!(
            Instant::now() < deadline,
            "background job did not write a pid to {} within 1s",
            pid_file.display(),
        );
        tokio::time::sleep(Duration::from_millis(20)).await;
    };

    kernel.cancel();
    // Give any (unwanted) kill a moment to land before checking.
    tokio::time::sleep(Duration::from_millis(300)).await;
    assert!(
        child_alive(pid),
        "background job pid {} died after parent cancel (should survive)",
        pid,
    );

    // Best-effort cleanup so the job does not outlive the test. The checked
    // conversion keeps an out-of-range value from being reinterpreted as a
    // process-group id.
    if let Ok(raw) = i32::try_from(pid) {
        let _ = nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(raw),
            nix::sys::signal::Signal::SIGKILL,
        );
    }
}
/// Cancelling a caller-supplied token stops the running child, and a later
/// plain `execute` on the same kernel still succeeds.
#[tokio::test]
async fn embedder_cancel_token_does_not_leak_into_kernel_state() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let kernel = kernel_for_test();

    let embedder_token = tokio_util::sync::CancellationToken::new();
    let trigger = embedder_token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        trigger.cancel();
    });

    let command = format!("bash {}", launcher.display());
    let options = ExecuteOptions::new().with_cancel_token(embedder_token.clone());
    // The result is kept alive until the end of the test but not inspected.
    let _cancelled_run = kernel
        .execute_with_options(&command, options)
        .await
        .expect("execute");

    let child = read_pid(&pid_path).expect("pid");
    let gone = wait_for_dead(child, Duration::from_secs(2)).await;
    assert!(gone, "embedder cancel did not kill pid {}", child);
    assert!(embedder_token.is_cancelled());

    // A fresh call without the token must run normally.
    let followup = kernel.execute("echo ok").await.expect("second execute");
    assert!(
        followup.ok(),
        "kernel leaked embedder token: second call failed code={} err={}",
        followup.code,
        followup.err
    );
}
/// A variable passed through `with_vars` is visible to that call only; the
/// next call sees it as unset.
#[tokio::test]
async fn vars_overlay_visible_during_call_and_cleaned_up_after() {
    let kernel = kernel_for_test();
    let overlay = std::collections::HashMap::from([(
        "OVERLAY_X".to_string(),
        Value::String("hello".to_string()),
    )]);

    let with_overlay = kernel
        .execute_with_options(
            r#"echo "${OVERLAY_X}""#,
            ExecuteOptions::new().with_vars(overlay),
        )
        .await
        .expect("execute");
    assert!(with_overlay.ok(), "echo failed: {}", with_overlay.err);
    assert_eq!(with_overlay.text_out().trim(), "hello");

    // Second call carries no overlay; the default expansion reveals a leak.
    let without_overlay = kernel
        .execute_with_options(r#"echo "${OVERLAY_X:-unset}""#, ExecuteOptions::default())
        .await
        .expect("execute");
    assert_eq!(
        without_overlay.text_out().trim(),
        "unset",
        "vars overlay leaked between calls"
    );
}
/// Combining `with_vars` and `with_timeout`: the child sees the variable in
/// its environment, the call reports 124, and the child is gone afterwards.
#[tokio::test]
async fn vars_plus_timeout_combo_kills_child_with_vars_visible() {
    use std::os::unix::fs::PermissionsExt;

    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let script_path = workdir.path().join("inner.sh");

    // Line 1 of the pid file is the script's pid, line 2 echoes `$WHO`.
    let script_body = format!(
        "#!/bin/bash\necho $$ > {pf}\necho \"WHO=$WHO\" >> {pf}\nsleep 60\n",
        pf = pid_path.display(),
    );
    std::fs::write(&script_path, script_body).expect("write inner");
    let mut mode = std::fs::metadata(&script_path).expect("stat").permissions();
    mode.set_mode(0o755);
    std::fs::set_permissions(&script_path, mode).expect("chmod");

    let overlay = std::collections::HashMap::from([(
        "WHO".to_string(),
        Value::String("amy".to_string()),
    )]);
    let options = ExecuteOptions::new()
        .with_vars(overlay)
        .with_timeout(Duration::from_millis(200));
    let command = format!("bash {}", script_path.display());

    let outcome = kernel_for_test()
        .execute_with_options(&command, options)
        .await
        .expect("execute");
    assert_eq!(outcome.code, 124, "expected 124, got {}", outcome.code);

    let recorded = std::fs::read_to_string(&pid_path).expect("read pid_file");
    let mut rows = recorded.lines();
    let child: u32 = rows.next().expect("pid line").parse().expect("parse pid");
    let who = rows.next().expect("who line");
    assert_eq!(who, "WHO=amy", "vars overlay did not reach child env");

    let gone = wait_for_dead(child, Duration::from_secs(2)).await;
    assert!(gone, "combo (vars+timeout) left pid {} alive", child);
}
/// Combining `with_vars` and `with_cancel_token`: cancelling the token still
/// gets rid of the child.
#[tokio::test]
async fn cancel_token_plus_vars_combo() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let pid_path = workdir.path().join("pid");
    let launcher = pid_writer(workdir.path(), &pid_path, "sleep 60");
    let kernel = kernel_for_test();

    let token = tokio_util::sync::CancellationToken::new();
    let trigger = token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        trigger.cancel();
    });

    let overlay = std::collections::HashMap::from([(
        "FROM_OPTS".to_string(),
        Value::String("yes".to_string()),
    )]);
    let options = ExecuteOptions::new()
        .with_vars(overlay)
        .with_cancel_token(token);
    let command = format!("bash {}", launcher.display());

    // Only the child's fate matters here; the call's result is discarded.
    let _ = kernel
        .execute_with_options(&command, options)
        .await
        .expect("execute");

    let child = read_pid(&pid_path).expect("pid");
    let gone = wait_for_dead(child, Duration::from_secs(2)).await;
    assert!(gone);
}
/// A `scatter --timeout` pipeline whose workers sleep for 60 s produces JSON
/// output containing `"timed_out": true`.
#[tokio::test]
async fn scatter_timeout_kills_stuck_workers() {
    let pipeline = r#"echo "1\n2\n3" | split "\n" | scatter --limit 3 --timeout 300ms | bash -c "sleep 60" | gather --format json"#;

    let outcome = kernel_for_test()
        .execute(pipeline)
        .await
        .expect("execute");

    let rendered = outcome.text_out();
    let json = rendered.trim();
    let flagged = json.contains("\"timed_out\": true");
    assert!(
        flagged,
        "scatter --timeout did not surface timed_out in JSON: {}",
        json
    );
}
/// The streaming callback is invoked at least once per statement of a
/// three-statement script.
#[tokio::test]
async fn on_output_callback_fires_per_statement() {
    let kernel = kernel_for_test();
    let program = "echo a\necho b\necho c";

    let mut invocations = 0usize;
    let mut on_output = |_: &kaish_kernel::interpreter::ExecResult| {
        invocations += 1;
    };
    let _ = kernel
        .execute_with_options_streaming(program, ExecuteOptions::default(), &mut on_output)
        .await
        .expect("execute");

    assert!(
        invocations >= 3,
        "callback fired {} times for 3 statements",
        invocations
    );
}
/// `with_cwd` makes `pwd` report the supplied directory for that call, and
/// the kernel's own cwd is unchanged afterwards.
#[tokio::test]
async fn per_call_cwd_overrides_and_restores() {
    let kernel = kernel_for_test();
    let cwd_before = kernel.cwd().await;

    let workdir = tempfile::tempdir().expect("tempdir");
    let scratch = workdir.path().to_path_buf();
    let options = ExecuteOptions::new().with_cwd(scratch.clone());
    let outcome = kernel
        .execute_with_options("pwd", options)
        .await
        .expect("execute");
    assert!(outcome.ok(), "pwd failed: {}", outcome.err);

    // Compare canonical forms so symlinked temp directories do not cause a
    // spurious mismatch; fall back to the raw path if canonicalisation fails.
    let printed = outcome.text_out().trim().to_string();
    let printed_path = std::path::PathBuf::from(&printed);
    let canonical_printed = match printed_path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => std::path::PathBuf::from(&printed),
    };
    let canonical_scratch = match scratch.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => scratch.clone(),
    };
    assert_eq!(
        canonical_printed, canonical_scratch,
        "expected pwd inside the per-call cwd, got {}",
        printed
    );

    let cwd_after = kernel.cwd().await;
    assert_eq!(
        cwd_after, cwd_before,
        "kernel cwd should be restored after per-call override; was {:?}",
        cwd_after
    );
}
/// A command that finishes long before its 30 s timeout completes normally
/// and its output is preserved.
#[tokio::test]
async fn timeout_does_not_fire_when_command_finishes_first() {
    let options = ExecuteOptions::new().with_timeout(Duration::from_secs(30));

    let outcome = kernel_for_test()
        .execute_with_options("echo done", options)
        .await
        .expect("execute");

    assert!(outcome.ok(), "expected ok, got {}", outcome.code);
    let printed = outcome.text_out();
    assert_eq!(printed.trim(), "done");
}