harness_loop_engine/scheduler.rs
1//! `LoopScheduler` — runs registered loops on their declared cadence.
2//!
3//! This is the "automations / scheduling" building block: it ticks on an
4//! interval and, for every loop whose cadence is due, runs one round and
5//! delivers the report through a [`LoopSink`]. It mirrors
6//! `harness-scheduler`'s execution model (a dedicated thread with its own
7//! current-thread Tokio runtime, because a sub-agent future is not `Send`)
8//! but understands whole loop *rounds* — maker, checker, budget, gate —
9//! rather than single agent turns.
10//!
11//! Registering a production pattern is one line:
12//!
13//! ```ignore
14//! let engine = LoopEngine::new(patterns::daily_triage(), model)
15//! .with_maker_tool(read_only_tool);
16//! LoopScheduler::new().register(engine).spawn(); // ticks forever
17//! ```
18
19use crate::engine::{LoopEngine, RoundReport};
20use harness_daemon::Schedule;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// Where a finished round's report goes. Implement this to route reports to
25/// Slack, email, a file, a tracker — anywhere. The default is stdout.
26pub trait LoopSink: Send + Sync {
27 fn deliver(&self, report: &RoundReport);
28}
29
/// Prints deliverable reports to stdout.
///
/// A zero-sized marker type: it carries no state, so it is freely copyable
/// and can be created with `StdoutSink` or `StdoutSink::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct StdoutSink;
32
33impl LoopSink for StdoutSink {
34 fn deliver(&self, report: &RoundReport) {
35 println!("{}", report.render());
36 }
37}
38
39struct ScheduledLoop {
40 engine: LoopEngine,
41 schedule: Schedule,
42 /// Next fire time in epoch-ms; `None` means "due now".
43 next_run_ms: Option<i64>,
44}
45
46/// Ticks registered loops on their cadence.
47pub struct LoopScheduler {
48 entries: Vec<ScheduledLoop>,
49 tick: Duration,
50 sink: Arc<dyn LoopSink>,
51}
52
53impl Default for LoopScheduler {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl LoopScheduler {
60 pub fn new() -> Self {
61 Self {
62 entries: Vec::new(),
63 tick: Duration::from_secs(60),
64 sink: Arc::new(StdoutSink),
65 }
66 }
67
68 /// How often the scheduler wakes to check for due loops. A loop never
69 /// fires more often than this, regardless of its cadence.
70 pub fn with_tick(mut self, d: Duration) -> Self {
71 self.tick = d;
72 self
73 }
74
75 pub fn with_sink(mut self, s: Arc<dyn LoopSink>) -> Self {
76 self.sink = s;
77 self
78 }
79
80 /// Register a loop. Its `spec().cadence` is parsed into a schedule; an
81 /// unparseable cadence is rejected so the problem surfaces at wiring
82 /// time rather than silently never firing.
83 pub fn register(mut self, engine: LoopEngine) -> Self {
84 match Schedule::parse(&engine.spec().cadence) {
85 Ok(schedule) => self.entries.push(ScheduledLoop {
86 engine,
87 schedule,
88 next_run_ms: None,
89 }),
90 Err(e) => {
91 tracing::warn!(
92 loop = %engine.spec().name,
93 cadence = %engine.spec().cadence,
94 error = %e,
95 "loop-scheduler: bad cadence, loop not registered"
96 );
97 }
98 }
99 self
100 }
101
102 pub fn len(&self) -> usize {
103 self.entries.len()
104 }
105 pub fn is_empty(&self) -> bool {
106 self.entries.is_empty()
107 }
108
109 /// Run every currently-due loop once. Returns how many fired. Best
110 /// effort: one loop's failure never stops the others (the engine itself
111 /// never returns `Err`).
112 pub async fn tick_once(&mut self) -> usize {
113 let now = chrono::Local::now();
114 let now_ms = now.timestamp_millis();
115 let mut fired = 0;
116 for entry in &mut self.entries {
117 let due = entry.next_run_ms.map(|t| t <= now_ms).unwrap_or(true);
118 if !due {
119 continue;
120 }
121 fired += 1;
122 let report = entry.engine.run_once().await;
123 if report.should_deliver() {
124 self.sink.deliver(&report);
125 }
126 entry.next_run_ms = Some(entry.schedule.next_after(now).timestamp_millis());
127 }
128 fired
129 }
130
131 /// Spawn the tick loop on a dedicated thread with its own single-threaded
132 /// Tokio runtime. Runs forever.
133 pub fn spawn(mut self) {
134 std::thread::spawn(move || {
135 let rt = tokio::runtime::Builder::new_current_thread()
136 .enable_all()
137 .build()
138 .expect("loop-scheduler: build tokio rt");
139 rt.block_on(async move {
140 loop {
141 let _ = self.tick_once().await;
142 tokio::time::sleep(self.tick).await;
143 }
144 });
145 });
146 }
147}