Skip to main content

anvil_core/
schedule.rs

1//! Scheduler. Cron-style expressions matched per tick from `smith schedule:run`.
2
3use std::str::FromStr;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule as Cron;
9use parking_lot::Mutex;
10
11use crate::container::Container;
12use crate::Error;
13
14#[async_trait]
15pub trait ScheduledTask: Send + Sync {
16    async fn run(&self, container: &Container) -> Result<(), Error>;
17    fn description(&self) -> &str {
18        "scheduled task"
19    }
20}
21
22pub struct ScheduledEntry {
23    pub expression: String,
24    pub task: Arc<dyn ScheduledTask>,
25}
26
27#[derive(Default, Clone)]
28pub struct Schedule {
29    entries: Arc<Mutex<Vec<ScheduledEntry>>>,
30}
31
32impl Schedule {
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    pub fn push(&self, expression: impl Into<String>, task: Arc<dyn ScheduledTask>) {
38        self.entries.lock().push(ScheduledEntry {
39            expression: expression.into(),
40            task,
41        });
42    }
43
44    pub fn cron(&self, expression: &str, task: Arc<dyn ScheduledTask>) {
45        self.push(expression, task);
46    }
47
48    pub fn daily_at(&self, hour_minute: &str, task: Arc<dyn ScheduledTask>) {
49        let parts: Vec<&str> = hour_minute.split(':').collect();
50        let (h, m) = match parts.as_slice() {
51            [h, m] => (h.parse::<u32>().unwrap_or(0), m.parse::<u32>().unwrap_or(0)),
52            _ => (0, 0),
53        };
54        self.push(format!("0 {m} {h} * * *"), task);
55    }
56
57    pub fn hourly(&self, task: Arc<dyn ScheduledTask>) {
58        self.push("0 0 * * * *", task);
59    }
60
61    /// Run any tasks whose expression matches the current minute.
62    pub async fn run_due(&self, container: &Container) -> Result<(), Error> {
63        let entries: Vec<_> = self
64            .entries
65            .lock()
66            .iter()
67            .map(|e| (e.expression.clone(), e.task.clone()))
68            .collect();
69        let now = Utc::now();
70        for (expr, task) in entries {
71            let Ok(cron) = Cron::from_str(&expr) else {
72                tracing::warn!(expr, "invalid cron expression, skipping");
73                continue;
74            };
75            if let Some(next) = cron.upcoming(Utc).next() {
76                // Run if the next upcoming time is within the next minute.
77                if (next - now).num_seconds().abs() < 60 {
78                    if let Err(e) = task.run(container).await {
79                        tracing::error!(error = ?e, "scheduled task failed");
80                    }
81                }
82            }
83        }
84        Ok(())
85    }
86}