async_job/
lib.rs

1//! # async_job: a simple async cron runner
2//!
3//! Use the `Job` trait to create your cron job struct, pass it to the `Runner` and then start it via `run()` method.
4//! Runner will spawn new async task where it will start looping through the jobs and will run their handle
5//! method once the scheduled time is reached.
6//!
7//! If your OS has enough threads to spare each job will get its own thread to execute, if not it will be
8//! executed in the same thread as the loop but will hold the loop until the job is finished.
9//!
10//! Please look at the [**`Job trait`**](./trait.Job.html) documentation for more information.
11//!
12//! ## Example
13//! ```
14//! use async_job::{Job, Runner, Schedule, async_trait};
15//! use tokio::time::Duration;
16//! use tokio;
17//!
18//! struct ExampleJob;
19//!
20//! #[async_trait]
21//! impl Job for ExampleJob {
22//!     fn schedule(&self) -> Option<Schedule> {
23//!         Some("1/5 * * * * *".parse().unwrap())
24//!     }
25//!     async fn handle(&mut self) {
26//!         println!("Hello, I am a cron job running at: {}", self.now());
27//!     }
28//! }
29//!
30//! async fn run() {
31//!     let mut runner = Runner::new();
32//!
33//!     println!("Adding ExampleJob to the Runner");
34//!     runner = runner.add(Box::new(ExampleJob));
35//!
36//!     println!("Starting the Runner for 20 seconds");
37//!     runner = runner.run().await;
38//!     tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
39//!
40//!     println!("Stopping the Runner");
41//!     runner.stop().await;
42//! }
43//!
44//! #[tokio::main]
45//! async fn main() {
46//!     run().await;
47//! }
48//! ```
49//!
50//! Output:
51//! ```shell
52//! Adding ExampleJob to the Runner
53//! Starting the Runner for 20 seconds
54//! Hello, I am a cron job running at: 2021-01-31 03:06:25.908475 UTC
55//! Hello, I am a cron job running at: 2021-01-31 03:06:30.912637 UTC
56//! Hello, I am a cron job running at: 2021-01-31 03:06:35.926938 UTC
57//! Hello, I am a cron job running at: 2021-01-31 03:06:40.962138 UTC
58//! Stopping the Runner
59//! ```
60extern crate chrono;
61extern crate cron;
62
63pub use async_trait::async_trait;
64use chrono::{DateTime, Duration, Utc};
65pub use cron::Schedule;
66use lazy_static::lazy_static;
67use log::{debug, error, info};
68use std::sync::{
69    atomic::{AtomicBool, Ordering},
70    Arc, RwLock,
71};
72use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
73use tokio::task::JoinHandle;
74
75lazy_static! {
76    /// Singleton instance of a tracker that won't allow
77    /// same job to run again while its already running
78    /// unless you specificly allow the job to run in
79    /// parallel with itself
80    pub static ref TRACKER: RwLock<Tracker> = RwLock::new(Tracker::new());
81}
82
83#[async_trait]
84/// A cron job that runs for a website.
85pub trait Job: Send + Sync {
86    /// Default implementation of is_active method will
87    /// make this job always active
88    fn is_active(&self) -> bool {
89        true
90    }
91
92    /// In case your job takes longer to finish and it's scheduled
93    /// to start again (while its still running), default behaviour
94    /// will skip the next run while one instance is already running.
95    /// (if your OS has enough threads, and is spawning a thread for next job)
96    ///
97    /// To override this behaviour and enable it to run in parallel
98    /// with other instances of self, return `true` on this instance.
99    fn allow_parallel_runs(&self) -> bool {
100        false
101    }
102
103    /// Define the run schedule for your job
104    fn schedule(&self) -> Option<Schedule>;
105
106    /// This is where your jobs magic happens, define the action that
107    /// will happen once the cron start running your job
108    ///
109    /// If this method panics, your entire job will panic and that may
110    /// or may not make the whole runner panic. Handle your errors
111    /// properly and don't let it panic.
112    async fn handle(&mut self);
113
114    /// Decide wheather or not to start running your job
115    fn should_run(&self) -> bool {
116        if self.is_active() {
117            match self.schedule() {
118                Some(schedule) => {
119                    for item in schedule.upcoming(Utc).take(1) {
120                        let difference = item - Utc::now();
121                        if difference <= Duration::milliseconds(100) {
122                            return true;
123                        }
124                    }
125                }
126                _ => (),
127            }
128        }
129
130        false
131    }
132
133    /// Simple output that will return current time so you don't have to do so
134    /// in your job if you wish to display the time of the run.
135    fn now(&self) -> DateTime<Utc> {
136        Utc::now()
137    }
138}
139
140/// Struct for marking jobs running
141pub struct Tracker(Vec<usize>);
142
143impl Default for Tracker {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
149impl Tracker {
150    /// Return new instance of running
151    pub fn new() -> Self {
152        Tracker(vec![])
153    }
154
155    /// Check if id of the job is marked as running
156    pub fn running(&self, id: &usize) -> bool {
157        self.0.contains(id)
158    }
159
160    /// Set job id as running
161    pub fn start(&mut self, id: &usize) -> usize {
162        if !self.running(id) {
163            self.0.push(*id);
164        }
165        self.0.len()
166    }
167
168    /// Unmark the job from running
169    pub fn stop(&mut self, id: &usize) -> usize {
170        if self.running(id) {
171            match self.0.iter().position(|&r| r == *id) {
172                Some(i) => self.0.remove(i),
173                None => 0,
174            };
175        }
176        self.0.len()
177    }
178}
179
180/// Runner that will hold all the jobs and will start up the execution
181/// and eventually will stop it.
182pub struct Runner {
183    /// the current jobs
184    pub jobs: Vec<Box<dyn Job>>,
185    /// the task that is running the handle
186    pub thread: Option<JoinHandle<()>>,
187    /// is the task running or not
188    pub running: bool,
189    /// channel sending message
190    pub tx: Option<UnboundedSender<Result<(), ()>>>,
191    /// tracker to determine crons working
192    pub working: Arc<AtomicBool>,
193}
194
195impl Default for Runner {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl Runner {
202    /// Create new runner
203    pub fn new() -> Self {
204        Runner {
205            jobs: vec![],
206            thread: None,
207            running: false,
208            tx: None,
209            working: Arc::new(AtomicBool::new(false)),
210        }
211    }
212
213    /// Add jobs into the runner
214    ///
215    /// Does nothing if already running.
216    #[allow(clippy::should_implement_trait)]
217    pub fn add(mut self, job: Box<dyn Job>) -> Self {
218        if !self.running {
219            self.jobs.push(job);
220        }
221        self
222    }
223
224    /// Number of jobs ready to start running
225    pub fn jobs_to_run(&self) -> usize {
226        self.jobs.len()
227    }
228
229    /// Start the loop and job execution
230    pub async fn run(self) -> Self {
231        if self.jobs.is_empty() {
232            return self;
233        }
234
235        let working = Arc::new(AtomicBool::new(false));
236        let (thread, tx) = spawn(self, working.clone()).await;
237
238        Self {
239            thread,
240            jobs: vec![],
241            running: true,
242            tx,
243            working,
244        }
245    }
246
247    /// Stop the spawned runner
248    pub async fn stop(&mut self) {
249        if !self.running {
250            return;
251        }
252        if let Some(thread) = self.thread.take() {
253            if let Some(tx) = &self.tx {
254                match tx.send(Ok(())) {
255                    Ok(_) => (),
256                    Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
257                };
258            }
259            thread.abort()
260        }
261    }
262
263    /// Lets us know if the cron worker is running
264    pub fn is_running(&self) -> bool {
265        self.running
266    }
267
268    /// Lets us know if the worker is in the process of executing a job currently
269    pub fn is_working(&self) -> bool {
270        self.working.load(Ordering::Relaxed)
271    }
272}
273
274/// Spawn the thread for the runner and return its sender to stop it
275async fn spawn(
276    runner: Runner,
277    working: Arc<AtomicBool>,
278) -> (
279    Option<JoinHandle<()>>,
280    Option<UnboundedSender<Result<(), ()>>>,
281) {
282    let (tx, mut rx): (
283        UnboundedSender<Result<(), ()>>,
284        UnboundedReceiver<Result<(), ()>>,
285    ) = unbounded_channel();
286
287    let handler = tokio::spawn(async move {
288        let mut jobs = runner.jobs;
289
290        loop {
291            if rx.try_recv().is_ok() {
292                info!("Stopping the cron runner thread");
293                break;
294            }
295
296            for (id, job) in jobs.iter_mut().enumerate() {
297                let no: String = (id + 1).to_string();
298
299                if job.should_run()
300                    && (job.allow_parallel_runs()
301                        || match TRACKER.read() {
302                            Ok(s) => !s.running(&id),
303                            _ => false,
304                        })
305                {
306                    match TRACKER.write() {
307                        Ok(mut s) => {
308                            s.start(&id);
309                        }
310                        _ => (),
311                    }
312
313                    let now = Utc::now();
314                    debug!(
315                        "START: {} --- {}",
316                        format!("cron-job-thread-{}", no),
317                        now.format("%H:%M:%S%.f")
318                    );
319
320                    working.store(true, Ordering::Relaxed);
321
322                    job.handle().await;
323
324                    working.store(
325                        match TRACKER.write() {
326                            Ok(mut s) => s.stop(&id) != 0,
327                            _ => false,
328                        },
329                        Ordering::Relaxed,
330                    );
331
332                    debug!(
333                        "FINISH: {} --- {}",
334                        format!("cron-job-thread-{}", no),
335                        now.format("%H:%M:%S%.f")
336                    );
337                }
338            }
339            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
340        }
341    });
342
343    (Some(handler), Some(tx))
344}
345
346#[cfg(test)]
347mod tests {
348    use super::{Job, Runner};
349    use async_trait::async_trait;
350    use cron::Schedule;
351    use std::str::FromStr;
352    struct SomeJob;
353
354    #[async_trait]
355    impl Job for SomeJob {
356        fn schedule(&self) -> Option<Schedule> {
357            Some(Schedule::from_str("0 * * * * *").unwrap())
358        }
359
360        async fn handle(&mut self) {}
361    }
362    struct AnotherJob;
363    #[async_trait]
364    impl Job for AnotherJob {
365        fn schedule(&self) -> Option<Schedule> {
366            Some(Schedule::from_str("0 * * * * *").unwrap())
367        }
368
369        async fn handle(&mut self) {}
370    }
371    #[tokio::test]
372    async fn create_job() {
373        let mut some_job = SomeJob;
374
375        assert_eq!(some_job.handle().await, ());
376    }
377
378    #[tokio::test]
379    async fn test_adding_jobs_to_runner() {
380        let some_job = SomeJob;
381        let another_job = AnotherJob;
382
383        let runner = Runner::new()
384            .add(Box::new(some_job))
385            .add(Box::new(another_job));
386
387        assert_eq!(runner.jobs_to_run(), 2);
388    }
389
390    #[tokio::test]
391    async fn test_jobs_are_empty_after_runner_starts() {
392        let some_job = SomeJob;
393        let another_job = AnotherJob;
394
395        let runner = Runner::new()
396            .add(Box::new(some_job))
397            .add(Box::new(another_job))
398            .run()
399            .await;
400
401        assert_eq!(runner.jobs_to_run(), 0);
402    }
403
404    #[tokio::test]
405    async fn test_stopping_the_runner() {
406        let some_job = SomeJob;
407        let another_job = AnotherJob;
408
409        let mut runner = Runner::new()
410            .add(Box::new(some_job))
411            .add(Box::new(another_job))
412            .run()
413            .await;
414
415        assert_eq!(runner.stop().await, ());
416    }
417}