#![cfg(feature = "daemon")]
use running_process::daemon::client::DaemonClient;
use running_process::daemon::paths;
use running_process::daemon::pipe_session::PipeSpawnRequest;
use running_process::daemon::server::DaemonServer;
use std::path::PathBuf;
use std::process::Command;
use std::time::{Duration, Instant};
/// Builds the `name` binary of the `testbins` package and returns the path of
/// the resulting executable.
///
/// Invokes `cargo build -p testbins --bin <name> --message-format=json` (cargo's
/// stderr is inherited) and scans the JSON lines on stdout for a
/// `compiler-artifact` message whose target kinds include `bin` and which
/// carries an `executable` path.
///
/// Before returning, polls for up to five seconds until the executable exists
/// on disk; the path is returned even if it never shows up within that window.
///
/// # Panics
///
/// Panics if cargo cannot be run, if the build does not succeed, or if no
/// matching artifact message is found in cargo's output.
fn testbin_path(name: &str) -> PathBuf {
    let build = Command::new(env!("CARGO"))
        .arg("build")
        .args(["-p", "testbins"])
        .args(["--bin", name])
        .arg("--message-format=json")
        .stderr(std::process::Stdio::inherit())
        .output()
        .expect("cargo build failed");
    assert!(build.status.success());

    let messages = String::from_utf8_lossy(&build.stdout);
    let parsed_candidates = messages
        .lines()
        // Cheap textual pre-filter so only plausible lines get parsed as JSON.
        .filter(|raw| raw.contains("\"compiler-artifact\"") && raw.contains(name))
        // Lines that are not valid JSON are skipped.
        .filter_map(|raw| serde_json::from_str::<serde_json::Value>(raw).ok());

    for message in parsed_candidates {
        if !(message["reason"] == "compiler-artifact") {
            continue;
        }
        let builds_a_bin = message["target"]["kind"]
            .as_array()
            .is_some_and(|kinds| kinds.iter().any(|k| k == "bin"));
        if !builds_a_bin {
            continue;
        }
        let Some(exe) = message["executable"].as_str() else {
            continue;
        };

        let artifact = PathBuf::from(exe);
        // Poll briefly for the file to appear; give up after five seconds and
        // hand the path back regardless.
        let give_up_at = Instant::now() + Duration::from_secs(5);
        while !artifact.exists() && Instant::now() < give_up_at {
            std::thread::sleep(Duration::from_millis(50));
        }
        return artifact;
    }

    panic!("could not find binary artifact for {name}");
}
/// Creates a `DaemonServer` for `scope` and runs it on a spawned tokio task.
///
/// The socket and database paths are derived from `scope` via
/// `paths::socket_path` / `paths::db_path`. The server is also given the fixed
/// string `"bulk-ops-test"`, the scope itself, and the current working
/// directory (an empty path if it cannot be determined).
///
/// Returns the join handle of the task driving `server.run()` together with
/// the socket path the server was constructed with.
///
/// # Panics
///
/// Panics if `DaemonServer::new` fails. The spawned task panics if
/// `server.run()` returns an error. Because this uses `tokio::spawn`, it must
/// be called from within a tokio runtime.
fn start_server(scope: &str) -> (tokio::task::JoinHandle<()>, String) {
    let socket = paths::socket_path(Some(scope));
    let db_file = paths::db_path(Some(scope)).to_string_lossy().into_owned();
    let working_dir = std::env::current_dir()
        .unwrap_or_default()
        .to_string_lossy()
        .into_owned();

    let server = DaemonServer::new(
        socket.clone(),
        db_file,
        String::from("bulk-ops-test"),
        scope.to_owned(),
        working_dir,
    )
    .expect("DaemonServer::new");

    let server_task = tokio::spawn(async move {
        server.run().await.expect("server.run");
    });

    (server_task, socket)
}
/// Spawns two pipe sessions under the `bulk-purge` originator, terminates one
/// of them, and checks that `purge_exited_sessions` removes exactly that one
/// while the other stays listed.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn purge_removes_only_exited_sessions() {
    // The line number only serves as a suffix for the scope name.
    let scope = format!("bulk-purge-{}", line!());
    let (_server_task, socket) = start_server(&scope);
    // NOTE(review): presumably gives the server time to start listening
    // before the client connects — confirm.
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Building the test binary blocks, so it runs off the async workers.
    let reporter_bin = tokio::task::spawn_blocking(|| testbin_path("testbin-env-reporter"))
        .await
        .expect("testbin");

    let socket_for_test = socket.clone();
    // The daemon client calls are made from a blocking task.
    let scenario = move || {
        let mut client = DaemonClient::connect_to(&socket_for_test).expect("connect");
        let command = reporter_bin.to_string_lossy().into_owned();
        let new_request =
            || PipeSpawnRequest::new([command.clone()]).with_originator("bulk-purge");

        let survivor = client
            .spawn_pipe_session(&new_request())
            .expect("spawn alive");
        let doomed = client
            .spawn_pipe_session(&new_request())
            .expect("spawn to_terminate");

        client
            .terminate_pipe_session(&doomed.session_id, 500)
            .expect("terminate");

        // Poll the listing until the terminated session is reported as exited,
        // failing the test after ten seconds.
        let deadline = Instant::now() + Duration::from_secs(10);
        loop {
            let listed = client.list_pipe_sessions("bulk-purge").expect("list");
            let doomed_has_exited = listed
                .iter()
                .find(|entry| entry.session_id == doomed.session_id)
                .is_some_and(|entry| entry.exited);
            if doomed_has_exited {
                break;
            }
            assert!(Instant::now() < deadline, "session did not exit");
            std::thread::sleep(Duration::from_millis(200));
        }

        let purged = client.purge_exited_sessions("bulk-purge").expect("purge");
        assert_eq!(purged.pty_purged, 0);
        assert_eq!(purged.pipe_purged, 1);

        // Only the session that was never terminated may remain listed.
        let remaining = client.list_pipe_sessions("bulk-purge").expect("list after");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].session_id, survivor.session_id);

        // Clean up the surviving session.
        client
            .terminate_pipe_session(&survivor.session_id, 500)
            .expect("terminate cleanup");
    };

    tokio::task::spawn_blocking(scenario)
        .await
        .expect("blocking task");
}
/// Spawns three pipe sessions under the `bulk-kill` originator plus one under
/// `other`, bulk-terminates `bulk-kill`, and checks that exactly the three
/// `bulk-kill` sessions exit while the `other` session stays alive.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn bulk_terminate_older_than_zero_terminates_everything_in_scope() {
    // The line number only serves as a suffix for the scope name.
    let scope = format!("bulk-kill-{}", line!());
    let (_handle, socket) = start_server(&scope);
    // NOTE(review): presumably gives the server time to start listening
    // before the client connects — confirm.
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Building the test binary blocks, so it runs off the async workers.
    let env_reporter = tokio::task::spawn_blocking(|| testbin_path("testbin-env-reporter"))
        .await
        .expect("testbin");

    let socket_for_test = socket.clone();
    tokio::task::spawn_blocking(move || {
        let mut client = DaemonClient::connect_to(&socket_for_test).expect("connect");
        let command = env_reporter.to_string_lossy().into_owned();

        // Ids of the sessions that the bulk terminate is expected to hit.
        let mut bulk_ids = Vec::new();
        for _ in 0..3 {
            let session = client
                .spawn_pipe_session(
                    &PipeSpawnRequest::new([command.clone()]).with_originator("bulk-kill"),
                )
                .expect("spawn");
            bulk_ids.push(session.session_id);
        }

        // A session with a different originator; it must not be affected.
        let untouched = client
            .spawn_pipe_session(&PipeSpawnRequest::new([command.clone()]).with_originator("other"))
            .expect("spawn untouched");

        // NOTE(review): going by the test name, the leading `0` is the
        // "older than" threshold — confirm against the client API.
        let result = client
            .bulk_terminate_sessions(0, "bulk-kill", 500)
            .expect("bulk terminate");
        assert_eq!(result.pty_terminated, 0);
        assert_eq!(result.pipe_terminated, 3);

        // Wait until every listed `bulk-kill` session has exited. Each spawned
        // id must also appear in the listing, so an empty or partial listing
        // cannot satisfy the condition vacuously.
        let deadline = Instant::now() + Duration::from_secs(15);
        loop {
            let listed = client.list_pipe_sessions("bulk-kill").expect("list");
            let every_listed_exited = listed.iter().all(|s| s.exited);
            let every_spawned_listed = bulk_ids
                .iter()
                .all(|id| listed.iter().any(|s| &s.session_id == id));
            if every_listed_exited && every_spawned_listed {
                break;
            }
            if Instant::now() >= deadline {
                panic!("bulk-killed sessions did not exit");
            }
            std::thread::sleep(Duration::from_millis(200));
        }

        let other = client.list_pipe_sessions("other").expect("list other");
        let untouched_entry = other
            .iter()
            .find(|s| s.session_id == untouched.session_id)
            .expect("untouched session present");
        assert!(
            !untouched_entry.exited,
            "untouched (different originator) must still be alive"
        );

        // Clean up the session that the bulk terminate left running.
        client
            .terminate_pipe_session(&untouched.session_id, 500)
            .expect("terminate untouched");
    })
    .await
    .expect("blocking task");
}