use anyhow::{Context, Result, bail};
use zag_agent::process_store::{ProcessEntry, ProcessStore};
use zag_agent::session::SessionStore;
use zag_agent::session_log::{LogEventKind, append_event_to_log, logs_dir};
fn resolve_process_id(id: &str) -> Result<String> {
if id == "self" {
std::env::var("ZAG_PROCESS_ID").map_err(|_| {
anyhow::anyhow!(
"Cannot resolve \"self\": ZAG_PROCESS_ID is not set. \
Are you running inside a zag session?"
)
})
} else {
Ok(id.to_string())
}
}
pub fn resolve_live_status(entry: &ProcessEntry) -> &'static str {
if entry.status != "running" {
return match entry.status.as_str() {
"exited" => "exited",
"killed" => "killed",
_ => "unknown",
};
}
check_process_alive(entry.pid)
}
#[cfg(unix)]
fn check_process_alive(pid: u32) -> &'static str {
use nix::sys::signal::kill;
use nix::unistd::Pid;
let pid = Pid::from_raw(pid as i32);
match kill(pid, None) {
Ok(()) => "running",
Err(_) => "dead",
}
}
#[cfg(not(unix))]
fn check_process_alive(_pid: u32) -> &'static str {
"running"
}
#[cfg(unix)]
fn send_signal(pid: u32, signal: nix::sys::signal::Signal) -> Result<()> {
use nix::sys::signal::kill;
use nix::unistd::Pid;
let pid = Pid::from_raw(pid as i32);
kill(pid, signal).map_err(|e| anyhow::anyhow!("Failed to signal process: {e}"))
}
#[cfg(not(unix))]
fn send_signal(pid: u32, _signal_name: &str) -> Result<()> {
bail!(
"Process signaling is not supported on Windows. Use taskkill /PID {} instead.",
pid
);
}
#[derive(Debug, serde::Serialize)]
pub struct ProcessInfo {
#[serde(flatten)]
pub entry: serde_json::Value,
pub live_status: String,
}
pub fn list_processes(
running: bool,
limit: Option<usize>,
provider: Option<&str>,
) -> Result<Vec<ProcessInfo>> {
let store = ProcessStore::load()?;
let mut entries: Vec<&ProcessEntry> = store.list_recent(limit);
if running {
entries.retain(|e| resolve_live_status(e) == "running");
}
if let Some(p) = provider {
entries.retain(|e| e.provider == p);
}
Ok(entries
.iter()
.map(|e| {
let mut v = serde_json::to_value(e).unwrap_or_default();
let live = resolve_live_status(e).to_string();
if let serde_json::Value::Object(ref mut m) = v {
m.insert(
"live_status".to_string(),
serde_json::Value::String(live.clone()),
);
}
ProcessInfo {
entry: v,
live_status: live,
}
})
.collect())
}
pub fn get_process(id: &str) -> Result<ProcessInfo> {
let id = resolve_process_id(id)?;
let store = ProcessStore::load()?;
match store.find(&id) {
Some(e) => {
let live = resolve_live_status(e).to_string();
let mut v = serde_json::to_value(e)?;
if let serde_json::Value::Object(ref mut m) = v {
m.insert(
"live_status".to_string(),
serde_json::Value::String(live.clone()),
);
}
Ok(ProcessInfo {
entry: v,
live_status: live,
})
}
None => bail!("Process not found: {id}"),
}
}
pub fn request_stop(id: &str) -> Result<()> {
let id = resolve_process_id(id)?;
let entry = ProcessStore::load()?
.find(&id)
.ok_or_else(|| anyhow::anyhow!("Process not found: {id}"))?
.clone();
let live = resolve_live_status(&entry);
if live != "running" {
bail!("Process {id} is not running (status: {live})");
}
stop_process(entry.pid)
}
#[derive(Debug, Clone)]
pub enum KillResult {
Inline(String),
File(std::path::PathBuf),
}
impl KillResult {
pub fn read(&self) -> Result<String> {
match self {
Self::Inline(s) => Ok(s.clone()),
Self::File(path) => std::fs::read_to_string(path)
.with_context(|| format!("Failed to read result file: {}", path.display())),
}
}
}
struct PreparedKill {
process_id: String,
process_entry: ProcessEntry,
session_entry: Option<zag_agent::session::SessionEntry>,
result_text: Option<String>,
}
fn prepare_kill(id: &str, result: Option<KillResult>) -> Result<PreparedKill> {
let id = resolve_process_id(id)?;
let process_entry = ProcessStore::load()?
.find(&id)
.ok_or_else(|| anyhow::anyhow!("Process not found: {id}"))?
.clone();
let live = resolve_live_status(&process_entry);
if live != "running" {
bail!("Process {id} is not running (status: {live})");
}
let result_text = match &result {
Some(r) => Some(r.read()?),
None => None,
};
let session_entry = process_entry.session_id.as_deref().and_then(|sid| {
SessionStore::load(process_entry.root.as_deref())
.ok()
.and_then(|store| store.find_by_any_id(sid).cloned())
});
if let Some(constraints) = session_entry.as_ref().and_then(|s| s.exit.as_ref()) {
let text = result_text.as_deref().unwrap_or("");
constraints
.validate(text)
.map_err(|e| anyhow::anyhow!("{e}"))?;
}
Ok(PreparedKill {
process_id: id,
process_entry,
session_entry,
result_text,
})
}
fn apply_kill(prepared: PreparedKill) -> Result<()> {
let PreparedKill {
process_id,
process_entry,
session_entry,
result_text,
} = prepared;
if let (Some(text), Some(session)) = (result_text.as_ref(), session_entry.as_ref()) {
record_session_result(session, text, process_entry.root.as_deref()).map_err(|e| {
anyhow::anyhow!(
"Failed to record session result: {e}. Kill aborted — the \
process is still running. Retry once the underlying I/O \
error is resolved."
)
})?;
}
let mut store = ProcessStore::load()?;
store.update_status(&process_id, "killed", None);
store.save()?;
kill_process(process_entry.pid)
}
pub fn request_kill(id: &str, result: Option<KillResult>) -> Result<()> {
apply_kill(prepare_kill(id, result)?)
}
fn record_session_result(
session_entry: &zag_agent::session::SessionEntry,
result: &str,
root: Option<&str>,
) -> Result<()> {
let log_path = session_entry
.log_path
.as_ref()
.map(std::path::PathBuf::from)
.unwrap_or_else(|| {
logs_dir(root)
.join("sessions")
.join(format!("{}.jsonl", session_entry.session_id))
});
append_event_to_log(
&log_path,
&session_entry.provider,
&session_entry.session_id,
session_entry.provider_session_id.as_deref(),
LogEventKind::SessionResult {
result: result.to_string(),
},
)
}
pub fn run_ps(command: PsCommand, json: bool) -> Result<()> {
match command {
PsCommand::List {
running,
limit,
provider,
children,
} => {
let store = ProcessStore::load()?;
let mut entries: Vec<&ProcessEntry> = store.list_recent(limit);
if running {
entries.retain(|e| resolve_live_status(e) == "running");
}
if let Some(ref p) = provider {
entries.retain(|e| e.provider == *p);
}
if let Some(ref parent_id) = children {
entries.retain(|e| {
e.parent_session_id.as_deref() == Some(parent_id)
|| e.parent_process_id.as_deref() == Some(parent_id)
});
}
if json {
let with_live: Vec<serde_json::Value> = entries
.iter()
.map(|e| {
let mut v = serde_json::to_value(e).unwrap_or_default();
if let serde_json::Value::Object(ref mut m) = v {
m.insert(
"live_status".to_string(),
serde_json::Value::String(resolve_live_status(e).to_string()),
);
}
v
})
.collect();
println!("{}", serde_json::to_string(&with_live)?);
return Ok(());
}
if entries.is_empty() {
println!("No processes found.");
return Ok(());
}
println!(
"{:<38} {:<7} {:<8} {:<10} {:<10} {:<7} {:<22} PROMPT",
"ID", "PID", "STATUS", "PROVIDER", "MODEL", "CMD", "STARTED"
);
println!("{}", "-".repeat(130));
for e in &entries {
let live = resolve_live_status(e);
let prompt_display = e
.prompt
.as_deref()
.unwrap_or("")
.chars()
.take(40)
.collect::<String>();
println!(
"{:<38} {:<7} {:<8} {:<10} {:<10} {:<7} {:<22} {}",
e.id,
e.pid,
live,
e.provider,
e.model,
e.command,
e.started_at.chars().take(20).collect::<String>(),
prompt_display
);
}
}
PsCommand::Show { id } => {
let id = resolve_process_id(&id)?;
let store = ProcessStore::load()?;
match store.find(&id) {
Some(e) => {
let live = resolve_live_status(e);
if json {
let mut v = serde_json::to_value(e)?;
if let serde_json::Value::Object(ref mut m) = v {
m.insert(
"live_status".to_string(),
serde_json::Value::String(live.to_string()),
);
}
println!("{}", serde_json::to_string(&v)?);
return Ok(());
}
println!("Process ID: {}", e.id);
println!("PID: {}", e.pid);
println!("Status: {live}");
println!("Provider: {}", e.provider);
println!("Model: {}", e.model);
println!("Command: {}", e.command);
println!("Started: {}", e.started_at);
if let Some(ref exited) = e.exited_at {
println!("Exited: {exited}");
}
if let Some(code) = e.exit_code {
println!("Exit code: {code}");
}
if let Some(ref sid) = e.session_id {
println!("Session ID: {sid}");
}
if let Some(ref root) = e.root {
println!("Root: {root}");
}
if let Some(ref prompt) = e.prompt {
println!("Prompt: {prompt}");
}
}
None => {
bail!("Process not found: {id}");
}
}
}
PsCommand::Stop { id } => {
let id = resolve_process_id(&id)?;
let entry = ProcessStore::load()?
.find(&id)
.ok_or_else(|| anyhow::anyhow!("Process not found: {id}"))?
.clone();
let live = resolve_live_status(&entry);
if live != "running" {
bail!("Process {id} is not running (status: {live})");
}
println!(
"\x1b[33m>\x1b[0m Sending stop signal to process {} ({})",
entry.pid, entry.id
);
stop_process(entry.pid)?;
println!("\x1b[32m✓\x1b[0m Stop signal sent");
}
PsCommand::Kill { id, result, file } => {
let kill_result = match (result, file) {
(Some(r), None) => Some(KillResult::Inline(r)),
(None, Some(p)) => Some(KillResult::File(p)),
(None, None) => None,
(Some(_), Some(_)) => bail!("`<result>` and `--file` are mutually exclusive"),
};
let prepared = prepare_kill(&id, kill_result)?;
println!(
"\x1b[33m>\x1b[0m Sending kill signal to process {} ({})",
prepared.process_entry.pid, prepared.process_entry.id
);
apply_kill(prepared)?;
println!("\x1b[32m✓\x1b[0m Process killed");
}
}
Ok(())
}
#[cfg(unix)]
fn stop_process(pid: u32) -> Result<()> {
send_signal(pid, nix::sys::signal::Signal::SIGHUP)
}
#[cfg(not(unix))]
fn stop_process(pid: u32) -> Result<()> {
send_signal(pid, "stop")
}
#[cfg(unix)]
fn kill_process(pid: u32) -> Result<()> {
send_signal(pid, nix::sys::signal::Signal::SIGTERM)
}
#[cfg(not(unix))]
fn kill_process(pid: u32) -> Result<()> {
send_signal(pid, "kill")
}
#[derive(clap::Subcommand)]
pub enum PsCommand {
List {
#[arg(long)]
running: bool,
#[arg(short = 'n', long)]
limit: Option<usize>,
#[arg(short = 'p', long)]
provider: Option<String>,
#[arg(long)]
children: Option<String>,
},
Show {
id: String,
},
Stop {
id: String,
},
Kill {
id: String,
#[arg(conflicts_with = "file")]
result: Option<String>,
#[arg(long, value_name = "PATH", conflicts_with = "result")]
file: Option<std::path::PathBuf>,
},
}
#[cfg(test)]
#[path = "ps_tests.rs"]
mod tests;