#![cfg(feature = "daemon")]
use std::fs::{self, OpenOptions};
#[cfg(unix)]
use std::os::fd::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawHandle;
use std::path::PathBuf;
use std::process::Command;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};
use running_process::daemon::pipe_sessions::{PipeSessionRegistry, PipeStreamSelect};
#[cfg(not(windows))]
use running_process::daemon::pty_sessions::PtySessionRegistry;
use running_process::daemon::telemetry::{
TeeBackpressure, TeeEvent, TeeFileMode, TeeFileOptions, TeeRawOptions, TeeStream,
};
fn testbin_path(name: &str) -> PathBuf {
let output = Command::new(env!("CARGO"))
.args([
"build",
"-p",
"testbins",
"--bin",
name,
"--message-format=json",
])
.stderr(std::process::Stdio::inherit())
.output()
.expect("cargo build failed");
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
if !line.contains("\"compiler-artifact\"") || !line.contains(name) {
continue;
}
if let Ok(v) = serde_json::from_str::<serde_json::Value>(line) {
if v["reason"] == "compiler-artifact"
&& v["target"]["kind"]
.as_array()
.is_some_and(|a| a.iter().any(|k| k == "bin"))
{
if let Some(exe) = v["executable"].as_str() {
let p = PathBuf::from(exe);
let deadline = Instant::now() + Duration::from_secs(5);
while !p.exists() && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(50));
}
return p;
}
}
}
}
panic!("could not find binary artifact for {name}");
}
fn wait_until_contains<F>(mut read: F, needle: &[u8]) -> Vec<u8>
where
F: FnMut() -> Vec<u8>,
{
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let bytes = read();
if bytes.windows(needle.len()).any(|window| window == needle) {
return bytes;
}
if Instant::now() >= deadline {
panic!(
"timed out waiting for {:?} in {:?}",
String::from_utf8_lossy(needle),
String::from_utf8_lossy(&bytes)
);
}
std::thread::sleep(Duration::from_millis(50));
}
}
fn wait_for_exit<F>(mut exited: F)
where
F: FnMut() -> bool,
{
let deadline = Instant::now() + Duration::from_secs(10);
while !exited() {
if Instant::now() >= deadline {
panic!("session did not exit within deadline");
}
std::thread::sleep(Duration::from_millis(50));
}
}
fn wait_for_event_contains(receiver: &Receiver<TeeEvent>, needle: &[u8]) -> Vec<u8> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(TeeEvent::Bytes(bytes)) => {
if bytes.windows(needle.len()).any(|window| window == needle) {
return bytes;
}
}
Ok(TeeEvent::MissedBytes(_)) => {}
Err(_) if Instant::now() < deadline => {}
Err(err) => panic!("timed out waiting for tee event: {err}"),
}
if Instant::now() >= deadline {
panic!(
"timed out waiting for tee event containing {:?}",
String::from_utf8_lossy(needle)
);
}
}
}
fn wait_for_file_contains(path: &PathBuf, needle: &[u8]) -> Vec<u8> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let bytes = fs::read(path).unwrap_or_default();
if bytes.windows(needle.len()).any(|window| window == needle) {
return bytes;
}
if Instant::now() >= deadline {
panic!(
"timed out waiting for file {:?} to contain {:?}; got {:?}",
path,
String::from_utf8_lossy(needle),
String::from_utf8_lossy(&bytes)
);
}
std::thread::sleep(Duration::from_millis(50));
}
}
#[test]
fn pipe_stdout_tee_ring_accumulates_without_attachment() {
let emitter = testbin_path("testbin-emitter");
let registry = Arc::new(PipeSessionRegistry::new());
let session = registry
.spawn(
vec![emitter.to_string_lossy().into_owned()],
None,
None,
"tee-ring-test".to_string(),
"testbin-emitter".to_string(),
false,
)
.expect("spawn pipe session");
let handle = session
.tee_stream_ring(PipeStreamSelect::Stdout, 128)
.expect("register stdout tee");
let bytes = wait_until_contains(
|| session.tee_snapshot(handle).expect("snapshot").bytes,
b"tick",
);
let snapshot = session.tee_snapshot(handle).expect("snapshot");
assert_eq!(snapshot.stream, TeeStream::Stdout);
assert_eq!(snapshot.bytes, bytes);
assert_eq!(snapshot.capacity, 128);
let _ = session.terminate(Duration::from_millis(100));
wait_for_exit(|| session.exit_state().is_some());
registry.remove(&session.id);
}
#[test]
fn pipe_stdout_tee_channel_receives_without_attachment() {
let emitter = testbin_path("testbin-emitter");
let registry = Arc::new(PipeSessionRegistry::new());
let session = registry
.spawn(
vec![emitter.to_string_lossy().into_owned()],
None,
None,
"tee-channel-test".to_string(),
"testbin-emitter".to_string(),
false,
)
.expect("spawn pipe session");
let (handle, receiver) = session
.tee_stream_channel(PipeStreamSelect::Stdout, 8)
.expect("register stdout channel tee");
let bytes = wait_for_event_contains(&receiver, b"tick");
assert!(bytes.windows(b"tick".len()).any(|window| window == b"tick"));
assert!(session.tee_snapshot(handle).is_none());
let status = session.tee_status(handle).expect("status");
assert_eq!(status.stream, TeeStream::Stdout);
assert!(!status.disconnected);
assert!(session.untee(handle));
let _ = session.terminate(Duration::from_millis(100));
wait_for_exit(|| session.exit_state().is_some());
registry.remove(&session.id);
}
#[test]
fn pipe_stdout_tee_file_receives_without_attachment() {
let emitter = testbin_path("testbin-emitter");
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("stdout.log");
let registry = Arc::new(PipeSessionRegistry::new());
let session = registry
.spawn(
vec![emitter.to_string_lossy().into_owned()],
None,
None,
"tee-file-test".to_string(),
"testbin-emitter".to_string(),
false,
)
.expect("spawn pipe session");
let handle = session
.tee_stream_file(
PipeStreamSelect::Stdout,
&path,
TeeFileOptions {
mode: TeeFileMode::Truncate,
queue_capacity: 8,
write_missed_markers: true,
backpressure: TeeBackpressure::DropOldest,
},
)
.expect("register stdout file tee");
let bytes = wait_for_file_contains(&path, b"tick");
assert!(bytes.windows(b"tick".len()).any(|window| window == b"tick"));
let status = session.tee_status(handle).expect("status");
assert_eq!(status.stream, TeeStream::Stdout);
assert!(!status.disconnected);
assert!(session.untee(handle));
let _ = session.terminate(Duration::from_millis(100));
wait_for_exit(|| session.exit_state().is_some());
registry.remove(&session.id);
}
#[test]
fn pipe_stdout_tee_raw_os_sink_receives_without_attachment() {
let emitter = testbin_path("testbin-emitter");
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("stdout-raw.log");
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&path)
.expect("open raw tee file");
let registry = Arc::new(PipeSessionRegistry::new());
let session = registry
.spawn(
vec![emitter.to_string_lossy().into_owned()],
None,
None,
"tee-raw-test".to_string(),
"testbin-emitter".to_string(),
false,
)
.expect("spawn pipe session");
#[cfg(unix)]
let handle = session
.tee_stream_raw_fd(
PipeStreamSelect::Stdout,
file.as_raw_fd(),
TeeRawOptions {
queue_capacity: 8,
write_missed_markers: true,
backpressure: TeeBackpressure::DropOldest,
},
)
.expect("register stdout raw fd tee");
#[cfg(windows)]
let handle = session
.tee_stream_raw_handle(
PipeStreamSelect::Stdout,
file.as_raw_handle(),
TeeRawOptions {
queue_capacity: 8,
write_missed_markers: true,
backpressure: TeeBackpressure::DropOldest,
},
)
.expect("register stdout raw handle tee");
let bytes = wait_for_file_contains(&path, b"tick");
assert!(bytes.windows(b"tick".len()).any(|window| window == b"tick"));
let status = session.tee_status(handle).expect("status");
assert_eq!(status.stream, TeeStream::Stdout);
assert!(!status.disconnected);
assert!(session.untee(handle));
let _ = session.terminate(Duration::from_millis(100));
wait_for_exit(|| session.exit_state().is_some());
registry.remove(&session.id);
drop(file);
}
#[test]
#[cfg(not(windows))]
fn pty_output_tee_ring_accumulates_without_attachment() {
let emitter = testbin_path("testbin-emitter");
let registry = Arc::new(PtySessionRegistry::new());
let session = registry
.spawn(
vec![emitter.to_string_lossy().into_owned()],
None,
None,
24,
80,
"tee-ring-test".to_string(),
"testbin-emitter".to_string(),
)
.expect("spawn pty session");
let handle = session.tee_output_ring(128);
let bytes = wait_until_contains(
|| session.tee_snapshot(handle).expect("snapshot").bytes,
b"tick",
);
let snapshot = session.tee_snapshot(handle).expect("snapshot");
assert_eq!(snapshot.stream, TeeStream::PtyOutput);
assert_eq!(snapshot.bytes, bytes);
assert_eq!(snapshot.capacity, 128);
let _ = session.terminate(Duration::from_millis(100));
wait_for_exit(|| session.exit_state().is_some());
registry.remove(&session.id);
}