use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// A single registered job: a parsed cron schedule paired with the async
/// handler to invoke whenever that schedule fires.
// The boxed async-closure type below is inherently verbose; it is spelled out
// in full here, hence the lint exemption.
#[allow(clippy::type_complexity)]
pub struct ScheduledTask {
    /// The cron expression exactly as the caller supplied it; used as the
    /// task's name in log output.
    label: String,
    /// Parsed schedule that yields the task's firing times.
    schedule: cron::Schedule,
    /// Type-erased factory that builds a fresh future for every run. It is
    /// reference-counted so each run can hold its own handle to it.
    handler: Arc<
        Box<
            dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>>
                + Send
                + Sync,
        >,
    >,
}
/// Builder-style registry of cron-driven async tasks.
///
/// Tasks are added with `task` and launched with `start`.
pub struct Scheduler {
    /// Every task registered so far, in registration order.
    tasks: Vec<ScheduledTask>,
}
impl Scheduler {
    /// Creates a scheduler with no registered tasks.
    pub fn new() -> Self {
        Self { tasks: Vec::new() }
    }

    /// Registers `handler` to run on the schedule described by `cron_expr`
    /// and returns the scheduler so calls can be chained.
    ///
    /// Before parsing, the expression is wrapped as `"0 {cron_expr} *"`: a
    /// leading `0` field and a trailing `*` field are added around the
    /// caller's fields. The original, unwrapped expression is kept as the
    /// task's label.
    ///
    /// `handler` is called once per firing and must return a new future each
    /// time.
    ///
    /// # Panics
    ///
    /// Panics with a message starting with `Invalid cron expression` when the
    /// wrapped expression cannot be parsed into a `cron::Schedule`.
    pub fn task<F, Fut>(mut self, cron_expr: &str, handler: F) -> Self
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let full_expr = format!("0 {} *", cron_expr);
        let schedule: cron::Schedule = full_expr
            .parse()
            .unwrap_or_else(|e| panic!("Invalid cron expression '{}': {}", cron_expr, e));
        let label = cron_expr.to_string();

        // Erase the concrete closure and future types so tasks with different
        // handlers can live in the same `Vec`.
        let boxed: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync> =
            Box::new(move || Box::pin(handler()));

        self.tasks.push(ScheduledTask {
            label,
            schedule,
            handler: Arc::new(boxed),
        });
        self
    }

    /// Consumes the scheduler and spawns one background Tokio task per
    /// registered task.
    ///
    /// Each background task loops: find the next occurrence of its schedule,
    /// sleep until then, and spawn the handler's future as its own Tokio task.
    /// Because every run is spawned separately, a slow run does not delay the
    /// next firing and runs may overlap. The loop ends when the schedule has
    /// no further occurrences. All spawned tasks are detached (their join
    /// handles are dropped).
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime context.
    pub fn start(self) {
        for task in self.tasks {
            let ScheduledTask {
                label,
                schedule,
                handler,
            } = task;

            tokio::spawn(async move {
                println!("📅 Scheduler: task '{}' registered.", label);

                // Occurrences are searched strictly after `cursor`. Tracking
                // it explicitly (instead of asking for "upcoming from now" on
                // every iteration) guarantees an occurrence is never fired
                // twice when the timer wakes before the wall clock has
                // reached `next`, e.g. after a clock adjustment.
                let mut cursor = chrono::Utc::now();
                loop {
                    let next = match schedule.after(&cursor).next() {
                        Some(next) => next,
                        None => break,
                    };

                    // A negative delay means `next` is already due: fire
                    // immediately rather than waiting an arbitrary interval.
                    let delay = (next - chrono::Utc::now())
                        .to_std()
                        .unwrap_or(std::time::Duration::ZERO);
                    tokio::time::sleep(delay).await;

                    // Move past the occurrence just handled. Taking the later
                    // of `next` and the current time also skips occurrences
                    // that were missed while this task could not run, instead
                    // of replaying them in a burst.
                    cursor = next.max(chrono::Utc::now());

                    let run = Arc::clone(&handler);
                    tokio::spawn(async move {
                        run().await;
                    });
                }
            });
        }
    }
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// A freshly constructed scheduler holds no tasks.
    #[test]
    fn test_scheduler_creation() {
        let empty = Scheduler::new();
        assert!(empty.tasks.is_empty());
    }

    /// Each `task` call appends exactly one entry.
    #[test]
    fn test_scheduler_task_registration() {
        let with_minutely = Scheduler::new().task("* * * * *", || async {});
        let with_daily = with_minutely.task("0 0 * * *", || async {});
        assert_eq!(with_daily.tasks.len(), 2);
    }

    /// The label is the caller's expression, not the wrapped one.
    #[test]
    fn test_scheduler_label_preserved() {
        let expr = "30 2 * * 1";
        let scheduler = Scheduler::new().task(expr, || async {});
        assert_eq!(scheduler.tasks[0].label, expr);
    }

    /// Registering an unparsable expression panics with a descriptive message.
    #[test]
    #[should_panic(expected = "Invalid cron expression")]
    fn test_scheduler_invalid_cron() {
        let _scheduler = Scheduler::new().task("invalid cron", || async {});
    }

    /// A valid expression yields a schedule with at least one future firing.
    #[tokio::test]
    async fn test_scheduler_cron_parses_correctly() {
        let scheduler = Scheduler::new().task("*/5 * * * *", || async {});
        let mut firings = scheduler.tasks[0].schedule.upcoming(chrono::Utc);
        assert!(
            firings.next().is_some(),
            "Scheduler should have upcoming executions"
        );
    }
}