1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
//! An async, synchronized, database-backed Rust job scheduler
//!
//! This library provides an async job runner, which can run user-defined jobs in an interval, or
//! based on a cron schedule.
//!
//! Also, the library automatically synchronizes multiple instances of the runner via PostgreSQL.
//! This is important to ensure that a job is only run once for each interval or schedule.
//!
//! A `Job` in this library can be created by implementing the `Job` trait. There, the user can
//! define a custom `run` function, which is executed for each interval or schedule of the job.
//!
//! This interval, as well as other relevant metadata, needs to be configured using a `JobConfig`
//! for each job.
//!
//! Then, once all your jobs are defined, you can create a `JobRunner`. This is the main mechanism
//! underlying this scheduling library. At a user-defined interval, it checks whether each job
//! needs to run.
//!
//! This `JobRunner` is configured using the `RunnerConfig`, where the user can define database
//! configuration, as well as an initial delay and the interval for checking for job runs.
//!
//! Once everything is configured, you can run the `JobRunner` and, if it doesn't return an error
//! during job validation, it will run forever, scheduling and running your jobs asynchronously
//! using Tokio.
#![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs)]
mod config;

pub use async_trait::async_trait;
pub use config::JobConfig;
pub use config::RunnerConfig;
use futures::future::join_all;
use log::{error, info};
use std::fmt;
use std::sync::Arc;

mod db;

#[cfg(feature = "pool-mobc")]
mod pool;

/// A heap-allocated [`Job`] trait object that is also `Send + Sync`
///
/// This is the form in which the runner stores registered jobs and hands them to spawned tasks.
type BoxedJob = Box<dyn Job + Send + Sync>;

/// The error type returned by methods in this crate
///
/// [`JobRunner::start`] reports its failures through this type. It implements
/// [`std::fmt::Debug`], [`std::fmt::Display`] and [`std::error::Error`].
#[derive(Debug)]
pub enum Error {
    /// A database error
    ///
    /// Wraps the error returned by the crate's internal database layer, for example when
    /// obtaining a connection, creating the tables, or inserting and updating jobs fails.
    DBError(db::DBError),
    /// An error parsing the database configuration
    DBConfigError(tokio_postgres::Error),
    /// An error indicating an invalid job, with neither `cron`, nor `interval` set
    ///
    /// Returned when the validation at the beginning of [`JobRunner::start`] finds such a job.
    InvalidJobError,
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::DBError(ref e) => write!(f, "db error: {}", e),
            Error::DBConfigError(ref e) => write!(f, "db configuration error: {}", e),
            Error::InvalidJobError => write!(
                f,
                "invalid job found - check if all jobs have interval or cron set"
            ),
        }
    }
}

#[async_trait]
/// A trait for implementing a woddle job
///
/// Implementors must also be `Clone + Send + Sync + 'static` to be accepted by
/// [`JobRunner::add_job`] and [`JobRunner::new_with_vec`]; the runner clones a job each time it
/// checks whether the job is due.
///
/// Example implementation:
///
/// ```ignore
/// use std::time::Duration;
/// use crate::{JobConfig, Job, async_trait};
///
/// #[derive(Clone)]
/// struct MyJob {
///     config: JobConfig,
/// }
///
/// #[async_trait]
/// impl Job for MyJob {
///     async fn run(&self) {
///         log::info!("running my job!");
///     }
///
///     fn get_config(&self) -> &JobConfig {
///         &self.config
///     }
/// }
///
/// fn main() {
///     let job_cfg = JobConfig::new("my_job", "someSyncKey").interval(Duration::from_secs(5));
///
///     let my_job = MyJob {
///         config: job_cfg,
///     };
/// }
/// ```
pub trait Job: JobClone {
    /// Runs the job
    ///
    /// The runner calls this inside a spawned Tokio task whenever the job is due.
    ///
    /// This is an async function, so if you plan to do long-running, blocking operations, you
    /// should spawn them on [Tokio's Blocking Threadpool](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
    ///
    /// You need the `blocking` feature to be active, for this to work.
    ///
    /// Otherwise, you might block the scheduler threads, slowing down your whole application.
    async fn run(&self);
    /// Exposes the configuration of the job
    ///
    /// The runner reads the job's `name`, `interval`, `cron` and `cron_str` from the returned
    /// configuration when validating and announcing jobs.
    fn get_config(&self) -> &JobConfig;
}

#[doc(hidden)]
/// Helper trait that makes boxed [`Job`] trait objects cloneable
///
/// It is a supertrait of [`Job`] and is implemented by a blanket impl for every
/// `'static + Job + Clone + Send + Sync` type, so job authors only need to implement `Clone`.
pub trait JobClone {
    /// Clones the job into a fresh boxed trait object
    fn box_clone(&self) -> BoxedJob;
}

