async_job/lib.rs
1//! # async_job: a simple async cron runner
2//!
3//! Use the `Job` trait to create your cron job struct, pass it to the `Runner` and then start it via `run()` method.
4//! Runner will spawn new async task where it will start looping through the jobs and will run their handle
5//! method once the scheduled time is reached.
6//!
//! Jobs are awaited one after another inside that task, so a long-running job holds up the loop
//! (and every other job) until it has finished.
9//!
10//! Please look at the [**`Job trait`**](./trait.Job.html) documentation for more information.
11//!
12//! ## Example
13//! ```
14//! use async_job::{Job, Runner, Schedule, async_trait};
15//! use tokio::time::Duration;
16//! use tokio;
17//!
18//! struct ExampleJob;
19//!
20//! #[async_trait]
21//! impl Job for ExampleJob {
22//! fn schedule(&self) -> Option<Schedule> {
23//! Some("1/5 * * * * *".parse().unwrap())
24//! }
25//! async fn handle(&mut self) {
26//! println!("Hello, I am a cron job running at: {}", self.now());
27//! }
28//! }
29//!
30//! async fn run() {
31//! let mut runner = Runner::new();
32//!
33//! println!("Adding ExampleJob to the Runner");
34//! runner = runner.add(Box::new(ExampleJob));
35//!
36//! println!("Starting the Runner for 20 seconds");
37//! runner = runner.run().await;
38//! tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
39//!
40//! println!("Stopping the Runner");
41//! runner.stop().await;
42//! }
43//!
44//! #[tokio::main]
45//! async fn main() {
46//! run().await;
47//! }
48//! ```
49//!
50//! Output:
51//! ```shell
52//! Adding ExampleJob to the Runner
53//! Starting the Runner for 20 seconds
54//! Hello, I am a cron job running at: 2021-01-31 03:06:25.908475 UTC
55//! Hello, I am a cron job running at: 2021-01-31 03:06:30.912637 UTC
56//! Hello, I am a cron job running at: 2021-01-31 03:06:35.926938 UTC
57//! Hello, I am a cron job running at: 2021-01-31 03:06:40.962138 UTC
58//! Stopping the Runner
59//! ```
60extern crate chrono;
61extern crate cron;
62
63pub use async_trait::async_trait;
64use chrono::{DateTime, Duration, Utc};
65pub use cron::Schedule;
66use lazy_static::lazy_static;
67use log::{debug, error, info};
68use std::sync::{
69 atomic::{AtomicBool, Ordering},
70 Arc, RwLock,
71};
72use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
73use tokio::task::JoinHandle;
74
75lazy_static! {
76 /// Singleton instance of a tracker that won't allow
77 /// same job to run again while its already running
78 /// unless you specificly allow the job to run in
79 /// parallel with itself
80 pub static ref TRACKER: RwLock<Tracker> = RwLock::new(Tracker::new());
81}
82
83#[async_trait]
84/// A cron job that runs for a website.
85pub trait Job: Send + Sync {
86 /// Default implementation of is_active method will
87 /// make this job always active
88 fn is_active(&self) -> bool {
89 true
90 }
91
92 /// In case your job takes longer to finish and it's scheduled
93 /// to start again (while its still running), default behaviour
94 /// will skip the next run while one instance is already running.
95 /// (if your OS has enough threads, and is spawning a thread for next job)
96 ///
97 /// To override this behaviour and enable it to run in parallel
98 /// with other instances of self, return `true` on this instance.
99 fn allow_parallel_runs(&self) -> bool {
100 false
101 }
102
103 /// Define the run schedule for your job
104 fn schedule(&self) -> Option<Schedule>;
105
106 /// This is where your jobs magic happens, define the action that
107 /// will happen once the cron start running your job
108 ///
109 /// If this method panics, your entire job will panic and that may
110 /// or may not make the whole runner panic. Handle your errors
111 /// properly and don't let it panic.
112 async fn handle(&mut self);
113
114 /// Decide wheather or not to start running your job
115 fn should_run(&self) -> bool {
116 if self.is_active() {
117 match self.schedule() {
118 Some(schedule) => {
119 for item in schedule.upcoming(Utc).take(1) {
120 let difference = item - Utc::now();
121 if difference <= Duration::milliseconds(100) {
122 return true;
123 }
124 }
125 }
126 _ => (),
127 }
128 }
129
130 false
131 }
132
133 /// Simple output that will return current time so you don't have to do so
134 /// in your job if you wish to display the time of the run.
135 fn now(&self) -> DateTime<Utc> {
136 Utc::now()
137 }
138}
139
/// Registry of the ids of the jobs that are currently marked as running.
///
/// Each id is stored at most once, no matter how often it is started.
#[derive(Debug, Default)]
pub struct Tracker(Vec<usize>);

