cron_tab/async_cron.rs
//! Asynchronous cron job scheduler implementation.
//!
//! This module provides a tokio-based async cron scheduler that executes jobs
//! as async tasks on the tokio runtime. Jobs run concurrently without blocking
//! the scheduler or each other.
6
7use std::future::Future;
8use std::str::FromStr;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, TimeZone, Utc};
14use tokio::select;
15use tokio::sync::{mpsc, Mutex};
16use tokio::time as tokio_time;
17
18use crate::async_entry::{AsyncEntry, TaskWrapper};
19use crate::{Result, MAX_WAIT_SECONDS};
20
21/// An asynchronous cron job scheduler that manages and executes scheduled async jobs.
22///
23/// The `AsyncCron` struct provides an async-first approach to job scheduling using
24/// tokio's runtime. Jobs are executed as async tasks, allowing for efficient
25/// concurrent execution without blocking threads.
26///
27/// # Type Parameters
28///
29/// * `Z` - A timezone type that implements `TimeZone + Send + Sync + 'static`
30///
31/// # Async Runtime
32///
33/// This scheduler requires a tokio runtime to function. All methods are async
34/// and jobs are executed as tokio tasks.
35///
36/// # Examples
37///
38/// ```rust
39/// use chrono::Utc;
40/// use cron_tab::AsyncCron;
41///
42/// # #[tokio::main]
43/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
44/// let mut cron = AsyncCron::new(Utc);
45///
46/// let job_id = cron.add_fn("*/5 * * * * * *", || async {
47/// println!("This async job runs every 5 seconds");
48/// // Can perform async operations here
49/// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
50/// }).await?;
51///
52/// cron.start().await;
53/// // Jobs will now execute according to their schedule
54///
55/// // Later, you can stop the scheduler
56/// cron.stop().await;
57/// # Ok(())
58/// # }
59/// ```
60#[derive(Clone, Debug)]
61pub struct AsyncCron<Z>
62where
63 Z: TimeZone + Send + Sync + 'static,
64 Z::Offset: Send,
65{
66 /// A thread-safe, asynchronous list of job entries (schedules and tasks).
67 entries: Arc<Mutex<Vec<AsyncEntry<Z>>>>,
68
69 /// A counter for assigning unique IDs to job entries.
70 next_id: Arc<AtomicUsize>,
71
72 /// Indicates whether the cron is currently running.
73 running: Arc<AtomicBool>,
74
75 /// The timezone used for scheduling tasks.
76 tz: Z,
77
78 /// A channel sender for adding new entries to the cron scheduler.
79 add_tx: Arc<Mutex<Option<mpsc::UnboundedSender<AsyncEntry<Z>>>>>,
80
81 /// A channel sender for removing entries from the cron scheduler.
82 remove_tx: Arc<Mutex<Option<mpsc::UnboundedSender<usize>>>>,
83
84 /// A channel sender for stopping the cron scheduler.
85 stop_tx: Arc<Mutex<Option<mpsc::UnboundedSender<bool>>>>,
86}
87
88/// Implementation of the asynchronous cron scheduler.
89impl<Z> AsyncCron<Z>
90where
91 Z: TimeZone + Send + Sync + 'static,
92 Z::Offset: Send,
93{
94 /// Adds an async function to be executed according to the specified cron schedule.
95 ///
96 /// The function should return a Future that will be awaited when the job executes.
97 /// This allows for true asynchronous job execution without blocking threads.
98 ///
99 /// # Arguments
100 ///
101 /// * `spec` - A cron expression string in the format "sec min hour day month weekday year"
102 /// * `f` - A function that returns a Future implementing `Future<Output = ()> + Send + 'static`
103 ///
104 /// # Returns
105 ///
106 /// Returns a `Result<usize, CronError>` where the `usize` is a unique job ID
107 /// that can be used with [`remove`](Self::remove) to cancel the job.
108 ///
109 /// # Errors
110 ///
111 /// Returns [`CronError::ParseError`](crate::CronError::ParseError) if the cron expression is invalid.
112 ///
113 /// # Examples
114 ///
115 /// ```rust
116 /// use chrono::Utc;
117 /// use cron_tab::AsyncCron;
118 /// use std::sync::Arc;
119 /// use tokio::sync::Mutex;
120 ///
121 /// # #[tokio::main]
122 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
123 /// let mut cron = AsyncCron::new(Utc);
124 ///
125 /// // Simple async job
126 /// let job_id = cron.add_fn("*/10 * * * * * *", || async {
127 /// println!("Async job executed!");
128 /// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
129 /// }).await?;
130 ///
131 /// // Job with shared state
132 /// let counter = Arc::new(Mutex::new(0));
133 /// let counter_clone = counter.clone();
134 /// cron.add_fn("* * * * * * *", move || {
135 /// let counter = counter_clone.clone();
136 /// async move {
137 /// let mut count = counter.lock().await;
138 /// *count += 1;
139 /// println!("Count: {}", *count);
140 /// }
141 /// }).await?;
142 /// # Ok(())
143 /// # }
144 /// ```
145 pub async fn add_fn<F, T>(&mut self, spec: &str, f: F) -> Result<usize>
146 where
147 F: 'static + Fn() -> T + Send + Sync,
148 T: 'static + Future<Output = ()> + Send,
149 {
150 let schedule = cron::Schedule::from_str(spec)?;
151 self.schedule(schedule, f).await
152 }
153
154 /// Returns a clone of the current timezone.
155 fn get_timezone(&self) -> Z {
156 self.tz.clone()
157 }
158
159 /// Creates a new async cron scheduler with the specified timezone.
160 ///
161 /// The scheduler is created in a stopped state. Call [`start`](Self::start)
162 /// to begin executing scheduled jobs.
163 ///
164 /// # Arguments
165 ///
166 /// * `tz` - The timezone to use for all scheduling calculations
167 ///
168 /// # Examples
169 ///
170 /// ```rust
171 /// use chrono::{Utc, FixedOffset};
172 /// use cron_tab::AsyncCron;
173 ///
174 /// // UTC timezone
175 /// let cron_utc = AsyncCron::new(Utc);
176 ///
177 /// // Fixed offset timezone (Tokyo: UTC+9)
178 /// let tokyo_tz = FixedOffset::east_opt(9 * 3600).unwrap();
179 /// let cron_tokyo = AsyncCron::new(tokyo_tz);
180 /// ```
181 pub fn new(tz: Z) -> AsyncCron<Z> {
182 AsyncCron {
183 entries: Arc::new(Mutex::new(Vec::new())),
184 next_id: Arc::new(AtomicUsize::new(0)),
185 running: Arc::new(AtomicBool::new(false)),
186 tz,
187 add_tx: Default::default(),
188 remove_tx: Default::default(),
189 stop_tx: Default::default(),
190 }
191 }
192
193 /// Returns the current time in the scheduler's timezone.
194 fn now(&self) -> DateTime<Z> {
195 self.get_timezone()
196 .from_utc_datetime(&Utc::now().naive_utc())
197 }
198
199 /// Removes a job from the scheduler.
200 ///
201 /// Once removed, the job will no longer be executed. If the job ID doesn't exist,
202 /// this method does nothing. If the scheduler is running, the removal is handled
203 /// asynchronously via the scheduler's event loop.
204 ///
205 /// # Arguments
206 ///
207 /// * `id` - The job ID returned by [`add_fn`](Self::add_fn)
208 ///
209 /// # Examples
210 ///
211 /// ```rust
212 /// use chrono::Utc;
213 /// use cron_tab::AsyncCron;
214 ///
215 /// # #[tokio::main]
216 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
217 /// let mut cron = AsyncCron::new(Utc);
218 /// let job_id = cron.add_fn("* * * * * * *", || async {
219 /// println!("This will be removed");
220 /// }).await?;
221 ///
222 /// cron.start().await;
223 ///
224 /// // Later, remove the job
225 /// cron.remove(job_id).await;
226 /// # Ok(())
227 /// # }
228 /// ```
229 pub async fn remove(&self, id: usize) {
230 if self.running.load(Ordering::SeqCst) {
231 let guard = self.remove_tx.lock().await;
232 if let Some(tx) = guard.as_ref() {
233 let _ = tx.send(id);
234 }
235 return;
236 }
237
238 self.remove_entry(id).await;
239 }
240
241 /// Internal method to remove a job entry by ID.
242 ///
243 /// This method acquires a lock on the entries vector and removes the job
244 /// with the matching ID if it exists.
245 async fn remove_entry(&self, id: usize) {
246 let mut entries = self.entries.lock().await;
247 if let Some(index) = entries.iter().position(|e| e.id == id) {
248 entries.remove(index);
249 }
250 }
251
252 /// Runs the scheduler in the current task (blocking).
253 ///
254 /// This method runs the main scheduler loop in the current async context,
255 /// blocking until [`stop`](Self::stop) is called. This is useful when you want
256 /// to run the scheduler as the main task of your application.
257 ///
258 /// # Behavior
259 ///
260 /// The scheduler will:
261 /// 1. Set up communication channels for job management
262 /// 2. Calculate the next execution time for all jobs
263 /// 3. Sleep until the next job is due
264 /// 4. Execute all due jobs as async tasks
265 /// 5. Handle job additions/removals during runtime
266 /// 6. Repeat until stopped
267 ///
268 /// # Examples
269 ///
270 /// ```rust,no_run
271 /// use chrono::Utc;
272 /// use cron_tab::AsyncCron;
273 ///
274 /// #[tokio::main]
275 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
276 /// let mut cron = AsyncCron::new(Utc);
277 /// cron.add_fn("0 0 * * * * *", || async {
278 /// println!("Top of the hour!");
279 /// }).await?;
280 ///
281 /// // This will block until stop() is called from another task
282 /// cron.start_blocking().await;
283 /// Ok(())
284 /// }
285 /// ```
286 pub async fn start_blocking(&mut self) {
287 // Channels for communicating with the cron loop (adding/removing/stopping jobs).
288 let (add_tx, mut add_rx) = mpsc::unbounded_channel();
289 let (remove_tx, mut remove_rx) = mpsc::unbounded_channel();
290 let (stop_tx, mut stop_rx) = mpsc::unbounded_channel();
291
292 {
293 *self.add_tx.lock().await = Some(add_tx);
294 *self.remove_tx.lock().await = Some(remove_tx);
295 *self.stop_tx.lock().await = Some(stop_tx);
296 }
297
298 // Initialize the next scheduled time for all entries.
299 for entry in self.entries.lock().await.iter_mut() {
300 entry.next = entry.get_next(self.get_timezone());
301 }
302
303 // Set a default long wait duration for sleeping.
304 let mut wait_duration = Duration::from_secs(MAX_WAIT_SECONDS);
305
306 loop {
307 // Lock and sort entries to prioritize the closest scheduled job.
308 let mut entries = self.entries.lock().await;
309 entries.sort_by(|b, a| b.next.cmp(&a.next));
310
311 // Determine the wait duration based on the next scheduled job.
312 if let Some(entry) = entries.first() {
313 // Calculate wait time until the next job execution
314 let wait_milis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
315 .saturating_sub(self.now().timestamp_millis() as u64);
316
317 wait_duration = Duration::from_millis(wait_milis);
318 }
319
320 // Release the lock before waiting
321 drop(entries);
322
323 // Use `select!` to handle multiple asynchronous operations concurrently.
324 select! {
325 // Timer expired - check for jobs to execute
326 _ = tokio_time::sleep(wait_duration) => {
327 let now = self.now();
328 for entry in self.entries.lock().await.iter_mut() {
329 // Stop when we reach jobs that aren't due yet
330 if entry.next.as_ref().unwrap().gt(&now) {
331 break;
332 }
333
334 // Spawn the job to run asynchronously as a tokio task
335 let run = entry.run.clone();
336 tokio::spawn(async move {
337 run.as_ref().get_pinned().await;
338 });
339
340 // Schedule the next run of the job.
341 entry.next = entry.get_next(self.get_timezone());
342 }
343 },
344 // New job added while running
345 new_entry = add_rx.recv() => {
346 let mut entry = new_entry.unwrap();
347 entry.next = entry.get_next(self.get_timezone());
348 self.entries.lock().await.push(entry);
349 },
350 // Job removal requested
351 id = remove_rx.recv() => {
352 self.remove_entry(id.unwrap()).await;
353 },
354 // Stop signal received
355 _ = stop_rx.recv() => {
356 return;
357 },
358 }
359 }
360 }
361
362 /// Internal method to schedule a job with a parsed cron schedule.
363 ///
364 /// This method generates a unique ID for the job and adds it to the scheduler.
365 /// If the scheduler is running, the job is sent via the add channel, otherwise
366 /// it's added directly to the entries list.
367 async fn schedule<F, T>(&mut self, schedule: cron::Schedule, f: F) -> Result<usize>
368 where
369 F: 'static + Fn() -> T + Send + Sync,
370 T: 'static + Future<Output = ()> + Send,
371 {
372 let next_id = self.next_id.fetch_add(1, Ordering::SeqCst);
373
374 let mut entry = AsyncEntry {
375 id: next_id,
376 schedule,
377 next: None,
378 run: Arc::new(TaskWrapper::new(f)),
379 };
380
381 // Determine the next scheduled time for the job.
382 entry.next = entry.get_next(self.get_timezone());
383
384 // If the cron is running, send the entry via the channel; otherwise, add it directly.
385 match self.add_tx.lock().await.as_ref() {
386 Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(),
387 _ => self.entries.lock().await.push(entry),
388 }
389
390 Ok(next_id)
391 }
392
393 /// Sets the timezone for the scheduler.
394 ///
395 /// This affects how cron expressions are interpreted for all future job executions.
396 /// Existing jobs will use the new timezone for their next scheduled execution.
397 ///
398 /// # Arguments
399 ///
400 /// * `tz` - The new timezone to use
401 ///
402 /// # Examples
403 ///
404 /// ```rust
405 /// use chrono::{Utc, FixedOffset};
406 /// use cron_tab::AsyncCron;
407 ///
408 /// # #[tokio::main]
409 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
410 /// // Create async cron with UTC timezone
411 /// let mut cron_utc = AsyncCron::new(Utc);
412 ///
413 /// // Create a separate async cron with Tokyo timezone
414 /// let tokyo_tz = FixedOffset::east_opt(9 * 3600).unwrap();
415 /// let mut cron_tokyo = AsyncCron::new(tokyo_tz);
416 ///
417 /// // Each scheduler uses its own timezone for job scheduling
418 /// cron_utc.add_fn("0 0 12 * * * *", || async {
419 /// println!("Noon UTC");
420 /// }).await?;
421 ///
422 /// cron_tokyo.add_fn("0 0 12 * * * *", || async {
423 /// println!("Noon Tokyo time");
424 /// }).await?;
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub fn set_timezone(&mut self, tz: Z) {
429 self.tz = tz;
430 }
431
432 /// Starts the cron scheduler in a background task.
433 ///
434 /// This method spawns a new tokio task that will continuously monitor for jobs
435 /// that need to be executed and spawn additional tasks to run them.
436 /// The method returns immediately, allowing your program to continue.
437 ///
438 /// # Async Runtime
439 ///
440 /// The scheduler runs as a tokio task and spawns additional tasks for
441 /// each job execution. This ensures that long-running async jobs don't block
442 /// the scheduler or other jobs.
443 ///
444 /// # Examples
445 ///
446 /// ```rust
447 /// use chrono::Utc;
448 /// use cron_tab::AsyncCron;
449 ///
450 /// # #[tokio::main]
451 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
452 /// let mut cron = AsyncCron::new(Utc);
453 /// cron.add_fn("*/2 * * * * * *", || async {
454 /// println!("Job executed every 2 seconds");
455 /// // Can perform async operations here
456 /// tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
457 /// }).await?;
458 ///
459 /// // Start the scheduler
460 /// cron.start().await;
461 ///
462 /// // The current task can continue with other work
463 /// tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
464 ///
465 /// // Stop the scheduler
466 /// cron.stop().await;
467 /// # Ok(())
468 /// # }
469 /// ```
470 pub async fn start(&mut self) {
471 let mut cloned = self.clone();
472 self.running.store(true, Ordering::SeqCst);
473 tokio::spawn(async move {
474 cloned.start_blocking().await;
475 });
476 }
477
478 /// Stops the cron scheduler.
479 ///
480 /// This sends a stop signal to the scheduler task, causing it to exit gracefully.
481 /// Any currently executing async jobs will continue to completion, but no new jobs
482 /// will be started.
483 ///
484 /// # Examples
485 ///
486 /// ```rust
487 /// use chrono::Utc;
488 /// use cron_tab::AsyncCron;
489 ///
490 /// # #[tokio::main]
491 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
492 /// let mut cron = AsyncCron::new(Utc);
493 /// cron.add_fn("* * * * * * *", || async {
494 /// println!("Hello async world!");
495 /// }).await?;
496 /// cron.start().await;
497 ///
498 /// // Later, stop the scheduler
499 /// cron.stop().await;
500 /// # Ok(())
501 /// # }
502 /// ```
503 pub async fn stop(&self) {
504 self.running.store(false, Ordering::SeqCst);
505 if let Some(tx) = self.stop_tx.lock().await.as_ref() {
506 let _ = tx.send(true);
507 }
508 }
509}