// Blanket implementation: every cloneable, thread-safe, `'static` job can be duplicated into a
// boxed trait object.
impl<T> JobClone for T
where
    T: 'static + Job + Clone + Send + Sync,
{
    fn box_clone(&self) -> BoxedJob {
        // Clone the concrete job first; the box is then coerced to the trait object on return.
        let duplicate: T = T::clone(self);
        Box::new(duplicate)
    }
}

impl Clone for Box<dyn Job> {
    fn clone(&self) -> Box<dyn Job> {
        self.box_clone()
    }
}

/// The runner, which holds the jobs and runner configuration
///
/// Build one with [`JobRunner::new`] or [`JobRunner::new_with_vec`], optionally register more
/// jobs with [`JobRunner::add_job`], then drive it with [`JobRunner::start`].
pub struct JobRunner {
    // Registered jobs, boxed as trait objects, in the order they were added
    jobs: Vec<BoxedJob>,
    // Database and timing settings (`initial_delay`, `check_interval`) used while running
    config: RunnerConfig,
}

impl JobRunner {
    /// Creates a new runner based on the given RunnerConfig
    pub fn new(config: RunnerConfig) -> Self {
        Self {
            config,
            jobs: Vec::new(),
        }
    }

    /// Creates a new runner based on the given RunnerConfig and vector of jobs
    pub fn new_with_vec(
        config: RunnerConfig,
        jobs: Vec<impl Job + Send + Sync + Clone + 'static>,
    ) -> Self {
        let mut boxed_jobs = vec![];
        for j in jobs {
            boxed_jobs.push(Box::new(j) as BoxedJob);
        }
        Self {
            config,
            jobs: boxed_jobs,
        }
    }

    /// Adds a job to the Runner
    pub fn add_job(mut self, job: impl Job + Send + Sync + Clone + 'static) -> Self {
        self.jobs.push(Box::new(job) as BoxedJob);
        self
    }

    /// Starts the runner
    ///
    /// This will:
    ///
    /// * Validate the added jobs
    /// * Initialize the database state, creating the `woddle_jobs` table
    /// * Announce all registered jobs with their timers
    /// * Start checking and running jobs
    pub async fn start(self) -> Result<(), Error> {
        self.validate()?;
        self.initialize().await?;
        self.announce_jobs();

        if let Some(initial_delay) = self.config.initial_delay {
            tokio::time::sleep(initial_delay).await;
        }

        let mut job_interval = tokio::time::interval(self.config.check_interval);
        let jobs = Arc::new(&self.jobs);
        loop {
            job_interval.tick().await;
            self.check_and_run_jobs(jobs.clone()).await;
        }
    }

    // Validates all jobs
    fn validate(&self) -> Result<(), Error> {
        for job in &self.jobs {
            let cfg = job.get_config();
            if cfg.interval.is_none() && cfg.cron.is_none() {
                return Err(Error::InvalidJobError);
            }
        }
        Ok(())
    }

    // Asserts that the woddle_jobs table is there and insert all new jobs
    async fn initialize(&self) -> Result<(), Error> {
        let con = db::get_con(&self.config).await.map_err(Error::DBError)?;
        db::create_tables(&con).await.map_err(Error::DBError)?;
        for j in self.jobs.iter() {
            db::insert_job(&con, &j).await.map_err(Error::DBError)?;
        }
        Ok(())
    }

    // Logs an announcement for all registered jobs
    fn announce_jobs(&self) {
        for job in &self.jobs {
            match job.get_config().interval {
                Some(interval) => {
                    info!(
                        "job '{}' with interval: {:?} registered successfully",
                        job.get_config().name,
                        interval
                    );
                }
                None => match job.get_config().cron_str {
                    Some(ref cron) => {
                        info!(
                            "job '{}' with cron-schedule: {:?} registered successfully",
                            job.get_config().name,
                            cron
                        );
                    }
                    None => unreachable!("can't get here, since running a job with neither cron, nor interval fails earlier"),
                },
            }
        }
    }

    // Checks and runs, if necessary, all jobs concurrently
    async fn check_and_run_jobs(&self, jobs: Arc<&Vec<BoxedJob>>) {
        let job_futures = jobs
            .iter()
            .map(|job| {
                let j = job.box_clone();
                self.check_and_run_job(j)
            })
            .collect::<Vec<_>>();
        join_all(job_futures).await;
    }

    // Checks and runs a single [Job](crate::Job)
    //
    // Connects to the database, checks if the given job should be run again and if so, sets the
    // `last_run` of the job to `now()` and executes the job.
    async fn check_and_run_job(&self, job: BoxedJob) -> Result<(), Error> {
        let mut con = db::get_con(&self.config).await.map_err(|e| {
            error!("error checking job {}, {}", job.get_config().name, e);
            Error::DBError(e)
        })?;

        let should_run_job = db::update_job_if_ready(&mut con, &job).await.map_err(|e| {
            error!("error checking job {}, {}", job.get_config().name, e);
            Error::DBError(e)
        })?;

        if should_run_job {
            tokio::spawn(async move {
                job.run().await;
            });
        }

        Ok(())
    }
}