Skip to main content

harness_loop_engine/
scheduler.rs

1//! `LoopScheduler` — runs registered loops on their declared cadence.
2//!
3//! This is the "automations / scheduling" building block: it ticks on an
4//! interval and, for every loop whose cadence is due, runs one round and
5//! delivers the report through a [`LoopSink`]. It mirrors
6//! `harness-scheduler`'s execution model (a dedicated thread with its own
7//! current-thread Tokio runtime, because a sub-agent future is not `Send`)
8//! but understands whole loop *rounds* — maker, checker, budget, gate —
9//! rather than single agent turns.
10//!
11//! Registering a production pattern is one line:
12//!
13//! ```ignore
14//! let engine = LoopEngine::new(patterns::daily_triage(), model)
15//!     .with_maker_tool(read_only_tool);
16//! LoopScheduler::new().register(engine).spawn(); // ticks forever
17//! ```
18
19use crate::engine::{LoopEngine, RoundReport};
20use harness_daemon::Schedule;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// Where a finished round's report goes. Implement this to route reports to
25/// Slack, email, a file, a tracker — anywhere. The default is stdout.
26pub trait LoopSink: Send + Sync {
27    fn deliver(&self, report: &RoundReport);
28}
29
30/// Prints deliverable reports to stdout.
31pub struct StdoutSink;
32
33impl LoopSink for StdoutSink {
34    fn deliver(&self, report: &RoundReport) {
35        println!("{}", report.render());
36    }
37}
38
39struct ScheduledLoop {
40    engine: LoopEngine,
41    schedule: Schedule,
42    /// Next fire time in epoch-ms; `None` means "due now".
43    next_run_ms: Option<i64>,
44}
45
46/// Ticks registered loops on their cadence.
47pub struct LoopScheduler {
48    entries: Vec<ScheduledLoop>,
49    tick: Duration,
50    sink: Arc<dyn LoopSink>,
51}
52
53impl Default for LoopScheduler {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl LoopScheduler {
60    pub fn new() -> Self {
61        Self {
62            entries: Vec::new(),
63            tick: Duration::from_secs(60),
64            sink: Arc::new(StdoutSink),
65        }
66    }
67
68    /// How often the scheduler wakes to check for due loops. A loop never
69    /// fires more often than this, regardless of its cadence.
70    pub fn with_tick(mut self, d: Duration) -> Self {
71        self.tick = d;
72        self
73    }
74
75    pub fn with_sink(mut self, s: Arc<dyn LoopSink>) -> Self {
76        self.sink = s;
77        self
78    }
79
80    /// Register a loop. Its `spec().cadence` is parsed into a schedule; an
81    /// unparseable cadence is rejected so the problem surfaces at wiring
82    /// time rather than silently never firing.
83    pub fn register(mut self, engine: LoopEngine) -> Self {
84        match Schedule::parse(&engine.spec().cadence) {
85            Ok(schedule) => self.entries.push(ScheduledLoop {
86                engine,
87                schedule,
88                next_run_ms: None,
89            }),
90            Err(e) => {
91                tracing::warn!(
92                    loop = %engine.spec().name,
93                    cadence = %engine.spec().cadence,
94                    error = %e,
95                    "loop-scheduler: bad cadence, loop not registered"
96                );
97            }
98        }
99        self
100    }
101
102    pub fn len(&self) -> usize {
103        self.entries.len()
104    }
105    pub fn is_empty(&self) -> bool {
106        self.entries.is_empty()
107    }
108
109    /// Run every currently-due loop once. Returns how many fired. Best
110    /// effort: one loop's failure never stops the others (the engine itself
111    /// never returns `Err`).
112    pub async fn tick_once(&mut self) -> usize {
113        let now = chrono::Local::now();
114        let now_ms = now.timestamp_millis();
115        let mut fired = 0;
116        for entry in &mut self.entries {
117            let due = entry.next_run_ms.map(|t| t <= now_ms).unwrap_or(true);
118            if !due {
119                continue;
120            }
121            fired += 1;
122            let report = entry.engine.run_once().await;
123            if report.should_deliver() {
124                self.sink.deliver(&report);
125            }
126            entry.next_run_ms = Some(entry.schedule.next_after(now).timestamp_millis());
127        }
128        fired
129    }
130
131    /// Spawn the tick loop on a dedicated thread with its own single-threaded
132    /// Tokio runtime. Runs forever.
133    pub fn spawn(mut self) {
134        std::thread::spawn(move || {
135            let rt = tokio::runtime::Builder::new_current_thread()
136                .enable_all()
137                .build()
138                .expect("loop-scheduler: build tokio rt");
139            rt.block_on(async move {
140                loop {
141                    let _ = self.tick_once().await;
142                    tokio::time::sleep(self.tick).await;
143                }
144            });
145        });
146    }
147}