1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4use tokio_util::sync::CancellationToken;
5
6use quarlus_core::scheduling::{ScheduleConfig, ScheduledTaskDef};
7
8pub struct Scheduler;
26
27impl<T: Clone + Send + Sync + 'static> quarlus_core::Plugin<T> for Scheduler {
28 fn install(self, app: quarlus_core::AppBuilder<T>) -> quarlus_core::AppBuilder<T> {
29 let cancel = CancellationToken::new();
30 let cancel_stop = cancel.clone();
31
32 app.set_scheduler_backend(
33 Box::new(move |task_defs, state| {
34 start_tasks_from_defs(task_defs, state, cancel);
35 }),
36 Box::new(move || {
37 cancel_stop.cancel();
38 }),
39 )
40 }
41}
42
43fn start_tasks_from_defs<T: Clone + Send + Sync + 'static>(
45 task_defs: Vec<ScheduledTaskDef<T>>,
46 state: T,
47 cancel: CancellationToken,
48) {
49 for def in task_defs {
50 let state = state.clone();
51 let cancel = cancel.clone();
52 let name = def.name.clone();
53
54 tokio::spawn(async move {
55 tracing::info!(task = %name, "Scheduled task started");
56 match def.schedule {
57 ScheduleConfig::Interval(interval) => {
58 run_interval(&name, interval, Duration::ZERO, state, cancel, &def.task)
59 .await;
60 }
61 ScheduleConfig::IntervalWithDelay {
62 interval,
63 initial_delay,
64 } => {
65 run_interval(&name, interval, initial_delay, state, cancel, &def.task)
66 .await;
67 }
68 ScheduleConfig::Cron(expr) => {
69 run_cron(&name, &expr, state, cancel, &def.task).await;
70 }
71 }
72 tracing::info!(task = %name, "Scheduled task stopped");
73 });
74 }
75}
76
77async fn run_interval<T: Clone + Send + Sync + 'static>(
78 name: &str,
79 interval: Duration,
80 initial_delay: Duration,
81 state: T,
82 cancel: CancellationToken,
83 task: &(dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync),
84) {
85 if !initial_delay.is_zero() {
86 tokio::select! {
87 _ = tokio::time::sleep(initial_delay) => {},
88 _ = cancel.cancelled() => { return; }
89 }
90 }
91
92 let mut tick = tokio::time::interval(interval);
93 loop {
94 tokio::select! {
95 _ = tick.tick() => {
96 tracing::debug!(task = %name, "Executing scheduled task");
97 task(state.clone()).await;
98 }
99 _ = cancel.cancelled() => {
100 break;
101 }
102 }
103 }
104}
105
106async fn run_cron<T: Clone + Send + Sync + 'static>(
107 name: &str,
108 expr: &str,
109 state: T,
110 cancel: CancellationToken,
111 task: &(dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync),
112) {
113 let schedule = match expr.parse::<cron::Schedule>() {
114 Ok(s) => s,
115 Err(e) => {
116 tracing::error!(task = %name, error = %e, "Invalid cron expression");
117 return;
118 }
119 };
120
121 loop {
122 let now = chrono::Utc::now();
123 let next = match schedule.upcoming(chrono::Utc).next() {
124 Some(n) => n,
125 None => {
126 tracing::warn!(task = %name, "No more upcoming cron executions");
127 break;
128 }
129 };
130
131 let until = (next - now).to_std().unwrap_or(Duration::from_secs(1));
132
133 tokio::select! {
134 _ = tokio::time::sleep(until) => {
135 tracing::debug!(task = %name, "Executing cron task");
136 task(state.clone()).await;
137 }
138 _ = cancel.cancelled() => {
139 break;
140 }
141 }
142 }
143}