use anyhow::Result;
use apm_core::{config::Config, ticket};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
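
/// Print a timestamped status line to stdout, e.g. `[14:02:31] Daemon stopped.`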
fn log(msg: &str) {
let ts = chrono::Local::now().format("%H:%M:%S");
println!("[{ts}] {msg}");
}
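
/// Run the ticket work loop: dispatch a worker per actionable ticket, up to
/// `config.agents.max_concurrent` at a time.
///
/// In daemon mode the loop keeps polling every `interval_secs` seconds once
/// the backlog is drained; the first Ctrl+C waits for running workers to
/// finish and a second one exits immediately. With `dry_run` nothing is
/// spawned and the candidates that would start are printed instead.
///
/// A minimal invocation, with an illustrative module path (adjust to wherever
/// this module actually lives):
///
/// ```ignore
/// // Dry-run against the current directory: print what would start next.
/// apm_cli::work::run(std::path::Path::new("."), false, true, false, 30, None)?;
/// ```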
pub fn run(
    root: &Path,
    skip_permissions: bool,
    dry_run: bool,
    daemon: bool,
    interval_secs: u64,
    epic: Option<String>,
) -> Result<()> {
if daemon && dry_run {
anyhow::bail!("--daemon and --dry-run cannot be used together");
}
let config = Config::load(root)?;
let max_concurrent = config.agents.max_concurrent.max(1);
let epic_filter: Option<String> = epic.or_else(|| config.work.epic.clone());
if dry_run {
return run_dry(root, &config, epic_filter.as_deref());
}
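    // Count Ctrl+C presses rather than exiting inside the handler: the loop
    // below treats one signal as "stop dispatching" (a graceful drain in
    // daemon mode) and a second as "exit immediately".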
let sig_count = Arc::new(AtomicUsize::new(0));
let sig_count_clone = Arc::clone(&sig_count);
let _ = ctrlc::set_handler(move || {
sig_count_clone.fetch_add(1, Ordering::Relaxed);
});
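    // Live workers as (ticket id, optional epic id, child handle, PID file path).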
let mut workers: Vec<(String, Option<String>, apm_core::start::ManagedChild, std::path::PathBuf)> = Vec::new();
let mut started_ids: Vec<String> = Vec::new();
let mut no_more = false;
let mut next_poll = Instant::now();
let mut drain_announced = false;
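    // Each pass: honor pending signals, reap finished workers, then either
    // dispatch the next ticket or back off until `next_poll`.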
loop {
let sigs = sig_count.load(Ordering::Relaxed);
if daemon {
if sigs >= 2 {
log(&format!("Forced exit; {} worker(s) may still be running", workers.len()));
break;
}
if sigs == 1 {
if workers.is_empty() {
log("Daemon stopped.");
break;
}
if !drain_announced {
log(&format!(
"Graceful shutdown: waiting for {} worker(s) to finish (Ctrl+C again to exit immediately)",
workers.len()
));
drain_announced = true;
}
workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
let done = matches!(child.try_wait(), Ok(Some(_)));
if done {
log(&format!("Worker for ticket #{id} finished"));
}
!done
});
if workers.is_empty() {
log("All workers finished; exiting.");
break;
}
std::thread::sleep(Duration::from_millis(500));
continue;
}
} else if sigs >= 1 {
break;
}
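        // Reap workers that have exited since the last pass.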
let mut reaped = false;
workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
let done = matches!(child.try_wait(), Ok(Some(_)));
if done {
log(&format!("Worker for ticket #{id} finished"));
reaped = true;
}
!done
});
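        // A reaped worker may have unblocked its epic (or the default
        // branch), so in daemon mode poll again immediately rather than
        // waiting out the idle interval.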
if daemon && reaped {
next_poll = Instant::now();
no_more = false;
}
if !daemon && no_more && workers.is_empty() {
break;
}
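        // Backlog is empty in daemon mode: wait out the poll interval in
        // short sleeps so signals are still noticed promptly.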
if daemon && no_more {
let now = Instant::now();
if now < next_poll {
std::thread::sleep(Duration::from_millis(500));
continue;
}
no_more = false;
}
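        // There is capacity for another worker: work out which epics (and the
        // default branch) are already claimed by running workers, then try to
        // dispatch the next best ticket.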
if !no_more && workers.len() < max_concurrent {
let (blocked_epics, default_blocked) = {
let epic_ids: Vec<Option<String>> = workers.iter()
.map(|(_, eid, _, _)| eid.clone())
.collect();
let blocked = config.blocked_epics(&epic_ids);
let def_blocked = config.is_default_branch_blocked(&epic_ids);
(blocked, def_blocked)
};
match super::start::spawn_next_worker(root, true, skip_permissions, epic_filter.as_deref(), &blocked_epics, default_blocked) {
Ok(None) => {
                    if daemon {
                        log(&format!("No actionable tickets; next check in {interval_secs}s"));
                        next_poll = Instant::now() + Duration::from_secs(interval_secs);
                    }
no_more = true;
}
Ok(Some((id, epic_id, child, pid_path))) => {
                    log(&format!("Dispatched worker for ticket #{id}"));
started_ids.push(id.clone());
workers.push((id, epic_id, child, pid_path));
no_more = false;
}
                Err(e) => {
                    eprintln!("warning: dispatch failed: {e:#}");
                    // Back off before retrying, but schedule the retry via
                    // `next_poll` instead of sleeping here so Ctrl+C stays
                    // responsive during the backoff.
                    no_more = true;
                    next_poll = Instant::now() + Duration::from_secs(30);
                }
}
} else {
std::thread::sleep(Duration::from_millis(500));
}
}
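    // Post-run reporting: nothing to summarize if nothing was started, and
    // daemon runs return without a summary.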
if started_ids.is_empty() {
println!("No tickets to work.");
return Ok(());
}
if daemon {
return Ok(());
}
let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
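    // Any terminal workflow state counts as success; a started ticket left in
    // a non-terminal state makes the process exit non-zero below.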
let good_states: Vec<&str> = config.workflow.states.iter()
.filter(|s| s.terminal)
.map(|s| s.id.as_str())
.collect();
let mut any_bad = false;
println!("\nSummary:");
for id in &started_ids {
if let Some(t) = tickets.iter().find(|t| t.frontmatter.id == *id) {
let state = &t.frontmatter.state;
let ok = good_states.contains(&state.as_str());
if !ok { any_bad = true; }
println!(" #{id} {} — {state}", t.frontmatter.title);
}
}
if any_bad {
std::process::exit(1);
}
Ok(())
}
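
/// Print, without spawning anything, the tickets that would be started next,
/// ranked by the weighted priority/effort/risk score from the workflow
/// config. Sample output (ticket data illustrative):
///
/// ```text
/// dry-run: would start next: #42 [ready] Wire up the scheduler
/// ```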
fn run_dry(root: &Path, config: &Config, epic_filter: Option<&str>) -> Result<()> {
let pw = config.workflow.prioritization.priority_weight;
let ew = config.workflow.prioritization.effort_weight;
let rw = config.workflow.prioritization.risk_weight;
let max_concurrent = config.agents.max_concurrent.max(1);
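    // A ticket qualifies if its state is actionable for an agent and, when
    // any state advertises a `command:start` transition, is one of those
    // startable states.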
let startable: Vec<&str> = config.workflow.states.iter()
.filter(|s| s.transitions.iter().any(|tr| tr.trigger == "command:start"))
.map(|s| s.id.as_str())
.collect();
let actionable_owned = config.actionable_states_for("agent");
let actionable: Vec<&str> = actionable_owned.iter().map(|s| s.as_str()).collect();
let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
let mut candidates: Vec<&ticket::Ticket> = tickets
.iter()
.filter(|t| {
let state = t.frontmatter.state.as_str();
actionable.contains(&state)
&& (startable.is_empty() || startable.contains(&state))
&& epic_filter
.map_or(true, |id| t.frontmatter.epic.as_deref() == Some(id))
})
.collect();
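    // Sort candidates best-first by composite score (descending); NaN scores
    // compare as equal.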
candidates.sort_by(|a, b| {
b.score(pw, ew, rw)
.partial_cmp(&a.score(pw, ew, rw))
.unwrap_or(std::cmp::Ordering::Equal)
});
if candidates.is_empty() {
println!("dry-run: no actionable tickets");
} else {
for t in candidates.into_iter().take(max_concurrent) {
println!(
"dry-run: would start next: #{} [{}] {}",
t.frontmatter.id, t.frontmatter.state, t.frontmatter.title
);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn daemon_dry_run_is_error() {
        // The flag conflict is rejected before Config::load runs, so the
        // nonexistent root is never touched.
        let result = run(
            std::path::Path::new("/nonexistent"),
            false, // skip_permissions
            true,  // dry_run
            true,  // daemon
            30,    // interval_secs
            None,  // epic
        );
let err = result.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("--daemon") && msg.contains("--dry-run"),
"unexpected error: {msg}"
);
}
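
    // Documents the Ctrl+C protocol `run` relies on: one increment requests a
    // graceful drain, a second forces an immediate exit.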
#[test]
fn sig_count_increments_correctly() {
let sig_count = Arc::new(AtomicUsize::new(0));
assert_eq!(sig_count.load(Ordering::Relaxed), 0);
sig_count.fetch_add(1, Ordering::Relaxed);
assert_eq!(sig_count.load(Ordering::Relaxed), 1);
sig_count.fetch_add(1, Ordering::Relaxed);
let sigs = sig_count.load(Ordering::Relaxed);
assert!(sigs >= 2);
}
}