impl Tracker {
    /// Create an empty tracker with no job marked as running.
    pub fn new() -> Self {
        Self::default()
    }

    /// Return `true` if the job with the given id is marked as running.
    pub fn running(&self, id: &usize) -> bool {
        self.0.contains(id)
    }

    /// Mark the job with the given id as running.
    ///
    /// Starting an id that is already marked is a no-op. Returns the number
    /// of ids marked as running after the call.
    pub fn start(&mut self, id: &usize) -> usize {
        if !self.running(id) {
            self.0.push(*id);
        }
        self.0.len()
    }

    /// Remove the running mark of the job with the given id.
    ///
    /// Stopping an id that is not marked is a no-op. Returns the number of
    /// ids still marked as running after the call.
    pub fn stop(&mut self, id: &usize) -> usize {
        // One scan finds the entry; the order of the ids is never observed,
        // so the constant-time `swap_remove` is sufficient.
        if let Some(index) = self.0.iter().position(|marked| marked == id) {
            self.0.swap_remove(index);
        }
        self.0.len()
    }
}
179
180/// Runner that will hold all the jobs and will start up the execution
181/// and eventually will stop it.
182pub struct Runner {
183 /// the current jobs
184 pub jobs: Vec<Box<dyn Job>>,
185 /// the task that is running the handle
186 pub thread: Option<JoinHandle<()>>,
187 /// is the task running or not
188 pub running: bool,
189 /// channel sending message
190 pub tx: Option<UnboundedSender<Result<(), ()>>>,
191 /// tracker to determine crons working
192 pub working: Arc<AtomicBool>,
193}
194
195impl Default for Runner {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201impl Runner {
202 /// Create new runner
203 pub fn new() -> Self {
204 Runner {
205 jobs: vec![],
206 thread: None,
207 running: false,
208 tx: None,
209 working: Arc::new(AtomicBool::new(false)),
210 }
211 }
212
213 /// Add jobs into the runner
214 ///
215 /// Does nothing if already running.
216 #[allow(clippy::should_implement_trait)]
217 pub fn add(mut self, job: Box<dyn Job>) -> Self {
218 if !self.running {
219 self.jobs.push(job);
220 }
221 self
222 }
223
224 /// Number of jobs ready to start running
225 pub fn jobs_to_run(&self) -> usize {
226 self.jobs.len()
227 }
228
229 /// Start the loop and job execution
230 pub async fn run(self) -> Self {
231 if self.jobs.is_empty() {
232 return self;
233 }
234
235 let working = Arc::new(AtomicBool::new(false));
236 let (thread, tx) = spawn(self, working.clone()).await;
237
238 Self {
239 thread,
240 jobs: vec![],
241 running: true,
242 tx,
243 working,
244 }
245 }
246
247 /// Stop the spawned runner
248 pub async fn stop(&mut self) {
249 if !self.running {
250 return;
251 }
252 if let Some(thread) = self.thread.take() {
253 if let Some(tx) = &self.tx {
254 match tx.send(Ok(())) {
255 Ok(_) => (),
256 Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
257 };
258 }
259 thread.abort()
260 }
261 }
262
263 /// Lets us know if the cron worker is running
264 pub fn is_running(&self) -> bool {
265 self.running
266 }
267
268 /// Lets us know if the worker is in the process of executing a job currently
269 pub fn is_working(&self) -> bool {
270 self.working.load(Ordering::Relaxed)
271 }
272}
273
274/// Spawn the thread for the runner and return its sender to stop it
275async fn spawn(
276 runner: Runner,
277 working: Arc<AtomicBool>,
278) -> (
279 Option<JoinHandle<()>>,
280 Option<UnboundedSender<Result<(), ()>>>,
281) {
282 let (tx, mut rx): (
283 UnboundedSender<Result<(), ()>>,
284 UnboundedReceiver<Result<(), ()>>,
285 ) = unbounded_channel();
286
287 let handler = tokio::spawn(async move {
288 let mut jobs = runner.jobs;
289
290 loop {
291 if rx.try_recv().is_ok() {
292 info!("Stopping the cron runner thread");
293 break;
294 }
295
296 for (id, job) in jobs.iter_mut().enumerate() {
297 let no: String = (id + 1).to_string();
298
299 if job.should_run()
300 && (job.allow_parallel_runs()
301 || match TRACKER.read() {
302 Ok(s) => !s.running(&id),
303 _ => false,
304 })
305 {
306 match TRACKER.write() {
307 Ok(mut s) => {
308 s.start(&id);
309 }
310 _ => (),
311 }
312
313 let now = Utc::now();
314 debug!(
315 "START: {} --- {}",
316 format!("cron-job-thread-{}", no),
317 now.format("%H:%M:%S%.f")
318 );
319
320 working.store(true, Ordering::Relaxed);
321
322 job.handle().await;
323
324 working.store(
325 match TRACKER.write() {
326 Ok(mut s) => s.stop(&id) != 0,
327 _ => false,
328 },
329 Ordering::Relaxed,
330 );
331
332 debug!(
333 "FINISH: {} --- {}",
334 format!("cron-job-thread-{}", no),
335 now.format("%H:%M:%S%.f")
336 );
337 }
338 }
339 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
340 }
341 });
342
343 (Some(handler), Some(tx))
344}
345
#[cfg(test)]
mod tests {
    use super::{Job, Runner, Tracker};
    use async_trait::async_trait;
    use cron::Schedule;
    use std::str::FromStr;

    /// Job scheduled for second 0 of every minute; its handler does nothing.
    struct SomeJob;

    #[async_trait]
    impl Job for SomeJob {
        fn schedule(&self) -> Option<Schedule> {
            Some(Schedule::from_str("0 * * * * *").unwrap())
        }

        async fn handle(&mut self) {}
    }

    /// Second no-op job, used to register more than one job with a runner.
    struct AnotherJob;

    #[async_trait]
    impl Job for AnotherJob {
        fn schedule(&self) -> Option<Schedule> {
            Some(Schedule::from_str("0 * * * * *").unwrap())
        }

        async fn handle(&mut self) {}
    }

    #[tokio::test]
    async fn create_job() {
        let mut some_job = SomeJob;

        // `handle` returns `()`, so completing without a panic is the check.
        some_job.handle().await;

        // The provided trait methods keep their documented defaults.
        assert!(some_job.is_active());
        assert!(!some_job.allow_parallel_runs());
        assert!(some_job.schedule().is_some());
    }

    #[test]
    fn test_tracker_marks_and_unmarks_jobs() {
        let mut tracker = Tracker::new();

        assert_eq!(tracker.start(&0), 1);
        assert_eq!(tracker.start(&0), 1);
        assert_eq!(tracker.start(&1), 2);
        assert!(tracker.running(&0));

        assert_eq!(tracker.stop(&0), 1);
        assert!(!tracker.running(&0));
        assert_eq!(tracker.stop(&0), 1);
        assert_eq!(tracker.stop(&1), 0);
    }

    #[tokio::test]
    async fn test_adding_jobs_to_runner() {
        let runner = Runner::new()
            .add(Box::new(SomeJob))
            .add(Box::new(AnotherJob));

        assert_eq!(runner.jobs_to_run(), 2);
        assert!(!runner.is_running());
    }

    #[tokio::test]
    async fn test_running_an_empty_runner_is_a_no_op() {
        let runner = Runner::new().run().await;

        assert!(!runner.is_running());
        assert!(runner.thread.is_none());
    }

    #[tokio::test]
    async fn test_jobs_are_empty_after_runner_starts() {
        let runner = Runner::new()
            .add(Box::new(SomeJob))
            .add(Box::new(AnotherJob))
            .run()
            .await;

        assert_eq!(runner.jobs_to_run(), 0);
        assert!(runner.is_running());
    }

    #[tokio::test]
    async fn test_adding_jobs_after_start_is_ignored() {
        let runner = Runner::new()
            .add(Box::new(SomeJob))
            .run()
            .await
            .add(Box::new(AnotherJob));

        assert_eq!(runner.jobs_to_run(), 0);
    }

    #[tokio::test]
    async fn test_stopping_the_runner() {
        let mut runner = Runner::new()
            .add(Box::new(SomeJob))
            .add(Box::new(AnotherJob))
            .run()
            .await;

        assert!(runner.thread.is_some());

        runner.stop().await;

        // Stopping hands the task handle back, so none is left on the runner.
        assert!(runner.thread.is_none());
    }
}
417}