Skip to main content

apm/cmd/
work.rs

1use anyhow::Result;
2use apm_core::{config::Config, ticket};
3use std::path::Path;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8fn log(msg: &str) {
9    let ts = chrono::Local::now().format("%H:%M:%S");
10    println!("[{ts}] {msg}");
11}
12
13pub fn run(root: &Path, skip_permissions: bool, dry_run: bool, daemon: bool, interval_secs: u64, epic: Option<String>) -> Result<()> {
14    if daemon && dry_run {
15        anyhow::bail!("--daemon and --dry-run cannot be used together");
16    }
17
18    let config = Config::load(root)?;
19    let max_concurrent = config.agents.max_concurrent.max(1);
20    let epic_filter: Option<String> = epic.or_else(|| config.work.epic.clone());
21
22    if dry_run {
23        return run_dry(root, &config, epic_filter.as_deref());
24    }
25
26    let sig_count = Arc::new(AtomicUsize::new(0));
27    let sig_count_clone = Arc::clone(&sig_count);
28    let _ = ctrlc::set_handler(move || {
29        sig_count_clone.fetch_add(1, Ordering::Relaxed);
30    });
31
32    let mut workers: Vec<(String, Option<String>, apm_core::start::ManagedChild, std::path::PathBuf)> = Vec::new();
33    let mut started_ids: Vec<String> = Vec::new();
34    let mut no_more = false;
35    // next_poll only used in daemon mode
36    let mut next_poll = Instant::now();
37    let mut drain_announced = false;
38
39    loop {
40        let sigs = sig_count.load(Ordering::Relaxed);
41
42        if daemon {
43            if sigs >= 2 {
44                log(&format!("Forced exit; {} worker(s) may still be running", workers.len()));
45                break;
46            }
47            if sigs == 1 {
48                if workers.is_empty() {
49                    log("Daemon stopped.");
50                    break;
51                }
52                if !drain_announced {
53                    log(&format!(
54                        "Graceful shutdown: waiting for {} worker(s) to finish (Ctrl+C again to exit immediately)",
55                        workers.len()
56                    ));
57                    drain_announced = true;
58                }
59                // Reap finished workers during drain.
60                workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
61                    let done = matches!(child.try_wait(), Ok(Some(_)));
62                    if done {
63                        log(&format!("Worker for ticket #{id} finished"));
64                    }
65                    !done
66                });
67                if workers.is_empty() {
68                    log("All workers finished; exiting.");
69                    break;
70                }
71                std::thread::sleep(Duration::from_millis(500));
72                continue;
73            }
74        } else if sigs >= 1 {
75            break;
76        }
77
78        // Reap finished workers.
79        let mut reaped = false;
80        workers.retain_mut(|(id, _epic_id, child, _pid_path)| {
81            let done = matches!(child.try_wait(), Ok(Some(_)));
82            if done {
83                log(&format!("Worker for ticket #{id} finished"));
84                reaped = true;
85            }
86            !done
87        });
88
89        // A reaped worker opens a slot — retry dispatch in both modes.
90        if reaped {
91            if daemon {
92                next_poll = Instant::now();
93            }
94            no_more = false;
95        }
96
97        if !daemon && no_more && workers.is_empty() {
98            break;
99        }
100
101        // In daemon mode: if no_more and not yet time to poll again, sleep and continue.
102        if daemon && no_more {
103            let now = Instant::now();
104            if now < next_poll {
105                std::thread::sleep(Duration::from_millis(500));
106                continue;
107            }
108            // Poll interval elapsed — try again.
109            no_more = false;
110        }
111
112        if !no_more && workers.len() < max_concurrent {
113            let (blocked_epics, default_blocked) = {
114                let epic_ids: Vec<Option<String>> = workers.iter()
115                    .map(|(_, eid, _, _)| eid.clone())
116                    .collect();
117                let blocked = config.blocked_epics(&epic_ids);
118                let def_blocked = config.is_default_branch_blocked(&epic_ids);
119                (blocked, def_blocked)
120            };
121            match super::start::spawn_next_worker(root, true, skip_permissions, epic_filter.as_deref(), &blocked_epics, default_blocked) {
122                Ok(None) => {
123                    if daemon {
124                        let secs = interval_secs;
125                        log(&format!("No actionable tickets; next check in {secs}s"));
126                        next_poll = Instant::now() + Duration::from_secs(interval_secs);
127                    }
128                    no_more = true;
129                }
130                Ok(Some((id, epic_id, child, pid_path))) => {
131                    log(&format!(
132                        "Dispatched worker for ticket #{id}"
133                    ));
134                    started_ids.push(id.clone());
135                    workers.push((id, epic_id, child, pid_path));
136                    no_more = false;
137                }
138                Err(e) => {
139                    eprintln!("warning: dispatch failed: {e:#}");
140                    no_more = true;
141                    std::thread::sleep(Duration::from_secs(30));
142                }
143            }
144        } else {
145            std::thread::sleep(Duration::from_millis(500));
146        }
147    }
148
149    // Wait for all remaining workers in non-daemon mode (they were already
150    // reaped in the loop above for daemon mode; non-daemon exits when empty).
151    // In daemon mode workers run independently — we just stop dispatching.
152
153    if started_ids.is_empty() {
154        println!("No tickets to work.");
155        return Ok(());
156    }
157
158    if daemon {
159        // Don't print summary — workers are still running independently.
160        return Ok(());
161    }
162
163    let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
164    let good_states: Vec<&str> = config.workflow.states.iter()
165        .filter(|s| s.terminal)
166        .map(|s| s.id.as_str())
167        .collect();
168    let mut any_bad = false;
169    println!("\nSummary:");
170    for id in &started_ids {
171        if let Some(t) = tickets.iter().find(|t| t.frontmatter.id == *id) {
172            let state = &t.frontmatter.state;
173            let ok = good_states.contains(&state.as_str());
174            if !ok { any_bad = true; }
175            println!("  #{id} {} — {state}", t.frontmatter.title);
176        }
177    }
178
179    if any_bad {
180        std::process::exit(1);
181    }
182    Ok(())
183}
184
185fn run_dry(root: &Path, config: &Config, epic_filter: Option<&str>) -> Result<()> {
186    let pw = config.workflow.prioritization.priority_weight;
187    let ew = config.workflow.prioritization.effort_weight;
188    let rw = config.workflow.prioritization.risk_weight;
189    let max_concurrent = config.agents.max_concurrent.max(1);
190
191    let startable: Vec<&str> = config.workflow.states.iter()
192        .filter(|s| s.transitions.iter().any(|tr| tr.trigger == "command:start"))
193        .map(|s| s.id.as_str())
194        .collect();
195    let actionable_owned = config.actionable_states_for("agent");
196    let actionable: Vec<&str> = actionable_owned.iter().map(|s| s.as_str()).collect();
197
198    let tickets = ticket::load_all_from_git(root, &config.tickets.dir)?;
199    let mut candidates: Vec<&ticket::Ticket> = tickets
200        .iter()
201        .filter(|t| {
202            let state = t.frontmatter.state.as_str();
203            actionable.contains(&state)
204                && (startable.is_empty() || startable.contains(&state))
205                && epic_filter
206                    .map_or(true, |id| t.frontmatter.epic.as_deref() == Some(id))
207        })
208        .collect();
209    candidates.sort_by(|a, b| {
210        b.score(pw, ew, rw)
211            .partial_cmp(&a.score(pw, ew, rw))
212            .unwrap_or(std::cmp::Ordering::Equal)
213    });
214
215    if candidates.is_empty() {
216        println!("dry-run: no actionable tickets");
217    } else {
218        for t in candidates.into_iter().take(max_concurrent) {
219            println!(
220                "dry-run: would start next: #{} [{}] {}",
221                t.frontmatter.id, t.frontmatter.state, t.frontmatter.title
222            );
223        }
224    }
225    Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// `--daemon` combined with `--dry-run` must be rejected up front.
    #[test]
    fn daemon_dry_run_is_error() {
        // The flag guard runs before `Config::load`, so a path that does not
        // exist is enough to reach it without any repository on disk.
        let outcome = run(
            std::path::Path::new("/nonexistent"),
            false,
            true, // dry_run
            true, // daemon
            30,
            None,
        );
        let message = match outcome {
            Ok(()) => panic!("expected --daemon/--dry-run to be rejected"),
            Err(err) => err.to_string(),
        };
        assert!(
            message.contains("--daemon") && message.contains("--dry-run"),
            "unexpected error: {message}"
        );
    }

    /// Exercises only the shared counter itself, stepping through the values
    /// `run` reacts to in daemon mode (1 = drain, >= 2 = forced exit).
    #[test]
    fn sig_count_increments_correctly() {
        let sig_count = Arc::new(AtomicUsize::new(0));
        let presses = || sig_count.load(Ordering::Relaxed);
        assert_eq!(presses(), 0);

        sig_count.fetch_add(1, Ordering::Relaxed);
        assert_eq!(presses(), 1);

        sig_count.fetch_add(1, Ordering::Relaxed);
        assert!(presses() >= 2);
    }
}