1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use chrono::Utc;
8use tokio::task::{JoinHandle, JoinSet};
9use tokio_util::sync::CancellationToken;
10
11use crate::error::Result;
12use crate::service::{Registry, RegistrySnapshot};
13
14use super::context::CronContext;
15use super::handler::CronHandler;
16use super::meta::Meta;
17use super::schedule::Schedule;
18
/// Per-job options accepted by `SchedulerBuilder::job_with`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; start from [`CronOptions::default`] and adjust
/// the public fields instead.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CronOptions {
    /// Upper bound, in seconds, on a single handler run. The job loop turns
    /// this into the deadline/timeout applied to each invocation.
    pub timeout_secs: u64,
}
36
37impl Default for CronOptions {
38 fn default() -> Self {
40 Self { timeout_secs: 300 }
41 }
42}
43
/// Type-erased cron handler.
///
/// A shareable (`Arc`, `Send + Sync`) callable that takes a [`CronContext`]
/// and returns a boxed, pinned, `Send` future resolving to `Result<()>`.
/// Erasing the concrete handler type lets jobs with different handler types
/// be stored side by side in `CronEntry`.
type ErasedCronHandler =
    Arc<dyn Fn(CronContext) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
46
/// One registered cron job, held by the builder until `start` spawns it.
struct CronEntry {
    /// Job name used in log fields and in the `Meta` passed to the handler;
    /// `job_with` sets it to the handler's type name.
    name: String,
    /// Parsed schedule that decides when the job ticks.
    schedule: Schedule,
    /// Type-erased handler invoked on each tick.
    handler: ErasedCronHandler,
    /// Per-run timeout in seconds, copied from `CronOptions::timeout_secs`.
    timeout_secs: u64,
}
53
/// Builder that collects cron job registrations and turns them into a running
/// [`Scheduler`] via `start`.
///
/// Marked `#[must_use]`: a builder that is dropped without calling `start`
/// never spawns any job.
#[must_use]
pub struct SchedulerBuilder {
    /// Registry snapshot shared with every job; each handler receives a clone
    /// of this `Arc` through its `CronContext`.
    registry: Arc<RegistrySnapshot>,
    /// Jobs registered so far, in registration order.
    entries: Vec<CronEntry>,
}
62
63impl SchedulerBuilder {
64 pub fn job<H, Args>(self, schedule: &str, handler: H) -> Result<Self>
73 where
74 H: CronHandler<Args> + Send + Sync,
75 {
76 self.job_with(schedule, handler, CronOptions::default())
77 }
78
79 pub fn job_with<H, Args>(
85 mut self,
86 schedule: &str,
87 handler: H,
88 options: CronOptions,
89 ) -> Result<Self>
90 where
91 H: CronHandler<Args> + Send + Sync,
92 {
93 let name = std::any::type_name::<H>().to_string();
94 let parsed = Schedule::parse(schedule)?;
95
96 let erased: ErasedCronHandler = Arc::new(
97 move |ctx: CronContext| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
98 let h = handler.clone();
99 Box::pin(async move { h.call(ctx).await })
100 },
101 );
102
103 self.entries.push(CronEntry {
104 name,
105 schedule: parsed,
106 handler: erased,
107 timeout_secs: options.timeout_secs,
108 });
109 Ok(self)
110 }
111
112 pub async fn start(self) -> Scheduler {
114 let cancel = CancellationToken::new();
115 let mut handles = Vec::new();
116
117 for entry in self.entries {
118 let handle = tokio::spawn(cron_job_loop(
119 entry.name,
120 entry.schedule,
121 entry.handler,
122 entry.timeout_secs,
123 self.registry.clone(),
124 cancel.clone(),
125 ));
126 handles.push(handle);
127 }
128
129 Scheduler { cancel, handles }
130 }
131}
132
/// Handle to a set of running cron job loops.
///
/// Created by `SchedulerBuilder::start`; stopped through its
/// `crate::runtime::Task::shutdown` implementation.
pub struct Scheduler {
    /// Token shared with every job loop; cancelling it tells the loops to stop.
    cancel: CancellationToken,
    /// Join handles of the spawned job-loop tasks, one per registered job.
    handles: Vec<JoinHandle<()>>,
}
145
146impl Scheduler {
147 pub fn builder(registry: &Registry) -> SchedulerBuilder {
152 SchedulerBuilder {
153 registry: registry.snapshot(),
154 entries: Vec::new(),
155 }
156 }
157}
158
159impl crate::runtime::Task for Scheduler {
160 async fn shutdown(self) -> Result<()> {
161 self.cancel.cancel();
162 let drain = async {
163 for handle in self.handles {
164 let _ = handle.await;
165 }
166 };
167 let _ = tokio::time::timeout(Duration::from_secs(30), drain).await;
168 Ok(())
169 }
170}
171
172async fn cron_job_loop(
173 name: String,
174 schedule: Schedule,
175 handler: ErasedCronHandler,
176 timeout_secs: u64,
177 registry: Arc<RegistrySnapshot>,
178 cancel: CancellationToken,
179) {
180 let running = Arc::new(AtomicBool::new(false));
181 let timeout_dur = Duration::from_secs(timeout_secs);
182 let mut handler_tasks = JoinSet::new();
183
184 let mut next_tick = match schedule.next_tick(Utc::now()) {
185 Some(t) => t,
186 None => {
187 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
188 return;
189 }
190 };
191
192 loop {
193 let sleep_duration = (next_tick - Utc::now()).to_std().unwrap_or(Duration::ZERO);
194
195 tokio::select! {
196 _ = cancel.cancelled() => break,
197 _ = tokio::time::sleep(sleep_duration) => {
198 while handler_tasks.try_join_next().is_some() {}
200
201 if running.load(Ordering::SeqCst) {
203 tracing::warn!(cron_job = %name, "skipping tick, previous run still active");
204 next_tick = match schedule.next_tick(Utc::now()) {
205 Some(t) => t,
206 None => {
207 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
208 break;
209 }
210 };
211 continue;
212 }
213
214 running.store(true, Ordering::SeqCst);
215
216 let deadline = tokio::time::Instant::now() + timeout_dur;
217
218 let ctx = CronContext {
219 registry: registry.clone(),
220 meta: Meta {
221 name: name.clone(),
222 deadline: Some(deadline),
223 tick: next_tick,
224 },
225 };
226
227 let running_flag = running.clone();
228 let handler_clone = handler.clone();
229 let job_name = name.clone();
230 handler_tasks.spawn(async move {
231 let result =
232 tokio::time::timeout(timeout_dur, (handler_clone)(ctx)).await;
233
234 match result {
235 Ok(Ok(())) => {
236 tracing::debug!(cron_job = %job_name, "completed");
237 }
238 Ok(Err(e)) => {
239 tracing::error!(cron_job = %job_name, error = %e, "failed");
240 }
241 Err(_) => {
242 tracing::error!(cron_job = %job_name, "timed out");
243 }
244 }
245
246 running_flag.store(false, Ordering::SeqCst);
247 });
248
249 next_tick = match schedule.next_tick(Utc::now()) {
250 Some(t) => t,
251 None => {
252 tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
253 break;
254 }
255 };
256 }
257 }
258 }
259
260 while handler_tasks.join_next().await.is_some() {}
262}