use std::{process::Command, thread::sleep, time::Duration};
use anyhow::{Context, Result, bail};
use assert_cmd::cargo::cargo_bin;
use tempfile::tempdir;
#[test]
fn daemon_start_runs_in_background() -> Result<()> {
let runtime_dir = tempdir()?;
let state_dir = tempdir()?;
let binary = cargo_bin("pz");
let start = Command::new(&binary)
.args(["daemon", "start"])
.env("PZ_RUNTIME_DIR", runtime_dir.path())
.env("PZ_STATE_DIR", state_dir.path())
.output()
.context("failed to run daemon start")?;
assert!(
start.status.success(),
"daemon start failed: {}",
String::from_utf8_lossy(&start.stderr)
);
let stdout = String::from_utf8(start.stdout)?;
assert!(stdout.contains("pz daemon started"), "{stdout}");
let status = Command::new(&binary)
.args(["daemon", "status"])
.env("PZ_RUNTIME_DIR", runtime_dir.path())
.env("PZ_STATE_DIR", state_dir.path())
.output()
.context("failed to run daemon status")?;
assert!(
status.status.success(),
"daemon status failed: {}",
String::from_utf8_lossy(&status.stderr)
);
let stdout = String::from_utf8(status.stdout)?;
assert!(stdout.contains("pz daemon running"), "{stdout}");
let stop = Command::new(&binary)
.args(["daemon", "stop"])
.env("PZ_RUNTIME_DIR", runtime_dir.path())
.env("PZ_STATE_DIR", state_dir.path())
.output()
.context("failed to run daemon stop")?;
assert!(
stop.status.success(),
"daemon stop failed: {}",
String::from_utf8_lossy(&stop.stderr)
);
wait_for_socket_removal(runtime_dir.path().join("pz.sock"))?;
Ok(())
}
#[test]
fn logs_follow_prints_output_and_exits() -> Result<()> {
let runtime_dir = tempdir()?;
let state_dir = tempdir()?;
let binary = cargo_bin("pz");
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "start"])?;
run_pz(
&binary,
&runtime_dir,
&state_dir,
&["run", "--", "/bin/echo", "followed"],
)?;
let logs = run_pz(&binary, &runtime_dir, &state_dir, &["logs", "1", "-f"])?;
assert_eq!(String::from_utf8(logs.stdout)?, "followed\n");
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "stop"])?;
wait_for_socket_removal(runtime_dir.path().join("pz.sock"))?;
Ok(())
}
#[test]
fn logs_tail_limits_output_lines() -> Result<()> {
let runtime_dir = tempdir()?;
let state_dir = tempdir()?;
let binary = cargo_bin("pz");
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "start"])?;
run_pz(
&binary,
&runtime_dir,
&state_dir,
&["run", "--", "/usr/bin/printf", "one\\ntwo\\nthree\\n"],
)?;
run_pz(&binary, &runtime_dir, &state_dir, &["wait", "1"])?;
let logs = run_pz(
&binary,
&runtime_dir,
&state_dir,
&["logs", "1", "--tail", "2"],
)?;
assert_eq!(String::from_utf8(logs.stdout)?, "two\nthree\n");
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "stop"])?;
wait_for_socket_removal(runtime_dir.path().join("pz.sock"))?;
Ok(())
}
#[test]
fn wait_exits_with_process_status() -> Result<()> {
let runtime_dir = tempdir()?;
let state_dir = tempdir()?;
let binary = cargo_bin("pz");
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "start"])?;
run_pz(
&binary,
&runtime_dir,
&state_dir,
&["run", "--name", "wait-false", "--", "/usr/bin/env", "false"],
)?;
let wait = Command::new(&binary)
.args(["wait", "wait-false"])
.env("PZ_RUNTIME_DIR", runtime_dir.path())
.env("PZ_STATE_DIR", state_dir.path())
.output()
.context("failed to run pz wait")?;
assert_eq!(wait.status.code(), Some(1));
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "stop"])?;
wait_for_socket_removal(runtime_dir.path().join("pz.sock"))?;
Ok(())
}
#[test]
fn help_guides_agents_to_managed_processes() -> Result<()> {
let binary = cargo_bin("pz");
let help = Command::new(&binary)
.arg("--help")
.output()
.context("failed to run pz help")?;
assert!(help.status.success());
let stdout = String::from_utf8(help.stdout)?;
assert!(stdout.contains("daemon-backed process manager"), "{stdout}");
assert!(stdout.contains("explicit environment control"), "{stdout}");
Ok(())
}
#[test]
fn wrong_top_level_command_prints_help() -> Result<()> {
let binary = cargo_bin("pz");
let output = Command::new(&binary)
.arg("status")
.output()
.context("failed to run wrong pz command")?;
assert!(!output.status.success());
let stderr = String::from_utf8(output.stderr)?;
assert!(stderr.contains("unrecognized subcommand"), "{stderr}");
assert!(stderr.contains("daemon-backed process manager"), "{stderr}");
Ok(())
}
#[test]
fn wrong_subcommand_arg_prints_subcommand_help() -> Result<()> {
let binary = cargo_bin("pz");
let output = Command::new(&binary)
.args(["logs", "my-app", "--stream", "stdout"])
.output()
.context("failed to run wrong pz logs command")?;
assert!(!output.status.success());
let stderr = String::from_utf8(output.stderr)?;
assert!(
stderr.contains("unexpected argument '--stream'"),
"{stderr}"
);
assert!(stderr.contains("Usage: logs"), "{stderr}");
assert!(stderr.contains("--tail <TAIL>"), "{stderr}");
assert!(stderr.contains("Blocks until the process exits"), "{stderr}");
Ok(())
}
#[cfg(target_os = "linux")]
#[test]
fn resources_do_not_count_threads_as_process_memory() -> Result<()> {
let runtime_dir = tempdir()?;
let state_dir = tempdir()?;
let work_dir = tempdir()?;
let script = work_dir.path().join("threaded.py");
let binary = cargo_bin("pz");
std::fs::write(
&script,
r#"
import threading
import time
buf = bytearray(64 * 1024 * 1024)
threads = []
for _ in range(32):
thread = threading.Thread(target=time.sleep, args=(30,), daemon=True)
thread.start()
threads.append(thread)
print("ready", flush=True)
time.sleep(30)
"#,
)?;
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "start"])?;
run_pz(
&binary,
&runtime_dir,
&state_dir,
&[
"run",
"--name",
"threaded-memory",
"--",
"/usr/bin/python3",
script.to_str().context("script path should be utf-8")?,
],
)?;
for _ in 0..100 {
let logs = run_pz(
&binary,
&runtime_dir,
&state_dir,
&["logs", "threaded-memory", "stdout"],
)?;
if String::from_utf8(logs.stdout)?.contains("ready") {
break;
}
sleep(Duration::from_millis(50));
}
let resources = run_pz(
&binary,
&runtime_dir,
&state_dir,
&["resources", "threaded-memory"],
)?;
let resources = String::from_utf8(resources.stdout)?;
assert!(resources.contains("status: running"), "{resources}");
assert!(resources.contains("processes: 1"), "{resources}");
assert_memory_below(&resources, 512.0)?;
run_pz(
&binary,
&runtime_dir,
&state_dir,
&["stop", "threaded-memory", "--force"],
)?;
run_pz(&binary, &runtime_dir, &state_dir, &["daemon", "stop"])?;
wait_for_socket_removal(runtime_dir.path().join("pz.sock"))?;
Ok(())
}
#[cfg(target_os = "linux")]
fn assert_memory_below(resources: &str, max_mb: f64) -> Result<()> {
let memory = resources
.lines()
.find_map(|line| line.strip_prefix("memory: "))
.context("resources should include memory line")?;
let mut parts = memory.split_whitespace();
let value = parts
.next()
.context("memory should include value")?
.parse::<f64>()?;
let unit = parts.next().context("memory should include unit")?;
let mb = match unit {
"GB" => value * 1024.0,
"MB" => value,
"KB" => value / 1024.0,
"B" => value / 1024.0 / 1024.0,
other => bail!("unexpected memory unit {other:?}"),
};
assert!(
mb < max_mb,
"expected memory below {max_mb} MB, got {memory}\n{resources}"
);
Ok(())
}
fn run_pz(
binary: &std::path::Path,
runtime_dir: &tempfile::TempDir,
state_dir: &tempfile::TempDir,
args: &[&str],
) -> Result<std::process::Output> {
let output = Command::new(binary)
.args(args)
.env("PZ_RUNTIME_DIR", runtime_dir.path())
.env("PZ_STATE_DIR", state_dir.path())
.output()
.with_context(|| format!("failed to run pz {}", args.join(" ")))?;
assert!(
output.status.success(),
"pz {} failed: {}",
args.join(" "),
String::from_utf8_lossy(&output.stderr)
);
Ok(output)
}
fn wait_for_socket_removal(socket: std::path::PathBuf) -> Result<()> {
for _ in 0..100 {
if !socket.exists() {
return Ok(());
}
sleep(Duration::from_millis(10));
}
bail!("daemon socket still exists at {}", socket.display())
}