use running_process::daemon::client::{DaemonClient, SpawnCommandRequest};
use running_process::proto::daemon::StatusCode;
use super::{scaled, start_server_with_tempdb};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subprocess_stdout_high_volume_does_not_block() {
let scope = format!("seam-stdout-vol-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let workdir = tempfile::tempdir().expect("tempdir");
let done = workdir.path().join("done.txt");
let done_str = done.to_string_lossy().into_owned();
let command = if cfg!(windows) {
format!(
"for /L %i in (1,1,256) do @echo \
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx & echo done> \"{done_str}\""
)
} else {
format!(
"i=0; while [ $i -lt 256 ]; do \
printf '%s\\n' $(yes x | head -c 1024 | tr -d '\\n'); \
i=$((i+1)); done; echo done > \"{done_str}\""
)
};
let socket_for_client = socket.clone();
let done_for_client = done.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
let spawned = client
.spawn_command(&SpawnCommandRequest::shell(command))
.expect("spawn_command");
assert!(spawned.pid > 0);
let deadline = std::time::Instant::now() + scaled(std::time::Duration::from_secs(10));
while !done_for_client.exists() && std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(50));
}
assert!(
done_for_client.exists(),
"subprocess never reached the done marker — stdout-to-NUL \
must not be blocking the shell loop"
);
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
let _ = tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subprocess_stdin_returns_eof() {
let scope = format!("seam-stdin-eof-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let workdir = tempfile::tempdir().expect("tempdir");
let done = workdir.path().join("read-completed.txt");
let done_str = done.to_string_lossy().into_owned();
let command = if cfg!(windows) {
format!("set /p var= & echo done> \"{done_str}\"")
} else {
format!("read line || true; printf done > \"{done_str}\"")
};
let socket_for_client = socket.clone();
let done_for_client = done.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
let _spawned = client
.spawn_command(&SpawnCommandRequest::shell(command))
.expect("spawn_command");
let deadline = std::time::Instant::now() + scaled(std::time::Duration::from_secs(5));
while !done_for_client.exists() && std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(50));
}
assert!(
done_for_client.exists(),
"subprocess hung reading stdin — should return EOF immediately"
);
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
let _ = tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn concurrent_subprocesses_write_distinct_files() {
let scope = format!("seam-concurrent-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let workdir = tempfile::tempdir().expect("tempdir");
let files: Vec<std::path::PathBuf> = (0..3)
.map(|i| workdir.path().join(format!("out-{i}.txt")))
.collect();
let expected: Vec<String> = (0..3).map(|i| format!("payload-{i}")).collect();
let socket_for_client = socket.clone();
let files_for_client = files.clone();
let expected_for_client = expected.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
for (path, payload) in files_for_client.iter().zip(expected_for_client.iter()) {
let path_str = path.to_string_lossy().into_owned();
let command = if cfg!(windows) {
format!("echo {payload}> \"{path_str}\"")
} else {
format!("printf '%s' '{payload}' > \"{path_str}\"")
};
let spawned = client
.spawn_command(&SpawnCommandRequest::shell(command))
.expect("spawn_command");
assert!(spawned.pid > 0);
}
let deadline = std::time::Instant::now() + scaled(std::time::Duration::from_secs(10));
while !files_for_client.iter().all(|p| p.exists()) && std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(50));
}
for (path, expected) in files_for_client.iter().zip(expected_for_client.iter()) {
let contents =
std::fs::read_to_string(path).unwrap_or_else(|e| panic!("read {path:?}: {e}"));
assert!(
contents.contains(expected),
"expected {expected:?} in {path:?}, got {contents:?}"
);
}
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
let _ = tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subprocess_outlives_daemon_shutdown_without_pipe_dangling() {
let scope = format!("seam-outlives-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let workdir = tempfile::tempdir().expect("tempdir");
let marker = workdir.path().join("after-daemon-died.txt");
let marker_str = marker.to_string_lossy().into_owned();
let command = if cfg!(windows) {
format!("ping 127.0.0.1 -n 3 >NUL & echo survived> \"{marker_str}\"")
} else {
format!("sleep 2; printf survived > \"{marker_str}\"")
};
let socket_for_client = socket.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
let spawned = client
.spawn_command(&SpawnCommandRequest::shell(command))
.expect("spawn_command");
assert!(spawned.pid > 0);
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle)
.await
.expect("server did not stop in time")
.expect("server task panicked");
let deadline = std::time::Instant::now() + scaled(std::time::Duration::from_secs(10));
while !marker.exists() && std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(100));
}
let contents = std::fs::read_to_string(&marker).expect("marker should exist");
assert!(
contents.contains("survived"),
"subprocess marker has wrong content: {contents:?}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subprocess_with_nonexistent_cwd_returns_spawn_error() {
let scope = format!("seam-bogus-cwd-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let bogus = if cfg!(windows) {
std::path::PathBuf::from("Z:\\does\\not\\exist\\at\\all")
} else {
std::path::PathBuf::from("/this/does/not/exist/at/all")
};
let command = "echo should-not-run".to_string();
let socket_for_client = socket.clone();
let bogus_for_client = bogus.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
let req = SpawnCommandRequest::shell(command).with_cwd(bogus_for_client);
let outcome = client.spawn_command(&req);
assert!(
outcome.is_err(),
"spawn with non-existent cwd should error, got {outcome:?}"
);
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
let _ = tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_response_pid_is_alive() {
let scope = format!("seam-pid-alive-{}", line!());
let (server_handle, socket, _tmp_dir) = start_server_with_tempdb(&scope);
tokio::time::sleep(scaled(std::time::Duration::from_millis(300))).await;
let command = if cfg!(windows) {
"ping 127.0.0.1 -n 3 >NUL".to_string()
} else {
"sleep 2".to_string()
};
let socket_for_client = socket.clone();
let result = tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_client).expect("connect");
let spawned = client
.spawn_command(&SpawnCommandRequest::shell(command))
.expect("spawn_command");
assert!(spawned.pid > 0, "pid must be > 0, got {}", spawned.pid);
std::thread::sleep(scaled(std::time::Duration::from_millis(300)));
let list_resp = client.list_active().expect("list_active");
let processes = list_resp
.list_active
.expect("list_active payload")
.processes;
let tracked = processes
.iter()
.find(|p| p.pid == spawned.pid)
.expect("spawned pid should be tracked");
assert_eq!(tracked.containment, "detached");
let kill_resp = client.kill_tree(spawned.pid, 3.0).expect("kill_tree");
assert_eq!(kill_resp.code, StatusCode::Ok as i32);
let _ = client.shutdown(true, 5.0);
})
.await;
result.expect("client task");
let _ = tokio::time::timeout(scaled(std::time::Duration::from_secs(5)), server_handle).await;
}