//! An async, synchronized, database-backed Rust job scheduler
//!
//! This library provides an async job runner, which can run user-defined jobs at an interval or
//! based on a cron schedule.
//!
//! The library also automatically synchronizes multiple instances of the runner via PostgreSQL.
//! This ensures that a job is run only once for each interval or schedule.
//!
//! A job is created by implementing the `Job` trait. There the user defines a custom `run`
//! function, which is executed for each interval or schedule of the job.
//!
//! This interval, as well as other relevant metadata, needs to be configured using a `JobConfig`
//! for each job.
//!
//! Then, once all your jobs are defined, you can create a `JobRunner`. This is the main mechanism
//! underlying this scheduling library. It checks, at a user-defined interval, whether any job
//! needs to run.
//!
//! This `JobRunner` is configured using the `RunnerConfig`, where the user can define database
//! configuration, as well as an initial delay and the interval for checking for job runs.
//!
//! Once everything is configured, you can run the `JobRunner` and, if it doesn't return an error
//! during job validation or database initialization, it will run forever, scheduling and running
//! your jobs asynchronously using Tokio.
#![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs)]
mod config;
pub use async_trait::async_trait;
pub use config::JobConfig;
pub use config::RunnerConfig;
use futures::future::join_all;
use log::{error, info};
use std::fmt;
use std::sync::Arc;
mod db;
#[cfg(feature = "pool-mobc")]
mod pool;
// A heap-allocated, type-erased job. The `Send + Sync` bounds let boxed jobs be moved to and
// shared between threads.
type BoxedJob = Box<dyn Job + Send + Sync>;
/// The error type returned by methods in this crate
#[derive(Debug)]
pub enum Error {
/// A database error, wrapping the error type of the crate's internal `db` module
DBError(db::DBError),
/// An error parsing the database configuration
DBConfigError(tokio_postgres::Error),
/// An error indicating an invalid job, with neither `cron` nor `interval` set
InvalidJobError,
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::DBError(ref e) => write!(f, "db error: {}", e),
Error::DBConfigError(ref e) => write!(f, "db configuration error: {}", e),
Error::InvalidJobError => write!(
f,
"invalid job found - check if all jobs have interval or cron set"
),
}
}
}
#[async_trait]
/// A trait for implementing a woddle job
///
/// A job provides the work to perform ([`run`](Job::run)) and the configuration describing
/// when it should run ([`get_config`](Job::get_config)).
///
/// The `JobClone` supertrait is implemented automatically for every `Job` that is also
/// `Clone + Send + Sync + 'static`, so implementors only need to provide the two methods below.
///
/// Example implementation:
///
/// ```ignore
/// use std::time::Duration;
/// use crate::{JobConfig, Job, async_trait};
///
/// #[derive(Clone)]
/// struct MyJob {
///     config: JobConfig,
/// }
///
/// #[async_trait]
/// impl Job for MyJob {
///     async fn run(&self) {
///         log::info!("running my job!");
///     }
///
///     fn get_config(&self) -> &JobConfig {
///         &self.config
///     }
/// }
///
/// fn main() {
///     let job_cfg = JobConfig::new("my_job", "someSyncKey").interval(Duration::from_secs(5));
///     let my_job = MyJob {
///         config: job_cfg,
///     };
/// }
/// ```
pub trait Job: JobClone {
/// Runs the job
///
/// The runner calls this from a spawned Tokio task each time the job is due.
///
/// This is an async function, so if you plan to do long-running, blocking operations, you
/// should spawn them on [Tokio's Blocking Threadpool](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
///
/// You need the `blocking` feature to be active for this to work.
/// NOTE(review): the required feature name depends on the Tokio version in use; confirm.
///
/// Otherwise, you might block the scheduler threads, slowing down your whole application.
async fn run(&self);
/// Exposes the configuration of the job
///
/// The runner reads the job's name and its interval or cron settings from the returned value.
fn get_config(&self) -> &JobConfig;
}
// Helper trait that lets a job behind a trait object be cloned into a new box.
// It is public because `Job` names it as a supertrait, but hidden from the generated docs.
#[doc(hidden)]
pub trait JobClone {
// Returns a boxed copy of this job.
fn box_clone(&self) -> BoxedJob;
}
// Blanket implementation: any job type that is `Clone + Send + Sync + 'static` can be copied
// into a fresh box.
impl<T> JobClone for T
where
    T: Job + Clone + Send + Sync + 'static,
{
    fn box_clone(&self) -> BoxedJob {
        // Clone the concrete job first, then erase its type by boxing it.
        let duplicate = T::clone(self);
        Box::new(duplicate)
    }
}
// Makes boxed jobs cloneable by delegating to `JobClone::box_clone` on the inner job.
impl Clone for Box<dyn Job> {
    fn clone(&self) -> Self {
        // Reach through the box to the trait object itself before cloning.
        let inner: &dyn Job = &**self;
        inner.box_clone()
    }
}
/// The runner, which holds the jobs and runner configuration
///
/// Jobs are registered when the runner is constructed or afterwards via `add_job`; the runner
/// is then consumed by `start`.
pub struct JobRunner {
// All registered jobs, stored as boxed trait objects.
jobs: Vec<BoxedJob>,
// Runner settings: used to obtain database connections and read for the initial delay and
// the check interval.
config: RunnerConfig,
}
impl JobRunner {
/// Creates a new runner based on the given RunnerConfig
pub fn new(config: RunnerConfig) -> Self {
Self {
config,
jobs: Vec::new(),
}
}
/// Creates a new runner based on the given RunnerConfig and vector of jobs
pub fn new_with_vec(
config: RunnerConfig,
jobs: Vec<impl Job + Send + Sync + Clone + 'static>,
) -> Self {
let mut boxed_jobs = vec![];
for j in jobs {
boxed_jobs.push(Box::new(j) as BoxedJob);
}
Self {
config,
jobs: boxed_jobs,
}
}
/// Adds a job to the Runner
pub fn add_job(mut self, job: impl Job + Send + Sync + Clone + 'static) -> Self {
self.jobs.push(Box::new(job) as BoxedJob);
self
}
/// Starts the runner
///
/// This will:
///
/// * Validate the added jobs
/// * Initialize the database state, creating the `woddle_jobs` table
/// * Announce all registered jobs with their timers
/// * Start checking and running jobs
pub async fn start(self) -> Result<(), Error> {
self.validate()?;
self.initialize().await?;
self.announce_jobs();
if let Some(initial_delay) = self.config.initial_delay {
tokio::time::sleep(initial_delay).await;
}
let mut job_interval = tokio::time::interval(self.config.check_interval);
let jobs = Arc::new(&self.jobs);
loop {
job_interval.tick().await;
self.check_and_run_jobs(jobs.clone()).await;
}
}
// Validates all jobs
fn validate(&self) -> Result<(), Error> {
for job in &self.jobs {
let cfg = job.get_config();
if cfg.interval.is_none() && cfg.cron.is_none() {
return Err(Error::InvalidJobError);
}
}
Ok(())
}
// Asserts that the woddle_jobs table is there and insert all new jobs
async fn initialize(&self) -> Result<(), Error> {
let con = db::get_con(&self.config).await.map_err(Error::DBError)?;
db::create_tables(&con).await.map_err(Error::DBError)?;
for j in self.jobs.iter() {
db::insert_job(&con, j).await.map_err(Error::DBError)?;
}
Ok(())
}
// Logs an announcement for all registered jobs
fn announce_jobs(&self) {
for job in &self.jobs {
match job.get_config().interval {
Some(interval) => {
info!(
"job '{}' with interval: {:?} registered successfully",
job.get_config().name,
interval
);
}
None => match job.get_config().cron_str {
Some(ref cron) => {
info!(
"job '{}' with cron-schedule: {:?} registered successfully",
job.get_config().name,
cron
);
}
None => unreachable!("can't get here, since running a job with neither cron, nor interval fails earlier"),
},
}
}
}
// Checks and runs, if necessary, all jobs concurrently
async fn check_and_run_jobs(&self, jobs: Arc<&Vec<BoxedJob>>) {
let job_futures = jobs
.iter()
.map(|job| {
let j = job.box_clone();
self.check_and_run_job(j)
})
.collect::<Vec<_>>();
join_all(job_futures).await;
}
// Checks and runs a single [Job](crate::Job)
//
// Connects to the database, checks if the given job should be run again and if so, sets the
// `last_run` of the job to `now()` and executes the job.
async fn check_and_run_job(&self, job: BoxedJob) -> Result<(), Error> {
let mut con = db::get_con(&self.config).await.map_err(|e| {
error!("error checking job {}, {}", job.get_config().name, e);
Error::DBError(e)
})?;
let should_run_job = db::update_job_if_ready(&mut con, &job).await.map_err(|e| {
error!("error checking job {}, {}", job.get_config().name, e);
Error::DBError(e)
})?;
if should_run_job {
tokio::spawn(async move {
job.run().await;
});
}
Ok(())
}
}