woddle/
lib.rs

1//! An async, synchronized, database-backed Rust job scheduler
2//!
//! This library provides an async job runner, which can run user-defined jobs at a fixed interval
//! or based on a cron schedule.
5//!
//! Also, the library automatically synchronizes multiple instances of the runner via PostgreSQL.
//! This is important to ensure that a job is only run once for each interval or schedule.
8//!
9//! A `Job` in this library can be created by implementing the `Job` trait. There the user can
10//! define a custom `run` function, which is executed for each interval, or schedule of the job.
11//!
12//! This interval, as well as other relevant metadata, needs to be configured using a `JobConfig`
13//! for each job.
14//!
//! Then, once all your jobs are defined, you can create a `JobRunner`. This is the main mechanism
//! underlying this scheduling library. It will check, at a user-defined interval, whether a job
//! needs to run.
18//!
19//! This `JobRunner` is configured using the `RunnerConfig`, where the user can define database
20//! configuration, as well as an initial delay and the interval for checking for job runs.
21//!
22//! Once everything is configured, you can run the `JobRunner` and, if it doesn't return an error
23//! during job validation, it will run forever, scheduling and running your jobs asynchronously
24//! using Tokio.
25#![cfg_attr(feature = "docs", feature(doc_cfg))]
26#![warn(missing_docs)]
27mod config;
28
29pub use async_trait::async_trait;
30pub use config::JobConfig;
31pub use config::RunnerConfig;
32use futures::future::join_all;
33use log::{error, info};
34use std::fmt;
35use std::sync::Arc;
36
37mod db;
38
39#[cfg(feature = "pool-mobc")]
40mod pool;
41
42type BoxedJob = Box<dyn Job + Send + Sync>;
43
44/// The error type returned by methods in this crate
45#[derive(Debug)]
46pub enum Error {
47    /// A database error
48    DBError(db::DBError),
49    /// An error parsing the database configuration
50    DBConfigError(tokio_postgres::Error),
51    /// An error indicating an invalid job, with neither `cron`, nor `interval` set
52    InvalidJobError,
53}
54
55impl std::error::Error for Error {
56    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
57        None
58    }
59}
60
61impl fmt::Display for Error {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            Error::DBError(ref e) => write!(f, "db error: {}", e),
65            Error::DBConfigError(ref e) => write!(f, "db configuration error: {}", e),
66            Error::InvalidJobError => write!(
67                f,
68                "invalid job found - check if all jobs have interval or cron set"
69            ),
70        }
71    }
72}
73
74#[async_trait]
75/// A trait for implementing a woddle job
76///
77/// Example implementation:
78///
79/// ```ignore
80/// use std::time::Duration;
81/// use crate::{JobConfig, Job, async_trait};
82///
83/// #[derive(Clone)]
84/// struct MyJob {
85///     config: JobConfig,
86/// }
87///
88/// #[async_trait]
89/// impl Job for MyJob {
90///     async fn run(&self) {
91///         log::info!("running my job!");
92///     }
93///
94///     fn get_config(&self) -> &JobConfig {
95///         &self.config
96///     }
97/// }
98///
99/// fn main() {
100///     let job_cfg = JobConfig::new("my_job", "someSyncKey").interval(Duration::from_secs(5));
101
102///     let my_job = MyJob {
103///         config: job_cfg,
104///     };
105/// }
106/// ```
107pub trait Job: JobClone {
108    /// Runs the job
109    ///
110    /// This is an async function, so if you plan to do long-running, blocking operations, you
111    /// should spawn them on [Tokio's Blocking Threadpool](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
112    ///
113    /// You need the `blocking` feature to be active, for this to work.
114    ///
115    /// Otherwise, you might block the scheduler threads, slowing down your whole application.
116    async fn run(&self);
117    /// Exposes the configuration of the job
118    fn get_config(&self) -> &JobConfig;
119}
120
121#[doc(hidden)]
122pub trait JobClone {
123    fn box_clone(&self) -> BoxedJob;
124}
125
126impl<T> JobClone for T
127where
128    T: 'static + Job + Clone + Send + Sync,
129{
130    fn box_clone(&self) -> BoxedJob {
131        Box::new((*self).clone())
132    }
133}
134
135impl Clone for Box<dyn Job> {
136    fn clone(&self) -> Box<dyn Job> {
137        self.box_clone()
138    }
139}
140
141/// The runner, which holds the jobs and runner configuration
142pub struct JobRunner {
143    jobs: Vec<BoxedJob>,
144    config: RunnerConfig,
145}
146
147impl JobRunner {
148    /// Creates a new runner based on the given RunnerConfig
149    pub fn new(config: RunnerConfig) -> Self {
150        Self {
151            config,
152            jobs: Vec::new(),
153        }
154    }
155
156    /// Creates a new runner based on the given RunnerConfig and vector of jobs
157    pub fn new_with_vec(
158        config: RunnerConfig,
159        jobs: Vec<impl Job + Send + Sync + Clone + 'static>,
160    ) -> Self {
161        let mut boxed_jobs = vec![];
162        for j in jobs {
163            boxed_jobs.push(Box::new(j) as BoxedJob);
164        }
165        Self {
166            config,
167            jobs: boxed_jobs,
168        }
169    }
170
171    /// Adds a job to the Runner
172    pub fn add_job(mut self, job: impl Job + Send + Sync + Clone + 'static) -> Self {
173        self.jobs.push(Box::new(job) as BoxedJob);
174        self
175    }
176
177    /// Starts the runner
178    ///
179    /// This will:
180    ///
181    /// * Validate the added jobs
182    /// * Initialize the database state, creating the `woddle_jobs` table
183    /// * Announce all registered jobs with their timers
184    /// * Start checking and running jobs
185    pub async fn start(self) -> Result<(), Error> {
186        self.validate()?;
187        self.initialize().await?;
188        self.announce_jobs();
189
190        if let Some(initial_delay) = self.config.initial_delay {
191            tokio::time::sleep(initial_delay).await;
192        }
193
194        let mut job_interval = tokio::time::interval(self.config.check_interval);
195        let jobs = Arc::new(&self.jobs);
196        loop {
197            job_interval.tick().await;
198            self.check_and_run_jobs(jobs.clone()).await;
199        }
200    }
201
202    // Validates all jobs
203    fn validate(&self) -> Result<(), Error> {
204        for job in &self.jobs {
205            let cfg = job.get_config();
206            if cfg.interval.is_none() && cfg.cron.is_none() {
207                return Err(Error::InvalidJobError);
208            }
209        }
210        Ok(())
211    }
212
213    // Asserts that the woddle_jobs table is there and insert all new jobs
214    async fn initialize(&self) -> Result<(), Error> {
215        let con = db::get_con(&self.config).await.map_err(Error::DBError)?;
216        db::create_tables(&con).await.map_err(Error::DBError)?;
217        for j in self.jobs.iter() {
218            db::insert_job(&con, j).await.map_err(Error::DBError)?;
219        }
220        Ok(())
221    }
222
223    // Logs an announcement for all registered jobs
224    fn announce_jobs(&self) {
225        for job in &self.jobs {
226            match job.get_config().interval {
227                Some(interval) => {
228                    info!(
229                        "job '{}' with interval: {:?} registered successfully",
230                        job.get_config().name,
231                        interval
232                    );
233                }
234                None => match job.get_config().cron_str {
235                    Some(ref cron) => {
236                        info!(
237                            "job '{}' with cron-schedule: {:?} registered successfully",
238                            job.get_config().name,
239                            cron
240                        );
241                    }
242                    None => unreachable!("can't get here, since running a job with neither cron, nor interval fails earlier"),
243                },
244            }
245        }
246    }
247
248    // Checks and runs, if necessary, all jobs concurrently
249    async fn check_and_run_jobs(&self, jobs: Arc<&Vec<BoxedJob>>) {
250        let job_futures = jobs
251            .iter()
252            .map(|job| {
253                let j = job.box_clone();
254                self.check_and_run_job(j)
255            })
256            .collect::<Vec<_>>();
257        join_all(job_futures).await;
258    }
259
260    // Checks and runs a single [Job](crate::Job)
261    //
262    // Connects to the database, checks if the given job should be run again and if so, sets the
263    // `last_run` of the job to `now()` and executes the job.
264    async fn check_and_run_job(&self, job: BoxedJob) -> Result<(), Error> {
265        let mut con = db::get_con(&self.config).await.map_err(|e| {
266            error!("error checking job {}, {}", job.get_config().name, e);
267            Error::DBError(e)
268        })?;
269
270        let should_run_job = db::update_job_if_ready(&mut con, &job).await.map_err(|e| {
271            error!("error checking job {}, {}", job.get_config().name, e);
272            Error::DBError(e)
273        })?;
274
275        if should_run_job {
276            tokio::spawn(async move {
277                job.run().await;
278            });
279        }
280
281        Ok(())
282    }
283}