cron_tab/
async_cron.rs

1//! Asynchronous cron job scheduler implementation.
2//!
3//! This module provides a tokio-based async cron scheduler that executes jobs
4//! as async tasks using the tokio runtime. Jobs run concurrently without blocking
5//! the scheduler or each other.
6
7use std::future::Future;
8use std::str::FromStr;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, TimeZone, Utc};
14use tokio::select;
15use tokio::sync::{mpsc, Mutex};
16use tokio::time as tokio_time;
17
18use crate::async_entry::{AsyncEntry, TaskWrapper};
19use crate::{Result, MAX_WAIT_SECONDS};
20
21/// An asynchronous cron job scheduler that manages and executes scheduled async jobs.
22///
23/// The `AsyncCron` struct provides an async-first approach to job scheduling using
24/// tokio's runtime. Jobs are executed as async tasks, allowing for efficient
25/// concurrent execution without blocking threads.
26///
27/// # Type Parameters
28///
29/// * `Z` - A timezone type that implements `TimeZone + Send + Sync + 'static`
30///
31/// # Async Runtime
32///
33/// This scheduler requires a tokio runtime to function. All methods are async
34/// and jobs are executed as tokio tasks.
35///
36/// # Examples
37///
38/// ```rust
39/// use chrono::Utc;
40/// use cron_tab::AsyncCron;
41///
42/// # #[tokio::main]
43/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
44/// let mut cron = AsyncCron::new(Utc);
45/// 
46/// let job_id = cron.add_fn("*/5 * * * * * *", || async {
47///     println!("This async job runs every 5 seconds");
48///     // Can perform async operations here
49///     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
50/// }).await?;
51///
52/// cron.start().await;
53/// // Jobs will now execute according to their schedule
54/// 
55/// // Later, you can stop the scheduler
56/// cron.stop().await;
57/// # Ok(())
58/// # }
59/// ```
60#[derive(Clone, Debug)]
61pub struct AsyncCron<Z>
62where
63    Z: TimeZone + Send + Sync + 'static,
64    Z::Offset: Send,
65{
66    /// A thread-safe, asynchronous list of job entries (schedules and tasks).
67    entries: Arc<Mutex<Vec<AsyncEntry<Z>>>>,
68
69    /// A counter for assigning unique IDs to job entries.
70    next_id: Arc<AtomicUsize>,
71
72    /// Indicates whether the cron is currently running.
73    running: Arc<AtomicBool>,
74
75    /// The timezone used for scheduling tasks.
76    tz: Z,
77
78    /// A channel sender for adding new entries to the cron scheduler.
79    add_tx: Arc<Mutex<Option<mpsc::UnboundedSender<AsyncEntry<Z>>>>>,
80
81    /// A channel sender for removing entries from the cron scheduler.
82    remove_tx: Arc<Mutex<Option<mpsc::UnboundedSender<usize>>>>,
83
84    /// A channel sender for stopping the cron scheduler.
85    stop_tx: Arc<Mutex<Option<mpsc::UnboundedSender<bool>>>>,
86}
87
88/// Implementation of the asynchronous cron scheduler.
89impl<Z> AsyncCron<Z>
90where
91    Z: TimeZone + Send + Sync + 'static,
92    Z::Offset: Send,
93{
94    /// Adds an async function to be executed according to the specified cron schedule.
95    ///
96    /// The function should return a Future that will be awaited when the job executes.
97    /// This allows for true asynchronous job execution without blocking threads.
98    ///
99    /// # Arguments
100    ///
101    /// * `spec` - A cron expression string in the format "sec min hour day month weekday year"
102    /// * `f` - A function that returns a Future implementing `Future<Output = ()> + Send + 'static`
103    ///
104    /// # Returns
105    ///
106    /// Returns a `Result<usize, CronError>` where the `usize` is a unique job ID
107    /// that can be used with [`remove`](Self::remove) to cancel the job.
108    ///
109    /// # Errors
110    ///
111    /// Returns [`CronError::ParseError`](crate::CronError::ParseError) if the cron expression is invalid.
112    ///
113    /// # Examples
114    ///
115    /// ```rust
116    /// use chrono::Utc;
117    /// use cron_tab::AsyncCron;
118    /// use std::sync::Arc;
119    /// use tokio::sync::Mutex;
120    ///
121    /// # #[tokio::main]
122    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
123    /// let mut cron = AsyncCron::new(Utc);
124    ///
125    /// // Simple async job
126    /// let job_id = cron.add_fn("*/10 * * * * * *", || async {
127    ///     println!("Async job executed!");
128    ///     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
129    /// }).await?;
130    ///
131    /// // Job with shared state
132    /// let counter = Arc::new(Mutex::new(0));
133    /// let counter_clone = counter.clone();
134    /// cron.add_fn("* * * * * * *", move || {
135    ///     let counter = counter_clone.clone();
136    ///     async move {
137    ///         let mut count = counter.lock().await;
138    ///         *count += 1;
139    ///         println!("Count: {}", *count);
140    ///     }
141    /// }).await?;
142    /// # Ok(())
143    /// # }
144    /// ```
145    pub async fn add_fn<F, T>(&mut self, spec: &str, f: F) -> Result<usize>
146    where
147        F: 'static + Fn() -> T + Send + Sync,
148        T: 'static + Future<Output = ()> + Send,
149    {
150        let schedule = cron::Schedule::from_str(spec)?;
151        self.schedule(schedule, f).await
152    }
153
154    /// Returns a clone of the current timezone.
155    fn get_timezone(&self) -> Z {
156        self.tz.clone()
157    }
158
159    /// Creates a new async cron scheduler with the specified timezone.
160    ///
161    /// The scheduler is created in a stopped state. Call [`start`](Self::start) 
162    /// to begin executing scheduled jobs.
163    ///
164    /// # Arguments
165    ///
166    /// * `tz` - The timezone to use for all scheduling calculations
167    ///
168    /// # Examples
169    ///
170    /// ```rust
171    /// use chrono::{Utc, FixedOffset};
172    /// use cron_tab::AsyncCron;
173    ///
174    /// // UTC timezone
175    /// let cron_utc = AsyncCron::new(Utc);
176    ///
177    /// // Fixed offset timezone (Tokyo: UTC+9)
178    /// let tokyo_tz = FixedOffset::east_opt(9 * 3600).unwrap();
179    /// let cron_tokyo = AsyncCron::new(tokyo_tz);
180    /// ```
181    pub fn new(tz: Z) -> AsyncCron<Z> {
182        AsyncCron {
183            entries: Arc::new(Mutex::new(Vec::new())),
184            next_id: Arc::new(AtomicUsize::new(0)),
185            running: Arc::new(AtomicBool::new(false)),
186            tz,
187            add_tx: Default::default(),
188            remove_tx: Default::default(),
189            stop_tx: Default::default(),
190        }
191    }
192
193    /// Returns the current time in the scheduler's timezone.
194    fn now(&self) -> DateTime<Z> {
195        self.get_timezone()
196            .from_utc_datetime(&Utc::now().naive_utc())
197    }
198
199    /// Removes a job from the scheduler.
200    ///
201    /// Once removed, the job will no longer be executed. If the job ID doesn't exist,
202    /// this method does nothing. If the scheduler is running, the removal is handled
203    /// asynchronously via the scheduler's event loop.
204    ///
205    /// # Arguments
206    ///
207    /// * `id` - The job ID returned by [`add_fn`](Self::add_fn)
208    ///
209    /// # Examples
210    ///
211    /// ```rust
212    /// use chrono::Utc;
213    /// use cron_tab::AsyncCron;
214    ///
215    /// # #[tokio::main]
216    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
217    /// let mut cron = AsyncCron::new(Utc);
218    /// let job_id = cron.add_fn("* * * * * * *", || async {
219    ///     println!("This will be removed");
220    /// }).await?;
221    ///
222    /// cron.start().await;
223    /// 
224    /// // Later, remove the job
225    /// cron.remove(job_id).await;
226    /// # Ok(())
227    /// # }
228    /// ```
229    pub async fn remove(&self, id: usize) {
230        if self.running.load(Ordering::SeqCst) {
231            let guard = self.remove_tx.lock().await;
232            if let Some(tx) = guard.as_ref() {
233                let _ = tx.send(id);
234            }
235            return;
236        }
237
238        self.remove_entry(id).await;
239    }
240
241    /// Internal method to remove a job entry by ID.
242    ///
243    /// This method acquires a lock on the entries vector and removes the job
244    /// with the matching ID if it exists.
245    async fn remove_entry(&self, id: usize) {
246        let mut entries = self.entries.lock().await;
247        if let Some(index) = entries.iter().position(|e| e.id == id) {
248            entries.remove(index);
249        }
250    }
251
252    /// Runs the scheduler in the current task (blocking).
253    ///
254    /// This method runs the main scheduler loop in the current async context,
255    /// blocking until [`stop`](Self::stop) is called. This is useful when you want
256    /// to run the scheduler as the main task of your application.
257    ///
258    /// # Behavior
259    ///
260    /// The scheduler will:
261    /// 1. Set up communication channels for job management
262    /// 2. Calculate the next execution time for all jobs
263    /// 3. Sleep until the next job is due
264    /// 4. Execute all due jobs as async tasks
265    /// 5. Handle job additions/removals during runtime
266    /// 6. Repeat until stopped
267    ///
268    /// # Examples
269    ///
270    /// ```rust,no_run
271    /// use chrono::Utc;
272    /// use cron_tab::AsyncCron;
273    ///
274    /// #[tokio::main]
275    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
276    ///     let mut cron = AsyncCron::new(Utc);
277    ///     cron.add_fn("0 0 * * * * *", || async {
278    ///         println!("Top of the hour!");
279    ///     }).await?;
280    ///
281    ///     // This will block until stop() is called from another task
282    ///     cron.start_blocking().await;
283    ///     Ok(())
284    /// }
285    /// ```
286    pub async fn start_blocking(&mut self) {
287        // Channels for communicating with the cron loop (adding/removing/stopping jobs).
288        let (add_tx, mut add_rx) = mpsc::unbounded_channel();
289        let (remove_tx, mut remove_rx) = mpsc::unbounded_channel();
290        let (stop_tx, mut stop_rx) = mpsc::unbounded_channel();
291
292        {
293            *self.add_tx.lock().await = Some(add_tx);
294            *self.remove_tx.lock().await = Some(remove_tx);
295            *self.stop_tx.lock().await = Some(stop_tx);
296        }
297
298        // Initialize the next scheduled time for all entries.
299        for entry in self.entries.lock().await.iter_mut() {
300            entry.next = entry.get_next(self.get_timezone());
301        }
302
303        // Set a default long wait duration for sleeping.
304        let mut wait_duration = Duration::from_secs(MAX_WAIT_SECONDS);
305
306        loop {
307            // Lock and sort entries to prioritize the closest scheduled job.
308            let mut entries = self.entries.lock().await;
309            entries.sort_by(|b, a| b.next.cmp(&a.next));
310
311            // Determine the wait duration based on the next scheduled job.
312            if let Some(entry) = entries.first() {
313                // Calculate wait time until the next job execution
314                let wait_milis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
315                    .saturating_sub(self.now().timestamp_millis() as u64);
316
317                wait_duration = Duration::from_millis(wait_milis);
318            }
319
320            // Release the lock before waiting
321            drop(entries);
322
323            // Use `select!` to handle multiple asynchronous operations concurrently.
324            select! {
325                // Timer expired - check for jobs to execute
326                _ = tokio_time::sleep(wait_duration) => {
327                    let now = self.now();
328                    for entry in self.entries.lock().await.iter_mut() {
329                        // Stop when we reach jobs that aren't due yet
330                        if entry.next.as_ref().unwrap().gt(&now) {
331                            break;
332                        }
333
334                        // Spawn the job to run asynchronously as a tokio task
335                        let run = entry.run.clone();
336                        tokio::spawn(async move {
337                            run.as_ref().get_pinned().await;
338                        });
339
340                        // Schedule the next run of the job.
341                        entry.next = entry.get_next(self.get_timezone());
342                    }
343                },
344                // New job added while running
345                 new_entry = add_rx.recv() => {
346                    let mut entry = new_entry.unwrap();
347                    entry.next = entry.get_next(self.get_timezone());
348                    self.entries.lock().await.push(entry);
349                },
350                // Job removal requested
351                 id = remove_rx.recv() => {
352                    self.remove_entry(id.unwrap()).await;
353                },
354                // Stop signal received
355                _ = stop_rx.recv() => {
356                    return;
357                },
358            }
359        }
360    }
361
362    /// Internal method to schedule a job with a parsed cron schedule.
363    ///
364    /// This method generates a unique ID for the job and adds it to the scheduler.
365    /// If the scheduler is running, the job is sent via the add channel, otherwise
366    /// it's added directly to the entries list.
367    async fn schedule<F, T>(&mut self, schedule: cron::Schedule, f: F) -> Result<usize>
368    where
369        F: 'static + Fn() -> T + Send + Sync,
370        T: 'static + Future<Output = ()> + Send,
371    {
372        let next_id = self.next_id.fetch_add(1, Ordering::SeqCst);
373
374        let mut entry = AsyncEntry {
375            id: next_id,
376            schedule,
377            next: None,
378            run: Arc::new(TaskWrapper::new(f)),
379        };
380
381        // Determine the next scheduled time for the job.
382        entry.next = entry.get_next(self.get_timezone());
383
384        // If the cron is running, send the entry via the channel; otherwise, add it directly.
385        match self.add_tx.lock().await.as_ref() {
386            Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(),
387            _ => self.entries.lock().await.push(entry),
388        }
389
390        Ok(next_id)
391    }
392
393    /// Sets the timezone for the scheduler.
394    ///
395    /// This affects how cron expressions are interpreted for all future job executions.
396    /// Existing jobs will use the new timezone for their next scheduled execution.
397    ///
398    /// # Arguments
399    ///
400    /// * `tz` - The new timezone to use
401    ///
402    /// # Examples
403    ///
404    /// ```rust
405    /// use chrono::{Utc, FixedOffset};
406    /// use cron_tab::AsyncCron;
407    ///
408    /// # #[tokio::main]
409    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
410    /// // Create async cron with UTC timezone
411    /// let mut cron_utc = AsyncCron::new(Utc);
412    ///
413    /// // Create a separate async cron with Tokyo timezone  
414    /// let tokyo_tz = FixedOffset::east_opt(9 * 3600).unwrap();
415    /// let mut cron_tokyo = AsyncCron::new(tokyo_tz);
416    ///
417    /// // Each scheduler uses its own timezone for job scheduling
418    /// cron_utc.add_fn("0 0 12 * * * *", || async {
419    ///     println!("Noon UTC");
420    /// }).await?;
421    ///
422    /// cron_tokyo.add_fn("0 0 12 * * * *", || async {
423    ///     println!("Noon Tokyo time");
424    /// }).await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub fn set_timezone(&mut self, tz: Z) {
429        self.tz = tz;
430    }
431
432    /// Starts the cron scheduler in a background task.
433    ///
434    /// This method spawns a new tokio task that will continuously monitor for jobs
435    /// that need to be executed and spawn additional tasks to run them.
436    /// The method returns immediately, allowing your program to continue.
437    ///
438    /// # Async Runtime
439    ///
440    /// The scheduler runs as a tokio task and spawns additional tasks for
441    /// each job execution. This ensures that long-running async jobs don't block
442    /// the scheduler or other jobs.
443    ///
444    /// # Examples
445    ///
446    /// ```rust
447    /// use chrono::Utc;
448    /// use cron_tab::AsyncCron;
449    ///
450    /// # #[tokio::main]
451    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
452    /// let mut cron = AsyncCron::new(Utc);
453    /// cron.add_fn("*/2 * * * * * *", || async {
454    ///     println!("Job executed every 2 seconds");
455    ///     // Can perform async operations here
456    ///     tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
457    /// }).await?;
458    ///
459    /// // Start the scheduler
460    /// cron.start().await;
461    ///
462    /// // The current task can continue with other work
463    /// tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
464    ///
465    /// // Stop the scheduler
466    /// cron.stop().await;
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn start(&mut self) {
471        let mut cloned = self.clone();
472        self.running.store(true, Ordering::SeqCst);
473        tokio::spawn(async move {
474            cloned.start_blocking().await;
475        });
476    }
477
478    /// Stops the cron scheduler.
479    ///
480    /// This sends a stop signal to the scheduler task, causing it to exit gracefully.
481    /// Any currently executing async jobs will continue to completion, but no new jobs
482    /// will be started.
483    ///
484    /// # Examples
485    ///
486    /// ```rust
487    /// use chrono::Utc;
488    /// use cron_tab::AsyncCron;
489    ///
490    /// # #[tokio::main]
491    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
492    /// let mut cron = AsyncCron::new(Utc);
493    /// cron.add_fn("* * * * * * *", || async {
494    ///     println!("Hello async world!");
495    /// }).await?;
496    /// cron.start().await;
497    ///
498    /// // Later, stop the scheduler
499    /// cron.stop().await;
500    /// # Ok(())
501    /// # }
502    /// ```
503    pub async fn stop(&self) {
504        self.running.store(false, Ordering::SeqCst);
505        if let Some(tx) = self.stop_tx.lock().await.as_ref() {
506            let _ = tx.send(true);
507        }
508    }
509}