use std::collections::HashMap;
use keel_events::EventQueueHandle;
use tokio::sync::watch;
use tracing::info;
use crate::cron_job::{CronJobConfig, CronRunner};
use crate::heartbeat::{HeartbeatConfig, HeartbeatRunner};
/// Registry of background heartbeat and cron tasks.
///
/// Every task is spawned with a clone of the event queue handle and a clone of
/// the shutdown receiver; the scheduler keeps the resulting join handles so a
/// task can be aborted individually or awaited during shutdown.
pub struct Scheduler {
/// Event queue handle; cloned into each runner that gets spawned.
queue_handle: EventQueueHandle,
/// Join handles of heartbeat tasks, keyed by generated ids of the form `heartbeat_<n>`.
heartbeats: HashMap<String, tokio::task::JoinHandle<()>>,
/// Join handles of cron tasks, keyed by the `id` taken from the job's `CronJobConfig`.
crons: HashMap<String, tokio::task::JoinHandle<()>>,
/// Sending half of the shutdown flag; `shutdown` sends `true` through it.
shutdown_tx: watch::Sender<bool>,
/// Receiving half of the shutdown flag; cloned into each runner that gets spawned.
shutdown_rx: watch::Receiver<bool>,
/// Counter used to build the next heartbeat id; incremented on every `add_heartbeat`.
next_heartbeat_id: u64,
}
impl Scheduler {
pub fn new(queue_handle: EventQueueHandle) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
Self {
queue_handle,
heartbeats: HashMap::new(),
crons: HashMap::new(),
shutdown_tx,
shutdown_rx,
next_heartbeat_id: 0,
}
}
pub fn add_heartbeat(&mut self, config: HeartbeatConfig) -> String {
let id = format!("heartbeat_{}", self.next_heartbeat_id);
self.next_heartbeat_id += 1;
let handle = HeartbeatRunner::spawn(
config,
self.queue_handle.clone(),
self.shutdown_rx.clone(),
);
self.heartbeats.insert(id.clone(), handle);
info!(heartbeat_id = %id, "Heartbeat added");
id
}
pub fn remove_heartbeat(&mut self, id: &str) {
if let Some(handle) = self.heartbeats.remove(id) {
handle.abort();
info!(heartbeat_id = %id, "Heartbeat removed");
}
}
pub fn add_cron(&mut self, config: CronJobConfig) -> anyhow::Result<String> {
let id = config.id.clone();
let handle = CronRunner::spawn(
config,
self.queue_handle.clone(),
self.shutdown_rx.clone(),
)?;
self.crons.insert(id.clone(), handle);
info!(cron_id = %id, "Cron job added");
Ok(id)
}
pub fn remove_cron(&mut self, id: &str) {
if let Some(handle) = self.crons.remove(id) {
handle.abort();
info!(cron_id = %id, "Cron job removed");
}
}
pub async fn shutdown(&mut self) {
info!("Scheduler shutting down");
let _ = self.shutdown_tx.send(true);
for (id, handle) in self.heartbeats.drain() {
let _ = handle.await;
info!(heartbeat_id = %id, "Heartbeat stopped");
}
for (id, handle) in self.crons.drain() {
let _ = handle.await;
info!(cron_id = %id, "Cron job stopped");
}
info!("Scheduler shut down complete");
}
pub fn heartbeat_count(&self) -> usize {
self.heartbeats.len()
}
pub fn cron_count(&self) -> usize {
self.crons.len()
}
}