use crate::listen;
use crate::ps::resolve_live_status;
use anyhow::{Result, bail};
use log::debug;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use zag_agent::process_store::ProcessStore;
use zag_agent::session::SessionStore;
use zag_agent::session_log::{AgentLogEvent, LogEventKind};
#[derive(Debug, Clone, serde::Serialize)]
pub struct WaitResult {
pub session_id: String,
pub success: bool,
pub error: Option<String>,
}
pub struct WaitParams {
pub session_ids: Vec<String>,
pub tag: Option<String>,
pub latest: bool,
pub timeout: Option<String>,
pub any: bool,
pub json: bool,
pub root: Option<String>,
}
fn parse_duration(s: &str) -> Result<Duration> {
let s = s.trim();
let mut total_secs: u64 = 0;
let mut current_num = String::new();
for ch in s.chars() {
if ch.is_ascii_digit() {
current_num.push(ch);
} else {
let n: u64 = current_num
.parse()
.map_err(|_| anyhow::anyhow!("Invalid duration: '{}'", s))?;
current_num.clear();
match ch {
's' => total_secs += n,
'm' => total_secs += n * 60,
'h' => total_secs += n * 3600,
'd' => total_secs += n * 86400,
_ => bail!("Invalid duration unit '{}' in '{}'", ch, s),
}
}
}
if !current_num.is_empty() {
let n: u64 = current_num
.parse()
.map_err(|_| anyhow::anyhow!("Invalid duration: '{}'", s))?;
total_secs += n;
}
if total_secs == 0 {
bail!("Duration must be greater than zero: '{}'", s);
}
Ok(Duration::from_secs(total_secs))
}
fn resolve_session_ids(params: &WaitParams) -> Result<Vec<String>> {
let mut ids = Vec::new();
if params.latest {
let store = SessionStore::load(params.root.as_deref())?;
let entry = store
.latest()
.ok_or_else(|| anyhow::anyhow!("No sessions found"))?;
ids.push(entry.session_id.clone());
}
if let Some(ref tag) = params.tag {
let store = SessionStore::load(params.root.as_deref())?;
let tagged = store.find_by_tag(tag);
if tagged.is_empty() {
bail!("No sessions found with tag '{}'", tag);
}
for entry in tagged {
if !ids.contains(&entry.session_id) {
ids.push(entry.session_id.clone());
}
}
}
for id in ¶ms.session_ids {
if !ids.contains(id) {
ids.push(id.clone());
}
}
if ids.is_empty() {
bail!("No sessions specified. Provide session IDs, --tag, or --latest.");
}
Ok(ids)
}
fn check_log_for_ended(session_id: &str, log_path: &PathBuf) -> Option<WaitResult> {
let file = std::fs::File::open(log_path).ok()?;
let reader = BufReader::new(file);
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(_) => continue,
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(event) = serde_json::from_str::<AgentLogEvent>(trimmed) {
if let LogEventKind::SessionEnded { success, error } = &event.kind {
return Some(WaitResult {
session_id: session_id.to_string(),
success: *success,
error: error.clone(),
});
}
}
}
None
}
fn check_process_dead(session_id: &str) -> bool {
let store = match ProcessStore::load() {
Ok(s) => s,
Err(_) => return false,
};
let entry = store
.processes
.iter()
.filter(|e| e.session_id.as_deref() == Some(session_id))
.max_by(|a, b| a.started_at.cmp(&b.started_at));
match entry {
Some(e) => {
let status = resolve_live_status(e);
status != "running"
}
None => false, }
}
pub fn wait_for_sessions(params: &WaitParams) -> Result<Vec<WaitResult>> {
let session_ids = resolve_session_ids(params)?;
let timeout = params.timeout.as_deref().map(parse_duration).transpose()?;
debug!(
"Waiting for {} session(s): {:?}, timeout: {:?}, any: {}",
session_ids.len(),
session_ids,
timeout,
params.any
);
let start = Instant::now();
let mut results: Vec<WaitResult> = Vec::new();
let mut pending: Vec<String> = session_ids.clone();
loop {
if let Some(timeout_dur) = timeout {
if start.elapsed() >= timeout_dur {
for id in &pending {
results.push(WaitResult {
session_id: id.clone(),
success: false,
error: Some("timeout".to_string()),
});
}
return Ok(results);
}
}
let mut newly_done = Vec::new();
for id in &pending {
let log_path =
listen::resolve_session_log(Some(id), false, false, params.root.as_deref());
if let Ok(ref path) = log_path {
if let Some(result) = check_log_for_ended(id, path) {
newly_done.push(result);
continue;
}
}
if check_process_dead(id) {
newly_done.push(WaitResult {
session_id: id.clone(),
success: false,
error: Some("process died without clean exit".to_string()),
});
}
}
for result in newly_done {
pending.retain(|id| *id != result.session_id);
results.push(result);
if params.any {
return Ok(results);
}
}
if pending.is_empty() {
return Ok(results);
}
std::thread::sleep(Duration::from_millis(500));
}
}
pub fn run_wait(params: WaitParams) -> Result<()> {
let json = params.json;
let results = wait_for_sessions(¶ms)?;
let timed_out = results
.iter()
.any(|r| r.error.as_deref() == Some("timeout"));
if json {
for r in &results {
println!("{}", serde_json::to_string(r)?);
}
} else {
for r in &results {
if r.error.as_deref() == Some("timeout") {
eprintln!("Timed out waiting for session {}", r.session_id);
} else {
print_result(r);
}
}
}
if timed_out {
std::process::exit(124);
}
let all_success = results.iter().all(|r| r.success);
std::process::exit(if all_success { 0 } else { 1 });
}
fn print_result(result: &WaitResult) {
let status = if result.success {
"\x1b[32m✓\x1b[0m completed"
} else {
"\x1b[31m✗\x1b[0m failed"
};
let error_info = result
.error
.as_deref()
.map(|e| format!(": {}", e))
.unwrap_or_default();
println!("{} {}{}", result.session_id, status, error_info);
}
#[cfg(test)]
#[path = "wait_tests.rs"]
mod tests;