1use anyhow::Result;
2use apm_core::{config::Config, ticket};
3use std::path::Path;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8fn log(msg: &str) {
9 let ts = chrono::Local::now().format("%H:%M:%S");
10 println!("[{ts}] {msg}");
11}
12
13pub fn run(root: &Path, skip_permissions: bool, dry_run: bool, daemon: bool, interval_secs: u64, epic: Option<String>) -> Result<()> {
14 if daemon && dry_run {
15 anyhow::bail!("--daemon and --dry-run cannot be used together");
16 }
17
18 let config = Config::load(root)?;
19 let max_concurrent = config.agents.max_concurrent.max(1);
20 let epic_filter: Option<String> = epic.or_else(|| config.work.epic.clone());
21
22 if dry_run {
23 return run_dry(root, &config, epic_filter.as_deref());
24 }
25
26 let sig_count = Arc::new(AtomicUsize::new(0));
27 let sig_count_clone = Arc::clone(&sig_count);
28 let _ = ctrlc::set_handler(move || {
29 sig_count_clone.fetch_add(1, Ordering::Relaxed);
30 });
31
32 let mut workers: Vec<(String, Option<String>, apm_core::start::ManagedChild, std::path::PathBuf)> = Vec::new();
33 let mut started_ids: Vec<String> = Vec::new();
34 let mut no_more = false;
35 let mut next_poll = Instant::now();
37 let mut drain_announced = false;
38
39 loop {
40 let sigs = sig_count.load(Ordering::Relaxed);
41
42 if daemon {
43 if sigs >= 2 {
44 log(&format!("Forced exit; {} worker(s) may still be running", workers.len()));
45 break;
46 }
47 if sigs == 1 {
48 if workers.is_empty() {
49 log("Daemon stopped.");
50 break;
51 }
52 if !drain_announced {
53 log(&format!(
54 "Graceful shutdown: waiting for {} worker(s) to finish (Ctrl+C again to exit immediately)",
55 workers.len()
56 ));
57 drain_announced = true;
58 }
59 workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
61 let done = matches!(child.try_wait(), Ok(Some(_)));
62 if done {
63 log(&format!("Worker for ticket #{id} finished"));
64 }
65 !done
66 });
67 if workers.is_empty() {
68 log("All workers finished; exiting.");
69 break;
70 }
71 std::thread::sleep(Duration::from_millis(500));
72 continue;
73 }
74 } else if sigs >= 1 {
75 break;
76 }
77
78 let mut reaped = false;
80 workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
81 let done = matches!(child.try_wait(), Ok(Some(_)));
82 if done {
83 log(&format!("Worker for ticket #{id} finished"));
84 reaped = true;
85 }
86 !done
87 });
88
89 if reaped {
91 if daemon {
92 next_poll = Instant::now();
93 }
94 no_more = false;
95 }
96
97 if !daemon && no_more && workers.is_empty() {
98 break;
99 }
100
101 if daemon && no_more {
103 let now = Instant::now();
104 if now < next_poll {
105 std::thread::sleep(Duration::from_millis(500));
106 continue;
107 }
108 no_more = false;
110 }
111
112 if !no_more && workers.len() < max_concurrent {
113 let (blocked_epics, default_blocked) = {
114 let epic_ids: Vec<Option<String>> = workers.iter()
115 .map(|(_, eid, _, _)| eid.clone())
116 .collect();
117 let blocked = config.blocked_epics(&epic_ids);
118 let def_blocked = config.is_default_branch_blocked(&epic_ids);
119 (blocked, def_blocked)
120 };
121 match super::start::spawn_next_worker(root, true, skip_permissions, epic_filter.as_deref(), &blocked_epics, default_blocked) {
122 Ok(None) => {
123 if daemon {
124 let secs = interval_secs;
125 log(&format!("No actionable tickets; next check in {secs}s"));
126 next_poll = Instant::now() + Duration::from_secs(interval_secs);
127 }
128 no_more = true;
129 }
130 Ok(Some((id, epic_id, child, pid_path))) => {
131 log(&format!(
132 "Dispatched worker for ticket #{id}"
133 ));
134 started_ids.push(id.clone());
135 workers.push((id, epic_id, child, pid_path));
136 no_more = false;
137 }
138 Err(e) => {
139 eprintln!("warning: dispatch failed: {e:#}");
140 no_more = true;
141 std::thread::sleep(Duration::from_secs(30));
142 }
143 }
144 } else {
145 std::thread::sleep(Duration::from_millis(500));
146 }
147 }
148
149 if started_ids.is_empty() {
154 println!("No tickets to work.");
155 return Ok(());
156 }
157
158 if daemon {
159 return Ok(());
161 }
162
163 let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
164 let good_states: Vec<&str> = config.workflow.states.iter()
165 .filter(|s| s.terminal)
166 .map(|s| s.id.as_str())
167 .collect();
168 let mut any_bad = false;
169 println!("\nSummary:");
170 for id in &started_ids {
171 if let Some(t) = tickets.iter().find(|t| t.frontmatter.id == *id) {
172 let state = &t.frontmatter.state;
173 let ok = good_states.contains(&state.as_str());
174 if !ok { any_bad = true; }
175 println!(" #{id} {} — {state}", t.frontmatter.title);
176 }
177 }
178
179 if any_bad {
180 std::process::exit(1);
181 }
182 Ok(())
183}
184
185fn run_dry(root: &Path, config: &Config, epic_filter: Option<&str>) -> Result<()> {
186 let pw = config.workflow.prioritization.priority_weight;
187 let ew = config.workflow.prioritization.effort_weight;
188 let rw = config.workflow.prioritization.risk_weight;
189 let max_concurrent = config.agents.max_concurrent.max(1);
190
191 let startable: Vec<&str> = config.workflow.states.iter()
192 .filter(|s| s.transitions.iter().any(|tr| tr.trigger == "command:start"))
193 .map(|s| s.id.as_str())
194 .collect();
195 let actionable_owned = config.actionable_states_for("agent");
196 let actionable: Vec<&str> = actionable_owned.iter().map(|s| s.as_str()).collect();
197
198 let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
199 let mut candidates: Vec<&ticket::Ticket> = tickets
200 .iter()
201 .filter(|t| {
202 let state = t.frontmatter.state.as_str();
203 actionable.contains(&state)
204 && (startable.is_empty() || startable.contains(&state))
205 && epic_filter
206 .map_or(true, |id| t.frontmatter.epic.as_deref() == Some(id))
207 })
208 .collect();
209 candidates.sort_by(|a, b| {
210 b.score(pw, ew, rw)
211 .partial_cmp(&a.score(pw, ew, rw))
212 .unwrap_or(std::cmp::Ordering::Equal)
213 });
214
215 if candidates.is_empty() {
216 println!("dry-run: no actionable tickets");
217 } else {
218 for t in candidates.into_iter().take(max_concurrent) {
219 println!(
220 "dry-run: would start next: #{} [{}] {}",
221 t.frontmatter.id, t.frontmatter.state, t.frontmatter.title
222 );
223 }
224 }
225 Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// `--daemon` together with `--dry-run` is rejected before the config is loaded,
    /// so a non-existent root is sufficient here.
    #[test]
    fn daemon_dry_run_is_error() {
        let outcome = run(Path::new("/nonexistent"), false, true, true, 30, None);
        let msg = outcome.unwrap_err().to_string();
        assert!(
            msg.contains("--daemon") && msg.contains("--dry-run"),
            "unexpected error: {msg}"
        );
    }

    /// The shared counter starts at zero and reaches the "forced exit" threshold after
    /// two increments.
    #[test]
    fn sig_count_increments_correctly() {
        let counter = Arc::new(AtomicUsize::new(0));
        assert_eq!(counter.load(Ordering::Relaxed), 0);

        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 1);

        counter.fetch_add(1, Ordering::Relaxed);
        assert!(counter.load(Ordering::Relaxed) >= 2);
    }
}