// modo/cron/scheduler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use chrono::Utc;
8use tokio::task::{JoinHandle, JoinSet};
9use tokio_util::sync::CancellationToken;
10
11use crate::error::Result;
12use crate::service::{Registry, RegistrySnapshot};
13
14use super::context::CronContext;
15use super::handler::CronHandler;
16use super::meta::Meta;
17use super::schedule::Schedule;
18
/// Per-job options supplied to [`SchedulerBuilder::job_with`].
///
/// This struct is `#[non_exhaustive]`; construct it via [`Default`] and
/// mutate the fields you need:
///
/// ```rust,no_run
/// use modo::cron::CronOptions;
///
/// let mut opts = CronOptions::default();
/// opts.timeout_secs = 600;
/// ```
///
/// The type is `Debug` and `Clone` so options can be logged and reused when
/// registering several jobs.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CronOptions {
    /// Maximum number of seconds a single execution may run before it is
    /// cancelled and logged as timed out. Defaults to `300` (5 minutes).
    pub timeout_secs: u64,
}
36
37impl Default for CronOptions {
38    /// Returns `CronOptions { timeout_secs: 300 }`.
39    fn default() -> Self {
40        Self { timeout_secs: 300 }
41    }
42}
43
44type ErasedCronHandler =
45    Arc<dyn Fn(CronContext) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
46
/// A single registered job, held by the builder until the scheduler starts.
struct CronEntry {
    /// Job name used in log output; set from the handler's type name when
    /// the job is registered.
    name: String,
    /// Parsed schedule that determines when the job fires.
    schedule: Schedule,
    /// Type-erased handler invoked for each execution.
    handler: ErasedCronHandler,
    /// Per-execution timeout in seconds, taken from
    /// [`CronOptions::timeout_secs`].
    timeout_secs: u64,
}
53
/// Builder for constructing a [`Scheduler`] with registered cron jobs.
///
/// Obtain a builder with [`Scheduler::builder`], register jobs with
/// [`job`](Self::job) or [`job_with`](Self::job_with), then call
/// [`start`](Self::start) to spawn them.
#[must_use]
pub struct SchedulerBuilder {
    /// Registry snapshot handed to every job loop and, through it, to each
    /// execution's [`CronContext`].
    registry: Arc<RegistrySnapshot>,
    /// Jobs registered so far, in registration order.
    entries: Vec<CronEntry>,
}
62
63impl SchedulerBuilder {
64    /// Register a cron job with default options.
65    ///
66    /// The `schedule` string can be a standard cron expression, a named alias
67    /// (`@daily`, `@hourly`, etc.), or an interval (`@every 5m`).
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the schedule string is invalid.
72    pub fn job<H, Args>(self, schedule: &str, handler: H) -> Result<Self>
73    where
74        H: CronHandler<Args> + Send + Sync,
75    {
76        self.job_with(schedule, handler, CronOptions::default())
77    }
78
79    /// Register a cron job with custom [`CronOptions`].
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the schedule string is invalid.
84    pub fn job_with<H, Args>(
85        mut self,
86        schedule: &str,
87        handler: H,
88        options: CronOptions,
89    ) -> Result<Self>
90    where
91        H: CronHandler<Args> + Send + Sync,
92    {
93        let name = std::any::type_name::<H>().to_string();
94        let parsed = Schedule::parse(schedule)?;
95
96        let erased: ErasedCronHandler = Arc::new(
97            move |ctx: CronContext| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
98                let h = handler.clone();
99                Box::pin(async move { h.call(ctx).await })
100            },
101        );
102
103        self.entries.push(CronEntry {
104            name,
105            schedule: parsed,
106            handler: erased,
107            timeout_secs: options.timeout_secs,
108        });
109        Ok(self)
110    }
111
112    /// Start all registered cron jobs and return a [`Scheduler`] handle.
113    pub async fn start(self) -> Scheduler {
114        let cancel = CancellationToken::new();
115        let mut handles = Vec::new();
116
117        for entry in self.entries {
118            let handle = tokio::spawn(cron_job_loop(
119                entry.name,
120                entry.schedule,
121                entry.handler,
122                entry.timeout_secs,
123                self.registry.clone(),
124                cancel.clone(),
125            ));
126            handles.push(handle);
127        }
128
129        Scheduler { cancel, handles }
130    }
131}
132
/// A running cron scheduler that manages one or more periodic jobs.
///
/// Implements [`crate::runtime::Task`] for clean shutdown integration with the
/// runtime's `run!` macro.
///
/// When [`Task::shutdown`](crate::runtime::Task::shutdown) is called (or the
/// `run!` macro triggers it), the scheduler signals all job loops to stop and
/// waits up to 30 seconds for in-flight executions to complete.
pub struct Scheduler {
    /// Token shared with every job loop; cancelling it tells the loops to stop.
    cancel: CancellationToken,
    /// Join handles of the spawned job-loop tasks, one per registered job.
    handles: Vec<JoinHandle<()>>,
}
145
146impl Scheduler {
147    /// Create a new [`SchedulerBuilder`] from a service registry.
148    ///
149    /// The registry is snapshotted at build time; services added to `registry`
150    /// after this call will not be visible to cron handlers.
151    pub fn builder(registry: &Registry) -> SchedulerBuilder {
152        SchedulerBuilder {
153            registry: registry.snapshot(),
154            entries: Vec::new(),
155        }
156    }
157}
158
159impl crate::runtime::Task for Scheduler {
160    async fn shutdown(self) -> Result<()> {
161        self.cancel.cancel();
162        let drain = async {
163            for handle in self.handles {
164                let _ = handle.await;
165            }
166        };
167        let _ = tokio::time::timeout(Duration::from_secs(30), drain).await;
168        Ok(())
169    }
170}
171
172async fn cron_job_loop(
173    name: String,
174    schedule: Schedule,
175    handler: ErasedCronHandler,
176    timeout_secs: u64,
177    registry: Arc<RegistrySnapshot>,
178    cancel: CancellationToken,
179) {
180    let running = Arc::new(AtomicBool::new(false));
181    let timeout_dur = Duration::from_secs(timeout_secs);
182    let mut handler_tasks = JoinSet::new();
183
184    let mut next_tick = match schedule.next_tick(Utc::now()) {
185        Some(t) => t,
186        None => {
187            tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
188            return;
189        }
190    };
191
192    loop {
193        let sleep_duration = (next_tick - Utc::now()).to_std().unwrap_or(Duration::ZERO);
194
195        tokio::select! {
196            _ = cancel.cancelled() => break,
197            _ = tokio::time::sleep(sleep_duration) => {
198                // Reap finished handler tasks
199                while handler_tasks.try_join_next().is_some() {}
200
201                // Skip if previous run still going
202                if running.load(Ordering::SeqCst) {
203                    tracing::warn!(cron_job = %name, "skipping tick, previous run still active");
204                    next_tick = match schedule.next_tick(Utc::now()) {
205                        Some(t) => t,
206                        None => {
207                            tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
208                            break;
209                        }
210                    };
211                    continue;
212                }
213
214                running.store(true, Ordering::SeqCst);
215
216                let deadline = tokio::time::Instant::now() + timeout_dur;
217
218                let ctx = CronContext {
219                    registry: registry.clone(),
220                    meta: Meta {
221                        name: name.clone(),
222                        deadline: Some(deadline),
223                        tick: next_tick,
224                    },
225                };
226
227                let running_flag = running.clone();
228                let handler_clone = handler.clone();
229                let job_name = name.clone();
230                handler_tasks.spawn(async move {
231                    let result =
232                        tokio::time::timeout(timeout_dur, (handler_clone)(ctx)).await;
233
234                    match result {
235                        Ok(Ok(())) => {
236                            tracing::debug!(cron_job = %job_name, "completed");
237                        }
238                        Ok(Err(e)) => {
239                            tracing::error!(cron_job = %job_name, error = %e, "failed");
240                        }
241                        Err(_) => {
242                            tracing::error!(cron_job = %job_name, "timed out");
243                        }
244                    }
245
246                    running_flag.store(false, Ordering::SeqCst);
247                });
248
249                next_tick = match schedule.next_tick(Utc::now()) {
250                    Some(t) => t,
251                    None => {
252                        tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
253                        break;
254                    }
255                };
256            }
257        }
258    }
259
260    // Drain in-flight handler tasks before returning
261    while handler_tasks.join_next().await.is_some() {}
262}