use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::AppState;
/// Periodic trigger that invokes a pipeline runner at a fixed interval.
///
/// The interval is fixed at construction time; the loop itself lives in
/// `Scheduler::run`.
pub struct Scheduler {
/// Number of seconds to wait between consecutive scheduler ticks.
pub interval_secs: u64,
}
impl Scheduler {
    /// Creates a scheduler that ticks every `interval_secs` seconds.
    pub fn new(interval_secs: u64) -> Self {
        Self { interval_secs }
    }

    /// Runs the scheduling loop until shutdown is requested.
    ///
    /// On every tick (after sleeping for the configured interval) the
    /// `pipeline_runner` callback is invoked with a clone of `state`, a clone
    /// of the `shutdown` receiver, and two `false` flags (their meaning is
    /// defined by the runner, not by the scheduler). A failing run is logged
    /// and does not stop the scheduler.
    ///
    /// The loop ends when either:
    /// * the `shutdown` channel changes to `true`, or
    /// * the `shutdown` sender is dropped (no shutdown signal can ever arrive
    ///   after that, so the scheduler stops instead of spinning).
    ///
    /// Note that the sleep timer is re-armed on every loop iteration, so a
    /// change notification carrying `false` restarts the wait.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`; errors from `pipeline_runner` are
    /// logged as warnings rather than propagated.
    pub async fn run<F, Fut>(
        self,
        state: Arc<AppState>,
        mut shutdown: watch::Receiver<bool>,
        pipeline_runner: F,
    ) -> Result<()>
    where
        F: Fn(Arc<AppState>, watch::Receiver<bool>, bool, bool) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        info!(interval_secs = self.interval_secs, "scheduler started");
        let interval = Duration::from_secs(self.interval_secs);
        loop {
            tokio::select! {
                _ = tokio::time::sleep(interval) => {
                    debug!(interval_secs = self.interval_secs, "scheduler tick — checking for pending tasks");
                    // Hand the runner the real shutdown receiver so an in-flight
                    // run can observe the signal; a fresh channel whose sender is
                    // dropped immediately could never deliver it.
                    match pipeline_runner(Arc::clone(&state), shutdown.clone(), false, false).await {
                        Ok(_) => info!("scheduler pipeline run complete"),
                        Err(e) => warn!(error = %e, "scheduler pipeline run failed"),
                    }
                }
                changed = shutdown.changed() => {
                    // `changed()` fails once the sender is gone. From then on it
                    // returns immediately on every call, which would starve the
                    // sleep branch and busy-loop forever, so stop here.
                    if changed.is_err() {
                        warn!("scheduler shutdown channel closed — stopping");
                        break;
                    }
                    if *shutdown.borrow() {
                        info!("scheduler received shutdown signal — stopping");
                        break;
                    }
                }
            }
        }
        Ok(())
    }
}
/// The default scheduler ticks every 30 seconds.
impl Default for Scheduler {
    /// Builds a scheduler with a 30-second interval.
    fn default() -> Self {
        const DEFAULT_INTERVAL_SECS: u64 = 30;
        Self::new(DEFAULT_INTERVAL_SECS)
    }
}