Skip to main content

modo/cron/
scheduler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use chrono::Utc;
8use tokio::task::{JoinHandle, JoinSet};
9use tokio_util::sync::CancellationToken;
10
11use crate::error::Result;
12use crate::service::{Registry, RegistrySnapshot};
13
14use super::context::CronContext;
15use super::handler::CronHandler;
16use super::meta::Meta;
17use super::schedule::Schedule;
18
/// Per-job options supplied to [`SchedulerBuilder::job_with`].
///
/// Derives `Debug` and `Clone` so callers can log the options they configured
/// and reuse one value across several job registrations.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CronOptions {
    /// Maximum number of seconds a single execution may run before it is
    /// cancelled and logged as timed out. Defaults to `300` (5 minutes).
    pub timeout_secs: u64,
}

impl Default for CronOptions {
    /// Returns `CronOptions { timeout_secs: 300 }`.
    fn default() -> Self {
        Self { timeout_secs: 300 }
    }
}
33
34type ErasedCronHandler =
35    Arc<dyn Fn(CronContext) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
36
37struct CronEntry {
38    name: String,
39    schedule: Schedule,
40    handler: ErasedCronHandler,
41    timeout_secs: u64,
42}
43
44/// Builder for constructing a [`Scheduler`] with registered cron jobs.
45///
46/// Obtain a builder with [`Scheduler::builder`].
47#[must_use]
48pub struct SchedulerBuilder {
49    registry: Arc<RegistrySnapshot>,
50    entries: Vec<CronEntry>,
51}
52
53impl SchedulerBuilder {
54    /// Register a cron job with default options.
55    ///
56    /// The `schedule` string can be a standard cron expression, a named alias
57    /// (`@daily`, `@hourly`, etc.), or an interval (`@every 5m`).
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the schedule string is invalid.
62    pub fn job<H, Args>(self, schedule: &str, handler: H) -> Result<Self>
63    where
64        H: CronHandler<Args> + Send + Sync,
65    {
66        self.job_with(schedule, handler, CronOptions::default())
67    }
68
69    /// Register a cron job with custom [`CronOptions`].
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the schedule string is invalid.
74    pub fn job_with<H, Args>(
75        mut self,
76        schedule: &str,
77        handler: H,
78        options: CronOptions,
79    ) -> Result<Self>
80    where
81        H: CronHandler<Args> + Send + Sync,
82    {
83        let name = std::any::type_name::<H>().to_string();
84        let parsed = Schedule::parse(schedule)?;
85
86        let erased: ErasedCronHandler = Arc::new(
87            move |ctx: CronContext| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
88                let h = handler.clone();
89                Box::pin(async move { h.call(ctx).await })
90            },
91        );
92
93        self.entries.push(CronEntry {
94            name,
95            schedule: parsed,
96            handler: erased,
97            timeout_secs: options.timeout_secs,
98        });
99        Ok(self)
100    }
101
102    /// Start all registered cron jobs and return a [`Scheduler`] handle.
103    pub async fn start(self) -> Scheduler {
104        let cancel = CancellationToken::new();
105        let mut handles = Vec::new();
106
107        for entry in self.entries {
108            let handle = tokio::spawn(cron_job_loop(
109                entry.name,
110                entry.schedule,
111                entry.handler,
112                entry.timeout_secs,
113                self.registry.clone(),
114                cancel.clone(),
115            ));
116            handles.push(handle);
117        }
118
119        Scheduler { cancel, handles }
120    }
121}
122
123/// A running cron scheduler that manages one or more periodic jobs.
124///
125/// Implements [`crate::runtime::Task`] for clean shutdown integration with the
126/// runtime's `run!` macro.
127///
128/// When [`Task::shutdown`](crate::runtime::Task::shutdown) is called (or the
129/// `run!` macro triggers it), the scheduler signals all job loops to stop and
130/// waits up to 30 seconds for in-flight executions to complete.
131pub struct Scheduler {
132    cancel: CancellationToken,
133    handles: Vec<JoinHandle<()>>,
134}
135
136impl Scheduler {
137    /// Create a new [`SchedulerBuilder`] from a service registry.
138    ///
139    /// The registry is snapshotted at build time; services added to `registry`
140    /// after this call will not be visible to cron handlers.
141    pub fn builder(registry: &Registry) -> SchedulerBuilder {
142        SchedulerBuilder {
143            registry: registry.snapshot(),
144            entries: Vec::new(),
145        }
146    }
147}
148
149impl crate::runtime::Task for Scheduler {
150    async fn shutdown(self) -> Result<()> {
151        self.cancel.cancel();
152        let drain = async {
153            for handle in self.handles {
154                let _ = handle.await;
155            }
156        };
157        let _ = tokio::time::timeout(Duration::from_secs(30), drain).await;
158        Ok(())
159    }
160}
161
162async fn cron_job_loop(
163    name: String,
164    schedule: Schedule,
165    handler: ErasedCronHandler,
166    timeout_secs: u64,
167    registry: Arc<RegistrySnapshot>,
168    cancel: CancellationToken,
169) {
170    let running = Arc::new(AtomicBool::new(false));
171    let timeout_dur = Duration::from_secs(timeout_secs);
172    let mut handler_tasks = JoinSet::new();
173
174    let mut next_tick = match schedule.next_tick(Utc::now()) {
175        Some(t) => t,
176        None => {
177            tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
178            return;
179        }
180    };
181
182    loop {
183        let sleep_duration = (next_tick - Utc::now()).to_std().unwrap_or(Duration::ZERO);
184
185        tokio::select! {
186            _ = cancel.cancelled() => break,
187            _ = tokio::time::sleep(sleep_duration) => {
188                // Reap finished handler tasks
189                while handler_tasks.try_join_next().is_some() {}
190
191                // Skip if previous run still going
192                if running.load(Ordering::SeqCst) {
193                    tracing::warn!(cron_job = %name, "skipping tick, previous run still active");
194                    next_tick = match schedule.next_tick(Utc::now()) {
195                        Some(t) => t,
196                        None => {
197                            tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
198                            break;
199                        }
200                    };
201                    continue;
202                }
203
204                running.store(true, Ordering::SeqCst);
205
206                let deadline = tokio::time::Instant::now() + timeout_dur;
207
208                let ctx = CronContext {
209                    registry: registry.clone(),
210                    meta: Meta {
211                        name: name.clone(),
212                        deadline: Some(deadline),
213                        tick: next_tick,
214                    },
215                };
216
217                let running_flag = running.clone();
218                let handler_clone = handler.clone();
219                let job_name = name.clone();
220                handler_tasks.spawn(async move {
221                    let result =
222                        tokio::time::timeout(timeout_dur, (handler_clone)(ctx)).await;
223
224                    match result {
225                        Ok(Ok(())) => {
226                            tracing::debug!(cron_job = %job_name, "completed");
227                        }
228                        Ok(Err(e)) => {
229                            tracing::error!(cron_job = %job_name, error = %e, "failed");
230                        }
231                        Err(_) => {
232                            tracing::error!(cron_job = %job_name, "timed out");
233                        }
234                    }
235
236                    running_flag.store(false, Ordering::SeqCst);
237                });
238
239                next_tick = match schedule.next_tick(Utc::now()) {
240                    Some(t) => t,
241                    None => {
242                        tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
243                        break;
244                    }
245                };
246            }
247        }
248    }
249
250    // Drain in-flight handler tasks before returning
251    while handler_tasks.join_next().await.is_some() {}
252}