#![cfg(feature = "daemon")]
use running_process::client::{SessionTeeFileRequest, SessionTeeKind, SessionTeeStream};
use running_process::daemon::client::DaemonClient;
use running_process::daemon::paths;
use running_process::daemon::pipe_session::{PipeSpawnRequest, PipeStreamAttachment};
use running_process::daemon::server::DaemonServer;
use running_process::proto::daemon::PipeStreamKind;
use std::fs;
use std::path::PathBuf;
use std::process::Command;
use std::time::{Duration, Instant};
fn testbin_path(name: &str) -> PathBuf {
let output = Command::new(env!("CARGO"))
.args([
"build",
"-p",
"testbins",
"--bin",
name,
"--message-format=json",
])
.stderr(std::process::Stdio::inherit())
.output()
.expect("cargo build failed");
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
if !line.contains("\"compiler-artifact\"") || !line.contains(name) {
continue;
}
if let Ok(v) = serde_json::from_str::<serde_json::Value>(line) {
if v["reason"] == "compiler-artifact"
&& v["target"]["kind"]
.as_array()
.is_some_and(|a| a.iter().any(|k| k == "bin"))
{
if let Some(exe) = v["executable"].as_str() {
let p = PathBuf::from(exe);
let deadline = Instant::now() + Duration::from_secs(5);
while !p.exists() && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(50));
}
return p;
}
}
}
}
panic!("could not find binary artifact for {name}");
}
fn start_server(scope: &str) -> (tokio::task::JoinHandle<()>, String) {
let socket = paths::socket_path(Some(scope));
let db = paths::db_path(Some(scope)).to_string_lossy().into_owned();
let server = DaemonServer::new(
socket.clone(),
db,
"pipe-test".to_string(),
scope.to_string(),
std::env::current_dir()
.unwrap_or_default()
.to_string_lossy()
.into_owned(),
)
.expect("DaemonServer::new");
let handle = tokio::spawn(async move {
server.run().await.expect("server.run");
});
(handle, socket)
}
fn wait_for_file_contains(path: &PathBuf, needle: &[u8]) -> Vec<u8> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let bytes = fs::read(path).unwrap_or_default();
if bytes.windows(needle.len()).any(|window| window == needle) {
return bytes;
}
if Instant::now() >= deadline {
panic!(
"timed out waiting for file {:?} to contain {:?}; got {:?}",
path,
String::from_utf8_lossy(needle),
String::from_utf8_lossy(&bytes)
);
}
std::thread::sleep(Duration::from_millis(50));
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn spawn_attach_stdout_then_terminate_lifecycle() {
let scope = format!("pipe-{}", line!());
let (_handle, socket) = start_server(&scope);
tokio::time::sleep(Duration::from_millis(300)).await;
let env_reporter = tokio::task::spawn_blocking(|| testbin_path("testbin-env-reporter"))
.await
.expect("testbin");
let socket_for_test = socket.clone();
tokio::task::spawn_blocking(move || {
let mut control = DaemonClient::connect_to(&socket_for_test).expect("control connect");
let argv = vec![env_reporter.to_string_lossy().into_owned()];
let spawned = control
.spawn_pipe_session(&PipeSpawnRequest::new(argv).with_originator("pipe-lifecycle-test"))
.expect("spawn pipe session");
assert!(!spawned.session_id.is_empty());
let listed = control.list_pipe_sessions("").expect("list");
let entry = listed
.iter()
.find(|s| s.session_id == spawned.session_id)
.expect("pipe session not present in list");
assert!(!entry.stdout_attached);
assert!(!entry.stderr_attached);
assert!(!entry.exited);
std::thread::sleep(Duration::from_millis(500));
let attachment = PipeStreamAttachment::attach_to(
&socket_for_test,
&spawned.session_id,
PipeStreamKind::Stdout,
false,
)
.expect("attach stdout");
let text = String::from_utf8_lossy(&attachment.initial_backlog);
assert!(
text.contains("READY"),
"expected READY in initial backlog, got: {text:?}"
);
let listed_after = control
.list_pipe_sessions("pipe-lifecycle-test")
.expect("list after attach");
let entry = listed_after
.iter()
.find(|s| s.session_id == spawned.session_id)
.expect("session disappeared from filtered list");
assert!(entry.stdout_attached);
match PipeStreamAttachment::attach_to(
&socket_for_test,
&spawned.session_id,
PipeStreamKind::Stdout,
false,
) {
Ok(_) => panic!("second attach should not succeed without steal"),
Err(running_process::daemon::pipe_session::PipeAttachError::Server {
code, ..
}) => {
assert_eq!(
code,
running_process::proto::daemon::StatusCode::AlreadyAttached
);
}
Err(other) => panic!("unexpected attach error: {other}"),
}
drop(attachment);
std::thread::sleep(Duration::from_millis(150));
control
.terminate_pipe_session(&spawned.session_id, 1000)
.expect("terminate");
let deadline = Instant::now() + Duration::from_secs(15);
loop {
let listed = control.list_pipe_sessions("").expect("list during wait");
if let Some(entry) = listed.iter().find(|s| s.session_id == spawned.session_id) {
if entry.exited {
break;
}
}
if Instant::now() >= deadline {
panic!("pipe session did not exit within budget");
}
std::thread::sleep(Duration::from_millis(200));
}
})
.await
.expect("blocking task");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn pipe_stdout_file_tee_can_be_registered_over_ipc() {
let scope = format!("pipe-tee-ipc-{}", line!());
let (_handle, socket) = start_server(&scope);
tokio::time::sleep(Duration::from_millis(300)).await;
let emitter = tokio::task::spawn_blocking(|| testbin_path("testbin-emitter"))
.await
.expect("testbin");
let tempdir = tempfile::tempdir().expect("tempdir");
let transcript = tempdir.path().join("stdout.log");
let socket_for_test = socket.clone();
tokio::task::spawn_blocking(move || {
let mut client = DaemonClient::connect_to(&socket_for_test).expect("connect");
let spawned = client
.spawn_pipe_session(
&PipeSpawnRequest::new([emitter.to_string_lossy().into_owned()])
.with_originator("pipe-tee-ipc"),
)
.expect("spawn");
let request = SessionTeeFileRequest::new(
&spawned.session_id,
SessionTeeKind::Pipe,
SessionTeeStream::Stdout,
&transcript,
)
.truncate()
.queue_capacity(8);
let handle = client
.register_session_file_tee(&request)
.expect("register file tee");
let bytes = wait_for_file_contains(&transcript, b"tick");
assert!(bytes.windows(b"tick".len()).any(|window| window == b"tick"));
let status = client
.get_session_tee_status(SessionTeeKind::Pipe, &spawned.session_id, handle)
.expect("tee status");
assert_eq!(status.stream, SessionTeeStream::Stdout);
assert!(!status.disconnected);
client
.unregister_session_tee(SessionTeeKind::Pipe, &spawned.session_id, handle)
.expect("unregister file tee");
client
.terminate_pipe_session(&spawned.session_id, 1000)
.expect("terminate");
})
.await
.expect("blocking task");
}