1use std::str::FromStr;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule as Cron;
9use parking_lot::Mutex;
10
11use crate::container::Container;
12use crate::Error;
13
14#[async_trait]
15pub trait ScheduledTask: Send + Sync {
16 async fn run(&self, container: &Container) -> Result<(), Error>;
17 fn description(&self) -> &str {
18 "scheduled task"
19 }
20}
21
22pub struct ScheduledEntry {
23 pub expression: String,
24 pub task: Arc<dyn ScheduledTask>,
25}
26
27#[derive(Default, Clone)]
28pub struct Schedule {
29 entries: Arc<Mutex<Vec<ScheduledEntry>>>,
30}
31
32impl Schedule {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn push(&self, expression: impl Into<String>, task: Arc<dyn ScheduledTask>) {
38 self.entries.lock().push(ScheduledEntry {
39 expression: expression.into(),
40 task,
41 });
42 }
43
44 pub fn cron(&self, expression: &str, task: Arc<dyn ScheduledTask>) {
45 self.push(expression, task);
46 }
47
48 pub fn daily_at(&self, hour_minute: &str, task: Arc<dyn ScheduledTask>) {
49 let parts: Vec<&str> = hour_minute.split(':').collect();
50 let (h, m) = match parts.as_slice() {
51 [h, m] => (h.parse::<u32>().unwrap_or(0), m.parse::<u32>().unwrap_or(0)),
52 _ => (0, 0),
53 };
54 self.push(format!("0 {m} {h} * * *"), task);
55 }
56
57 pub fn hourly(&self, task: Arc<dyn ScheduledTask>) {
58 self.push("0 0 * * * *", task);
59 }
60
61 pub async fn run_due(&self, container: &Container) -> Result<(), Error> {
63 let entries: Vec<_> = self
64 .entries
65 .lock()
66 .iter()
67 .map(|e| (e.expression.clone(), e.task.clone()))
68 .collect();
69 let now = Utc::now();
70 for (expr, task) in entries {
71 let Ok(cron) = Cron::from_str(&expr) else {
72 tracing::warn!(expr, "invalid cron expression, skipping");
73 continue;
74 };
75 if let Some(next) = cron.upcoming(Utc).next() {
76 if (next - now).num_seconds().abs() < 60 {
78 if let Err(e) = task.run(container).await {
79 tracing::error!(error = ?e, "scheduled task failed");
80 }
81 }
82 }
83 }
84 Ok(())
85 }
86}