Skip to main content

apm_core/
work.rs

1use anyhow::Result;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7pub fn run_engine_loop(
8    root: &Path,
9    cancel: Arc<AtomicBool>,
10    interval_secs: u64,
11    max_concurrent: usize,
12    skip_permissions: bool,
13    epic_filter: Option<String>,
14) -> Result<()> {
15    let mut workers: Vec<(String, Option<String>, std::process::Child, std::path::PathBuf)> = Vec::new();
16    let mut no_more = false;
17    let mut next_poll = Instant::now();
18
19    loop {
20        if cancel.load(Ordering::Relaxed) {
21            break;
22        }
23
24        let mut reaped = false;
25        workers.retain_mut(|(_, _epic_id, child, _pid_path)| {
26            let done = matches!(child.try_wait(), Ok(Some(_)));
27            if done {
28                reaped = true;
29            }
30            !done
31        });
32
33        if reaped {
34            next_poll = Instant::now();
35            no_more = false;
36        }
37
38        if no_more {
39            let now = Instant::now();
40            if now < next_poll {
41                std::thread::sleep(Duration::from_millis(500));
42                continue;
43            }
44            no_more = false;
45        }
46
47        if !no_more && workers.len() < max_concurrent {
48            let blocked_epics: Vec<String> = crate::config::Config::load(root)
49                .map(|config| {
50                    let epic_ids: Vec<Option<String>> = workers.iter()
51                        .map(|(_, eid, _, _)| eid.clone())
52                        .collect();
53                    config.blocked_epics(&epic_ids)
54                })
55                .unwrap_or_default();
56            let mut _messages = Vec::new();
57            let mut _warnings = Vec::new();
58            match crate::start::spawn_next_worker(root, true, skip_permissions, epic_filter.as_deref(), &blocked_epics, &mut _messages, &mut _warnings) {
59                Ok(None) => {
60                    next_poll = Instant::now() + Duration::from_secs(interval_secs);
61                    no_more = true;
62                }
63                Ok(Some((id, epic_id, child, pid_path))) => {
64                    workers.push((id, epic_id, child, pid_path));
65                    no_more = false;
66                }
67                Err(_) => {
68                    no_more = true;
69                    std::thread::sleep(Duration::from_secs(30));
70                }
71            }
72        } else {
73            std::thread::sleep(Duration::from_millis(500));
74        }
75    }
76
77    Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83
84    #[test]
85    fn cancel_flag_stops_loop_immediately() {
86        let cancel = Arc::new(AtomicBool::new(true));
87        let root = std::path::Path::new("/nonexistent");
88        let result = run_engine_loop(root, cancel, 30, 1, false, None);
89        assert!(result.is_ok());
90    }
91}