use crate::engine::{LoopEngine, RoundReport};
use harness_daemon::Schedule;
use std::sync::Arc;
use std::time::Duration;
pub trait LoopSink: Send + Sync {
fn deliver(&self, report: &RoundReport);
}
pub struct StdoutSink;
impl LoopSink for StdoutSink {
fn deliver(&self, report: &RoundReport) {
println!("{}", report.render());
}
}
struct ScheduledLoop {
engine: LoopEngine,
schedule: Schedule,
next_run_ms: Option<i64>,
}
pub struct LoopScheduler {
entries: Vec<ScheduledLoop>,
tick: Duration,
sink: Arc<dyn LoopSink>,
}
impl Default for LoopScheduler {
fn default() -> Self {
Self::new()
}
}
impl LoopScheduler {
pub fn new() -> Self {
Self {
entries: Vec::new(),
tick: Duration::from_secs(60),
sink: Arc::new(StdoutSink),
}
}
pub fn with_tick(mut self, d: Duration) -> Self {
self.tick = d;
self
}
pub fn with_sink(mut self, s: Arc<dyn LoopSink>) -> Self {
self.sink = s;
self
}
pub fn register(mut self, engine: LoopEngine) -> Self {
match Schedule::parse(&engine.spec().cadence) {
Ok(schedule) => self.entries.push(ScheduledLoop {
engine,
schedule,
next_run_ms: None,
}),
Err(e) => {
tracing::warn!(
loop = %engine.spec().name,
cadence = %engine.spec().cadence,
error = %e,
"loop-scheduler: bad cadence, loop not registered"
);
}
}
self
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub async fn tick_once(&mut self) -> usize {
let now = chrono::Local::now();
let now_ms = now.timestamp_millis();
let mut fired = 0;
for entry in &mut self.entries {
let due = entry.next_run_ms.map(|t| t <= now_ms).unwrap_or(true);
if !due {
continue;
}
fired += 1;
let report = entry.engine.run_once().await;
if report.should_deliver() {
self.sink.deliver(&report);
}
entry.next_run_ms = Some(entry.schedule.next_after(now).timestamp_millis());
}
fired
}
pub fn spawn(mut self) {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("loop-scheduler: build tokio rt");
rt.block_on(async move {
loop {
let _ = self.tick_once().await;
tokio::time::sleep(self.tick).await;
}
});
});
}
}