Skip to main content

quarlus_scheduler/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4use tokio_util::sync::CancellationToken;
5
6use quarlus_core::scheduling::{ScheduleConfig, ScheduledTaskDef};
7
/// Scheduler plugin — installs the scheduler runtime into the application.
///
/// Add `.with(Scheduler)` to your `AppBuilder` to enable scheduled tasks.
/// Controllers that declare `#[scheduled]` methods are auto-discovered via
/// `register_controller()`.
///
/// `Scheduler` is a field-less marker type, so it derives the inexpensive
/// standard traits (`Debug`, `Clone`, `Copy`, `Default`) to be usable in logs
/// and generic code.
///
/// # Example
///
/// ```ignore
/// use quarlus_scheduler::Scheduler;
///
/// AppBuilder::new()
///     .build_state::<Services>()
///     .with(Scheduler)
///     .register_controller::<ScheduledJobs>()
///     .serve("0.0.0.0:3000")
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct Scheduler;
26
27impl<T: Clone + Send + Sync + 'static> quarlus_core::Plugin<T> for Scheduler {
28    fn install(self, app: quarlus_core::AppBuilder<T>) -> quarlus_core::AppBuilder<T> {
29        let cancel = CancellationToken::new();
30        let cancel_stop = cancel.clone();
31
32        app.set_scheduler_backend(
33            Box::new(move |task_defs, state| {
34                start_tasks_from_defs(task_defs, state, cancel);
35            }),
36            Box::new(move || {
37                cancel_stop.cancel();
38            }),
39        )
40    }
41}
42
43/// Convert [`ScheduledTaskDef`]s (from quarlus-core) into running Tokio tasks.
44fn start_tasks_from_defs<T: Clone + Send + Sync + 'static>(
45    task_defs: Vec<ScheduledTaskDef<T>>,
46    state: T,
47    cancel: CancellationToken,
48) {
49    for def in task_defs {
50        let state = state.clone();
51        let cancel = cancel.clone();
52        let name = def.name.clone();
53
54        tokio::spawn(async move {
55            tracing::info!(task = %name, "Scheduled task started");
56            match def.schedule {
57                ScheduleConfig::Interval(interval) => {
58                    run_interval(&name, interval, Duration::ZERO, state, cancel, &def.task)
59                        .await;
60                }
61                ScheduleConfig::IntervalWithDelay {
62                    interval,
63                    initial_delay,
64                } => {
65                    run_interval(&name, interval, initial_delay, state, cancel, &def.task)
66                        .await;
67                }
68                ScheduleConfig::Cron(expr) => {
69                    run_cron(&name, &expr, state, cancel, &def.task).await;
70                }
71            }
72            tracing::info!(task = %name, "Scheduled task stopped");
73        });
74    }
75}
76
77async fn run_interval<T: Clone + Send + Sync + 'static>(
78    name: &str,
79    interval: Duration,
80    initial_delay: Duration,
81    state: T,
82    cancel: CancellationToken,
83    task: &(dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync),
84) {
85    if !initial_delay.is_zero() {
86        tokio::select! {
87            _ = tokio::time::sleep(initial_delay) => {},
88            _ = cancel.cancelled() => { return; }
89        }
90    }
91
92    let mut tick = tokio::time::interval(interval);
93    loop {
94        tokio::select! {
95            _ = tick.tick() => {
96                tracing::debug!(task = %name, "Executing scheduled task");
97                task(state.clone()).await;
98            }
99            _ = cancel.cancelled() => {
100                break;
101            }
102        }
103    }
104}
105
106async fn run_cron<T: Clone + Send + Sync + 'static>(
107    name: &str,
108    expr: &str,
109    state: T,
110    cancel: CancellationToken,
111    task: &(dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync),
112) {
113    let schedule = match expr.parse::<cron::Schedule>() {
114        Ok(s) => s,
115        Err(e) => {
116            tracing::error!(task = %name, error = %e, "Invalid cron expression");
117            return;
118        }
119    };
120
121    loop {
122        let now = chrono::Utc::now();
123        let next = match schedule.upcoming(chrono::Utc).next() {
124            Some(n) => n,
125            None => {
126                tracing::warn!(task = %name, "No more upcoming cron executions");
127                break;
128            }
129        };
130
131        let until = (next - now).to_std().unwrap_or(Duration::from_secs(1));
132
133        tokio::select! {
134            _ = tokio::time::sleep(until) => {
135                tracing::debug!(task = %name, "Executing cron task");
136                task(state.clone()).await;
137            }
138            _ = cancel.cancelled() => {
139                break;
140            }
141        }
142    }
143}