#![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs)]
mod config;
pub use async_trait::async_trait;
pub use config::JobConfig;
pub use config::RunnerConfig;
use futures::future::join_all;
use log::{error, info};
use std::fmt;
use std::sync::Arc;
mod db;
#[cfg(feature = "pool-mobc")]
mod pool;
/// Owned [`Job`] trait object with `Send + Sync` bounds; `JobRunner` stores its jobs in this form.
type BoxedJob = Box<dyn Job + Send + Sync>;
/// Errors that [`JobRunner`] can return.
#[derive(Debug)]
pub enum Error {
/// A database operation failed; wraps the error type of the internal `db` module.
DBError(db::DBError),
/// A database configuration problem, carrying the underlying `tokio_postgres::Error`.
/// NOTE(review): not constructed in the code visible here - presumably produced
/// while the connection settings are processed; confirm against `config`/`db`.
DBConfigError(tokio_postgres::Error),
/// A registered job has neither `interval` nor `cron` set in its [`JobConfig`].
InvalidJobError,
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::DBError(ref e) => write!(f, "db error: {}", e),
Error::DBConfigError(ref e) => write!(f, "db configuration error: {}", e),
Error::InvalidJobError => write!(
f,
"invalid job found - check if all jobs have interval or cron set"
),
}
}
}
/// A unit of work that a [`JobRunner`] can schedule.
///
/// The `JobClone` supertrait is implemented automatically for every job type
/// that is `Clone + Send + Sync + 'static`, so implementors only provide the
/// two methods below.
#[async_trait]
pub trait Job: JobClone {
/// Executes the job. The runner awaits this inside a task created with `tokio::spawn`.
async fn run(&self);
/// Returns the job's configuration; the runner reads the job name and the
/// interval/cron settings from it.
fn get_config(&self) -> &JobConfig;
}
/// Helper trait that lets a boxed [`Job`] trait object be duplicated.
///
/// A blanket implementation covers every `Job` that is
/// `Clone + Send + Sync + 'static`.
#[doc(hidden)]
pub trait JobClone {
/// Returns a newly boxed copy of `self`.
fn box_clone(&self) -> BoxedJob;
}
/// Blanket implementation: any clonable, thread-safe, `'static` job can be
/// duplicated into a fresh box.
impl<T> JobClone for T
where
    T: 'static + Job + Clone + Send + Sync,
{
    fn box_clone(&self) -> BoxedJob {
        // Name `T::clone` explicitly so it is obvious that the concrete job is
        // cloned, not the reference to it.
        let copy = T::clone(self);
        Box::new(copy)
    }
}
/// Cloning a boxed job delegates to [`JobClone::box_clone`] on the inner
/// trait object.
impl Clone for Box<dyn Job> {
    fn clone(&self) -> Self {
        // Reach through the box explicitly so the call is dispatched on the job
        // itself; the returned `BoxedJob` coerces to `Box<dyn Job>`.
        let inner: &dyn Job = &**self;
        inner.box_clone()
    }
}
/// Holds a set of registered jobs together with the runner configuration.
///
/// Jobs are added at construction time or via `add_job`; `start` then checks
/// them periodically and runs the ones that are due.
pub struct JobRunner {
// Registered jobs, stored as boxed trait objects.
jobs: Vec<BoxedJob>,
// Runner-wide settings (read for `initial_delay`, `check_interval` and the database connection).
config: RunnerConfig,
}
impl JobRunner {
pub fn new(config: RunnerConfig) -> Self {
Self {
config,
jobs: Vec::new(),
}
}
pub fn new_with_vec(
config: RunnerConfig,
jobs: Vec<impl Job + Send + Sync + Clone + 'static>,
) -> Self {
let mut boxed_jobs = vec![];
for j in jobs {
boxed_jobs.push(Box::new(j) as BoxedJob);
}
Self {
config,
jobs: boxed_jobs,
}
}
pub fn add_job(mut self, job: impl Job + Send + Sync + Clone + 'static) -> Self {
self.jobs.push(Box::new(job) as BoxedJob);
self
}
pub async fn start(self) -> Result<(), Error> {
self.validate()?;
self.initialize().await?;
self.announce_jobs();
if let Some(initial_delay) = self.config.initial_delay {
tokio::time::sleep(initial_delay).await;
}
let mut job_interval = tokio::time::interval(self.config.check_interval);
let jobs = Arc::new(&self.jobs);
loop {
job_interval.tick().await;
self.check_and_run_jobs(jobs.clone()).await;
}
}
fn validate(&self) -> Result<(), Error> {
for job in &self.jobs {
let cfg = job.get_config();
if cfg.interval.is_none() && cfg.cron.is_none() {
return Err(Error::InvalidJobError);
}
}
Ok(())
}
async fn initialize(&self) -> Result<(), Error> {
let con = db::get_con(&self.config).await.map_err(Error::DBError)?;
db::create_tables(&con).await.map_err(Error::DBError)?;
for j in self.jobs.iter() {
db::insert_job(&con, j).await.map_err(Error::DBError)?;
}
Ok(())
}
fn announce_jobs(&self) {
for job in &self.jobs {
match job.get_config().interval {
Some(interval) => {
info!(
"job '{}' with interval: {:?} registered successfully",
job.get_config().name,
interval
);
}
None => match job.get_config().cron_str {
Some(ref cron) => {
info!(
"job '{}' with cron-schedule: {:?} registered successfully",
job.get_config().name,
cron
);
}
None => unreachable!("can't get here, since running a job with neither cron, nor interval fails earlier"),
},
}
}
}
async fn check_and_run_jobs(&self, jobs: Arc<&Vec<BoxedJob>>) {
let job_futures = jobs
.iter()
.map(|job| {
let j = job.box_clone();
self.check_and_run_job(j)
})
.collect::<Vec<_>>();
join_all(job_futures).await;
}
async fn check_and_run_job(&self, job: BoxedJob) -> Result<(), Error> {
let mut con = db::get_con(&self.config).await.map_err(|e| {
error!("error checking job {}, {}", job.get_config().name, e);
Error::DBError(e)
})?;
let should_run_job = db::update_job_if_ready(&mut con, &job).await.map_err(|e| {
error!("error checking job {}, {}", job.get_config().name, e);
Error::DBError(e)
})?;
if should_run_job {
tokio::spawn(async move {
job.run().await;
});
}
Ok(())
}
}