1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use chrono::Utc;
8use tokio::task::{JoinHandle, JoinSet};
9use tokio_util::sync::CancellationToken;
10
11use crate::error::Result;
12use crate::service::{Registry, RegistrySnapshot};
13
14use super::context::CronContext;
15use super::handler::CronHandler;
16use super::meta::Meta;
17use super::schedule::Schedule;
18
/// Per-job settings accepted by `SchedulerBuilder::job_with`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; start from [`CronOptions::default`] and override
/// the fields you need.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CronOptions {
    /// Upper bound, in seconds, on how long a single handler invocation may
    /// run before the scheduler reports it as timed out.
    pub timeout_secs: u64,
}

impl Default for CronOptions {
    /// Returns options with a 300-second (five-minute) handler timeout.
    fn default() -> Self {
        Self { timeout_secs: 300 }
    }
}
33
34type ErasedCronHandler =
35 Arc<dyn Fn(CronContext) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
36
37struct CronEntry {
38 name: String,
39 schedule: Schedule,
40 handler: ErasedCronHandler,
41 timeout_secs: u64,
42}
43
44#[must_use]
48pub struct SchedulerBuilder {
49 registry: Arc<RegistrySnapshot>,
50 entries: Vec<CronEntry>,
51}
52
53impl SchedulerBuilder {
54 pub fn job<H, Args>(self, schedule: &str, handler: H) -> Result<Self>
63 where
64 H: CronHandler<Args> + Send + Sync,
65 {
66 self.job_with(schedule, handler, CronOptions::default())
67 }
68
69 pub fn job_with<H, Args>(
75 mut self,
76 schedule: &str,
77 handler: H,
78 options: CronOptions,
79 ) -> Result<Self>
80 where
81 H: CronHandler<Args> + Send + Sync,
82 {
83 let name = std::any::type_name::<H>().to_string();
84 let parsed = Schedule::parse(schedule)?;
85
86 let erased: ErasedCronHandler = Arc::new(
87 move |ctx: CronContext| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
88 let h = handler.clone();
89 Box::pin(async move { h.call(ctx).await })
90 },
91 );
92
93 self.entries.push(CronEntry {
94 name,
95 schedule: parsed,
96 handler: erased,
97 timeout_secs: options.timeout_secs,
98 });
99 Ok(self)
100 }
101
102 pub async fn start(self) -> Scheduler {
104 let cancel = CancellationToken::new();
105 let mut handles = Vec::new();
106
107 for entry in self.entries {
108 let handle = tokio::spawn(cron_job_loop(
109 entry.name,
110 entry.schedule,
111 entry.handler,
112 entry.timeout_secs,
113 self.registry.clone(),
114 cancel.clone(),
115 ));
116 handles.push(handle);
117 }
118
119 Scheduler { cancel, handles }
120 }
121}
122
123pub struct Scheduler {
132 cancel: CancellationToken,
133 handles: Vec<JoinHandle<()>>,
134}
135
136impl Scheduler {
137 pub fn builder(registry: &Registry) -> SchedulerBuilder {
142 SchedulerBuilder {
143 registry: registry.snapshot(),
144 entries: Vec::new(),
145 }
146 }
147}
148
149impl crate::runtime::Task for Scheduler {
150 async fn shutdown(self) -> Result<()> {
151 self.cancel.cancel();
152 let drain = async {
153 for handle in self.handles {
154 let _ = handle.await;
155 }
156 };
157 let _ = tokio::time::timeout(Duration::from_secs(30), drain).await;
158 Ok(())
159 }
160}
161
162async fn cron_job_loop(
163 name: String,
164 schedule: Schedule,
165 handler: ErasedCronHandler,
166 timeout_secs: u64,
167 registry: Arc<RegistrySnapshot>,
168 cancel: CancellationToken,
169) {
170 let running = Arc::new(AtomicBool::new(false));
171 let timeout_dur = Duration::from_secs(timeout_secs);
172 let mut handler_tasks = JoinSet::new();
173
174 let mut next_tick = match schedule.next_tick(Utc::now()) {
175 Some(t) => t,
176 None => {
177 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
178 return;
179 }
180 };
181
182 loop {
183 let sleep_duration = (next_tick - Utc::now()).to_std().unwrap_or(Duration::ZERO);
184
185 tokio::select! {
186 _ = cancel.cancelled() => break,
187 _ = tokio::time::sleep(sleep_duration) => {
188 while handler_tasks.try_join_next().is_some() {}
190
191 if running.load(Ordering::SeqCst) {
193 tracing::warn!(cron_job = %name, "skipping tick, previous run still active");
194 next_tick = match schedule.next_tick(Utc::now()) {
195 Some(t) => t,
196 None => {
197 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
198 break;
199 }
200 };
201 continue;
202 }
203
204 running.store(true, Ordering::SeqCst);
205
206 let deadline = tokio::time::Instant::now() + timeout_dur;
207
208 let ctx = CronContext {
209 registry: registry.clone(),
210 meta: Meta {
211 name: name.clone(),
212 deadline: Some(deadline),
213 tick: next_tick,
214 },
215 };
216
217 let running_flag = running.clone();
218 let handler_clone = handler.clone();
219 let job_name = name.clone();
220 handler_tasks.spawn(async move {
221 let result =
222 tokio::time::timeout(timeout_dur, (handler_clone)(ctx)).await;
223
224 match result {
225 Ok(Ok(())) => {
226 tracing::debug!(cron_job = %job_name, "completed");
227 }
228 Ok(Err(e)) => {
229 tracing::error!(cron_job = %job_name, error = %e, "failed");
230 }
231 Err(_) => {
232 tracing::error!(cron_job = %job_name, "timed out");
233 }
234 }
235
236 running_flag.store(false, Ordering::SeqCst);
237 });
238
239 next_tick = match schedule.next_tick(Utc::now()) {
240 Some(t) => t,
241 None => {
242 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
243 break;
244 }
245 };
246 }
247 }
248 }
249
250 while handler_tasks.join_next().await.is_some() {}
252}