woddle/lib.rs
1//! An async, synchronized, database-backed Rust job scheduler
2//!
3//! This library provides an async job runner, which can run user-defined jobs in an interval, or
4//! based on a cron schedule.
5//!
6//! Also, the library automatically synchronizes multiple instances of the runner via PostgreSQL.
7//! This is important to ensure, that a job is only run once for each interval, or schedule.
8//!
9//! A `Job` in this library can be created by implementing the `Job` trait. There the user can
10//! define a custom `run` function, which is executed for each interval, or schedule of the job.
11//!
12//! This interval, as well as other relevant metadata, needs to be configured using a `JobConfig`
13//! for each job.
14//!
15//! Then, once all your jobs are defined, you can create a `JobRunner`. This is the main mechanism
16//! underlying this scheduling library. It will check, at a user-defined interval, if a job needs
17//! to run, or not.
18//!
19//! This `JobRunner` is configured using the `RunnerConfig`, where the user can define database
20//! configuration, as well as an initial delay and the interval for checking for job runs.
21//!
22//! Once everything is configured, you can run the `JobRunner` and, if it doesn't return an error
23//! during job validation, it will run forever, scheduling and running your jobs asynchronously
24//! using Tokio.
25#![cfg_attr(feature = "docs", feature(doc_cfg))]
26#![warn(missing_docs)]
27mod config;
28
29pub use async_trait::async_trait;
30pub use config::JobConfig;
31pub use config::RunnerConfig;
32use futures::future::join_all;
33use log::{error, info};
34use std::fmt;
35use std::sync::Arc;
36
37mod db;
38
39#[cfg(feature = "pool-mobc")]
40mod pool;
41
42type BoxedJob = Box<dyn Job + Send + Sync>;
43
44/// The error type returned by methods in this crate
45#[derive(Debug)]
46pub enum Error {
47 /// A database error
48 DBError(db::DBError),
49 /// An error parsing the database configuration
50 DBConfigError(tokio_postgres::Error),
51 /// An error indicating an invalid job, with neither `cron`, nor `interval` set
52 InvalidJobError,
53}
54
55impl std::error::Error for Error {
56 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
57 None
58 }
59}
60
61impl fmt::Display for Error {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 Error::DBError(ref e) => write!(f, "db error: {}", e),
65 Error::DBConfigError(ref e) => write!(f, "db configuration error: {}", e),
66 Error::InvalidJobError => write!(
67 f,
68 "invalid job found - check if all jobs have interval or cron set"
69 ),
70 }
71 }
72}
73
74#[async_trait]
75/// A trait for implementing a woddle job
76///
77/// Example implementation:
78///
79/// ```ignore
80/// use std::time::Duration;
81/// use crate::{JobConfig, Job, async_trait};
82///
83/// #[derive(Clone)]
84/// struct MyJob {
85/// config: JobConfig,
86/// }
87///
88/// #[async_trait]
89/// impl Job for MyJob {
90/// async fn run(&self) {
91/// log::info!("running my job!");
92/// }
93///
94/// fn get_config(&self) -> &JobConfig {
95/// &self.config
96/// }
97/// }
98///
99/// fn main() {
100/// let job_cfg = JobConfig::new("my_job", "someSyncKey").interval(Duration::from_secs(5));
101
102/// let my_job = MyJob {
103/// config: job_cfg,
104/// };
105/// }
106/// ```
107pub trait Job: JobClone {
108 /// Runs the job
109 ///
110 /// This is an async function, so if you plan to do long-running, blocking operations, you
111 /// should spawn them on [Tokio's Blocking Threadpool](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
112 ///
113 /// You need the `blocking` feature to be active, for this to work.
114 ///
115 /// Otherwise, you might block the scheduler threads, slowing down your whole application.
116 async fn run(&self);
117 /// Exposes the configuration of the job
118 fn get_config(&self) -> &JobConfig;
119}
120
121#[doc(hidden)]
122pub trait JobClone {
123 fn box_clone(&self) -> BoxedJob;
124}
125
126impl<T> JobClone for T
127where
128 T: 'static + Job + Clone + Send + Sync,
129{
130 fn box_clone(&self) -> BoxedJob {
131 Box::new((*self).clone())
132 }
133}
134
135impl Clone for Box<dyn Job> {
136 fn clone(&self) -> Box<dyn Job> {
137 self.box_clone()
138 }
139}
140
141/// The runner, which holds the jobs and runner configuration
142pub struct JobRunner {
143 jobs: Vec<BoxedJob>,
144 config: RunnerConfig,
145}
146
147impl JobRunner {
148 /// Creates a new runner based on the given RunnerConfig
149 pub fn new(config: RunnerConfig) -> Self {
150 Self {
151 config,
152 jobs: Vec::new(),
153 }
154 }
155
156 /// Creates a new runner based on the given RunnerConfig and vector of jobs
157 pub fn new_with_vec(
158 config: RunnerConfig,
159 jobs: Vec<impl Job + Send + Sync + Clone + 'static>,
160 ) -> Self {
161 let mut boxed_jobs = vec![];
162 for j in jobs {
163 boxed_jobs.push(Box::new(j) as BoxedJob);
164 }
165 Self {
166 config,
167 jobs: boxed_jobs,
168 }
169 }
170
171 /// Adds a job to the Runner
172 pub fn add_job(mut self, job: impl Job + Send + Sync + Clone + 'static) -> Self {
173 self.jobs.push(Box::new(job) as BoxedJob);
174 self
175 }
176
177 /// Starts the runner
178 ///
179 /// This will:
180 ///
181 /// * Validate the added jobs
182 /// * Initialize the database state, creating the `woddle_jobs` table
183 /// * Announce all registered jobs with their timers
184 /// * Start checking and running jobs
185 pub async fn start(self) -> Result<(), Error> {
186 self.validate()?;
187 self.initialize().await?;
188 self.announce_jobs();
189
190 if let Some(initial_delay) = self.config.initial_delay {
191 tokio::time::sleep(initial_delay).await;
192 }
193
194 let mut job_interval = tokio::time::interval(self.config.check_interval);
195 let jobs = Arc::new(&self.jobs);
196 loop {
197 job_interval.tick().await;
198 self.check_and_run_jobs(jobs.clone()).await;
199 }
200 }
201
202 // Validates all jobs
203 fn validate(&self) -> Result<(), Error> {
204 for job in &self.jobs {
205 let cfg = job.get_config();
206 if cfg.interval.is_none() && cfg.cron.is_none() {
207 return Err(Error::InvalidJobError);
208 }
209 }
210 Ok(())
211 }
212
213 // Asserts that the woddle_jobs table is there and insert all new jobs
214 async fn initialize(&self) -> Result<(), Error> {
215 let con = db::get_con(&self.config).await.map_err(Error::DBError)?;
216 db::create_tables(&con).await.map_err(Error::DBError)?;
217 for j in self.jobs.iter() {
218 db::insert_job(&con, j).await.map_err(Error::DBError)?;
219 }
220 Ok(())
221 }
222
223 // Logs an announcement for all registered jobs
224 fn announce_jobs(&self) {
225 for job in &self.jobs {
226 match job.get_config().interval {
227 Some(interval) => {
228 info!(
229 "job '{}' with interval: {:?} registered successfully",
230 job.get_config().name,
231 interval
232 );
233 }
234 None => match job.get_config().cron_str {
235 Some(ref cron) => {
236 info!(
237 "job '{}' with cron-schedule: {:?} registered successfully",
238 job.get_config().name,
239 cron
240 );
241 }
242 None => unreachable!("can't get here, since running a job with neither cron, nor interval fails earlier"),
243 },
244 }
245 }
246 }
247
248 // Checks and runs, if necessary, all jobs concurrently
249 async fn check_and_run_jobs(&self, jobs: Arc<&Vec<BoxedJob>>) {
250 let job_futures = jobs
251 .iter()
252 .map(|job| {
253 let j = job.box_clone();
254 self.check_and_run_job(j)
255 })
256 .collect::<Vec<_>>();
257 join_all(job_futures).await;
258 }
259
260 // Checks and runs a single [Job](crate::Job)
261 //
262 // Connects to the database, checks if the given job should be run again and if so, sets the
263 // `last_run` of the job to `now()` and executes the job.
264 async fn check_and_run_job(&self, job: BoxedJob) -> Result<(), Error> {
265 let mut con = db::get_con(&self.config).await.map_err(|e| {
266 error!("error checking job {}, {}", job.get_config().name, e);
267 Error::DBError(e)
268 })?;
269
270 let should_run_job = db::update_job_if_ready(&mut con, &job).await.map_err(|e| {
271 error!("error checking job {}, {}", job.get_config().name, e);
272 Error::DBError(e)
273 })?;
274
275 if should_run_job {
276 tokio::spawn(async move {
277 job.run().await;
278 });
279 }
280
281 Ok(())
282 }
283}