#![cfg(feature = "daemon")]
use running_process::daemon::client::DaemonClient;
use running_process::daemon::paths;
use running_process::daemon::pty_session::{PtyAttachment, PtySpawnRequest};
use running_process::daemon::server::DaemonServer;
use running_process::proto::daemon::{pty_stream_frame::Frame as StreamOneof, StatusCode};
use std::path::PathBuf;
use std::process::Command;
use std::time::{Duration, Instant};
fn testbin_path(name: &str) -> PathBuf {
let output = Command::new(env!("CARGO"))
.args([
"build",
"-p",
"testbins",
"--bin",
name,
"--message-format=json",
])
.stderr(std::process::Stdio::inherit())
.output()
.expect("cargo build for testbin failed");
assert!(output.status.success(), "cargo build -p {name} failed");
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
if !line.contains("\"compiler-artifact\"") || !line.contains(name) {
continue;
}
if let Ok(v) = serde_json::from_str::<serde_json::Value>(line) {
if v["reason"] == "compiler-artifact"
&& v["target"]["kind"]
.as_array()
.is_some_and(|a| a.iter().any(|k| k == "bin"))
{
if let Some(exe) = v["executable"].as_str() {
let p = PathBuf::from(exe);
let deadline = Instant::now() + Duration::from_secs(5);
while !p.exists() && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(50));
}
assert!(p.exists(), "cargo reported {p:?} but it does not exist");
return p;
}
}
}
}
panic!("could not find binary artifact for {name}");
}
fn start_server(scope: &str) -> (tokio::task::JoinHandle<()>, String) {
let socket = paths::socket_path(Some(scope));
let db = paths::db_path(Some(scope)).to_string_lossy().into_owned();
let server = DaemonServer::new(
socket.clone(),
db,
"pty-test".to_string(),
scope.to_string(),
std::env::current_dir()
.unwrap_or_default()
.to_string_lossy()
.into_owned(),
)
.expect("DaemonServer::new");
let handle = tokio::spawn(async move {
server.run().await.expect("server.run");
});
(handle, socket)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn spawn_attach_detach_reattach_terminate_lifecycle() {
let scope = format!("pty-{}", line!());
let (_server_handle, socket) = start_server(&scope);
tokio::time::sleep(Duration::from_millis(300)).await;
let sleeper_path = tokio::task::spawn_blocking(|| testbin_path("testbin-sleeper"))
.await
.expect("spawn_blocking for testbin path");
let socket_for_test = socket.clone();
tokio::task::spawn_blocking(move || {
let mut control = DaemonClient::connect_to(&socket_for_test).expect("control connect");
let argv = vec![sleeper_path.to_string_lossy().into_owned()];
let spawn_req = PtySpawnRequest::new(argv)
.with_size(24, 80)
.with_originator("pty-session-attach-test");
let spawned = control
.spawn_pty_session(&spawn_req)
.expect("spawn_pty_session");
assert!(!spawned.session_id.is_empty());
assert!(spawned.pid > 0);
let listed = control.list_pty_sessions("").expect("list_pty_sessions");
let entry = listed
.iter()
.find(|s| s.session_id == spawned.session_id)
.expect("spawned session not present in list");
assert!(!entry.attached);
assert!(!entry.exited);
let mut attach_a =
PtyAttachment::attach_to(&socket_for_test, &spawned.session_id, 30, 100, false)
.expect("first attach");
let after_attach = control.list_pty_sessions("").expect("list after attach");
let entry = after_attach
.iter()
.find(|s| s.session_id == spawned.session_id)
.expect("session disappeared");
assert!(entry.attached, "session should report attached=true");
assert_eq!(entry.rows, 30, "rows should reflect attach client size");
assert_eq!(entry.cols, 100, "cols should reflect attach client size");
match PtyAttachment::attach_to(&socket_for_test, &spawned.session_id, 24, 80, false) {
Ok(_) => panic!("second attach should not succeed without steal"),
Err(running_process::daemon::pty_session::AttachError::Server { code, .. }) => {
assert_eq!(
code,
StatusCode::AlreadyAttached,
"expected ALREADY_ATTACHED, got {code:?}"
);
}
Err(other) => panic!("unexpected error variant: {other}"),
}
attach_a.send_input(b"hello\n").expect("send_input");
attach_a.detach().expect("detach via input frame");
std::thread::sleep(Duration::from_millis(100));
let after_detach = control.list_pty_sessions("").expect("list after detach");
let entry = after_detach
.iter()
.find(|s| s.session_id == spawned.session_id)
.expect("session must outlive detach");
assert!(
!entry.attached,
"session should report attached=false after detach"
);
assert!(!entry.exited, "session should still be running");
let _attach_b =
PtyAttachment::attach_to(&socket_for_test, &spawned.session_id, 24, 80, false)
.expect("reattach");
control
.terminate_pty_session(&spawned.session_id, 1000)
.expect("terminate_pty_session");
let deadline = Instant::now() + Duration::from_secs(15);
loop {
let listed = control.list_pty_sessions("").expect("list during wait");
if let Some(entry) = listed.iter().find(|s| s.session_id == spawned.session_id) {
if entry.exited {
break;
}
}
if Instant::now() >= deadline {
panic!("session did not transition to exited within budget");
}
std::thread::sleep(Duration::from_millis(200));
}
})
.await
.expect("blocking task panic");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn list_filters_by_originator() {
let scope = format!("pty-{}", line!());
let (_handle, socket) = start_server(&scope);
tokio::time::sleep(Duration::from_millis(300)).await;
let sleeper_path = tokio::task::spawn_blocking(|| testbin_path("testbin-sleeper"))
.await
.expect("testbin");
let socket_for_test = socket.clone();
tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_test).expect("connect");
let argv = vec![sleeper_path.to_string_lossy().into_owned()];
let a = client
.spawn_pty_session(&PtySpawnRequest::new(argv.clone()).with_originator("alpha"))
.expect("spawn a");
let b = client
.spawn_pty_session(&PtySpawnRequest::new(argv).with_originator("beta"))
.expect("spawn b");
let alpha_only = client.list_pty_sessions("alpha").expect("list alpha");
let alpha_ids: Vec<&str> = alpha_only.iter().map(|s| s.session_id.as_str()).collect();
assert!(alpha_ids.contains(&a.session_id.as_str()));
assert!(!alpha_ids.contains(&b.session_id.as_str()));
client
.terminate_pty_session(&a.session_id, 500)
.expect("terminate a");
client
.terminate_pty_session(&b.session_id, 500)
.expect("terminate b");
})
.await
.expect("blocking task");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn attach_with_steal_evicts_existing_client() {
let scope = format!("pty-{}", line!());
let (_handle, socket) = start_server(&scope);
tokio::time::sleep(Duration::from_millis(300)).await;
let sleeper_path = tokio::task::spawn_blocking(|| testbin_path("testbin-sleeper"))
.await
.expect("testbin");
let socket_for_test = socket.clone();
tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_test).expect("connect");
let argv = vec![sleeper_path.to_string_lossy().into_owned()];
let session = client
.spawn_pty_session(&PtySpawnRequest::new(argv))
.expect("spawn");
let mut attach_a =
PtyAttachment::attach_to(&socket_for_test, &session.session_id, 24, 80, false)
.expect("attach a");
let _attach_b =
PtyAttachment::attach_to(&socket_for_test, &session.session_id, 24, 80, true)
.expect("steal attach b");
loop {
let frame = attach_a.recv_frame().expect("recv after steal");
match frame.frame {
Some(StreamOneof::Output(_)) | Some(StreamOneof::MissedBytes(_)) => continue,
Some(StreamOneof::StolenBy(_)) | Some(StreamOneof::Error(_)) => break,
other => panic!("expected terminal frame on stolen attachment, got {other:?}"),
}
}
client
.terminate_pty_session(&session.session_id, 500)
.expect("terminate");
})
.await
.expect("blocking task");
}