extern crate chrono;
extern crate cron;
#[macro_use]
extern crate log;
use chrono::{DateTime, Duration, Utc};
use lazy_static::lazy_static;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, Mutex,
};
use std::thread::{Builder, JoinHandle};
pub use cron::Schedule;
lazy_static! {
pub static ref TRACKER: Mutex<Tracker> = Mutex::new(Tracker::new());
}
pub trait Job: Send + Sync {
fn is_active(&self) -> bool {
true
}
fn allow_parallel_runs(&self) -> bool {
false
}
fn schedule(&self) -> Schedule;
fn handle(&self);
fn should_run(&self) -> bool {
if self.is_active() {
for item in self.schedule().upcoming(Utc).take(1) {
let now = Utc::now();
let difference = item - now;
if difference <= Duration::milliseconds(100) {
return true;
}
}
}
false
}
fn now(&self) -> DateTime<Utc> {
Utc::now()
}
}
pub struct Tracker(Vec<usize>);
impl Default for Tracker {
fn default() -> Self {
Self::new()
}
}
impl Tracker {
pub fn new() -> Self {
Tracker(vec![])
}
pub fn running(&self, id: &usize) -> bool {
self.0.contains(id)
}
pub fn start(&mut self, id: &usize) -> usize {
if !self.running(id) {
self.0.push(*id);
}
self.0.len()
}
pub fn stop(&mut self, id: &usize) -> usize {
if self.running(id) {
match self.0.iter().position(|&r| r == *id) {
Some(i) => self.0.remove(i),
None => 0,
};
}
self.0.len()
}
}
pub struct Runner {
jobs: Vec<Arc<Box<dyn Job>>>,
thread: Option<JoinHandle<()>>,
running: bool,
tx: Option<mpsc::Sender<Result<(), ()>>>,
working: Arc<AtomicBool>,
}
impl Default for Runner {
fn default() -> Self {
Self::new()
}
}
impl Runner {
pub fn new() -> Self {
Runner {
jobs: vec![],
thread: None,
running: false,
tx: None,
working: Arc::new(AtomicBool::new(false)),
}
}
#[allow(clippy::should_implement_trait)]
pub fn add(mut self, job: Box<dyn Job>) -> Self {
if self.running {
panic!("Cannot push job onto runner once the runner is started!");
}
self.jobs.push(Arc::new(job));
self
}
pub fn jobs_to_run(&self) -> usize {
self.jobs.len()
}
pub fn run(self) -> Self {
if self.jobs.is_empty() {
return self;
}
let working = Arc::new(AtomicBool::new(false));
let (thread, tx) = spawn(self, working.clone());
Self {
thread,
jobs: vec![],
running: true,
tx,
working,
}
}
pub fn stop(mut self) {
if !self.running {
return;
}
if let Some(thread) = self.thread.take() {
if let Some(tx) = self.tx {
match tx.send(Ok(())) {
Ok(_) => (),
Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
};
}
thread
.join()
.expect("Could not stop the spawned cron runner thread");
}
}
pub fn is_running(&self) -> bool {
self.running
}
pub fn is_working(&self) -> bool {
self.working.load(Ordering::Relaxed)
}
}
type TxRx = (Sender<Result<(), ()>>, Receiver<Result<(), ()>>);
type JhTx = (Option<JoinHandle<()>>, Option<Sender<Result<(), ()>>>);
fn spawn(runner: Runner, working: Arc<AtomicBool>) -> JhTx {
let (tx, rx): TxRx = mpsc::channel();
match Builder::new()
.name(String::from("cron-runner-thread"))
.spawn(move || {
let jobs = runner.jobs;
loop {
if rx.try_recv().is_ok() {
info!("Stopping the cron runner thread");
break;
}
for (id, job) in jobs.iter().enumerate() {
let no = (id + 1).to_string();
let _job = job.clone();
if _job.should_run()
&& (_job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id))
{
TRACKER.lock().unwrap().start(&id);
let working_clone = working.clone();
match Builder::new()
.name(format!("cron-job-thread-{}", &no))
.spawn(move || {
let now = Utc::now();
debug!(
"START: {} --- {}",
format!("cron-job-thread-{}", &no),
now.format("%H:%M:%S%.f")
);
working_clone.store(true, Ordering::Relaxed);
_job.handle();
working_clone.store(
TRACKER.lock().unwrap().stop(&id) != 0,
Ordering::Relaxed,
);
debug!(
"FINISH: {} --- {}",
format!("cron-job-thread-{}", &no),
now.format("%H:%M:%S%.f")
);
}) {
Ok(_) => (),
Err(_) => {
let no = (id + 1).to_string();
let now = Utc::now();
debug!("START: N/A-{} --- {}", &no, now.format("%H:%M:%S%.f"));
working.store(true, Ordering::Relaxed);
job.handle();
working.store(
TRACKER.lock().unwrap().stop(&id) != 0,
Ordering::Relaxed,
);
debug!("FINISH: N/A-{} --- {}", &no, now.format("%H:%M:%S%.f"));
}
};
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}) {
Ok(jh) => (Some(jh), Some(tx)),
Err(e) => {
error!("Could not start the cron runner thread: {}", e);
(None, None)
}
}
}
#[cfg(test)]
mod tests {
use super::{Job, Runner};
use cron::Schedule;
use std::str::FromStr;
struct SomeJob;
impl Job for SomeJob {
fn schedule(&self) -> Schedule {
Schedule::from_str("0 * * * * *").unwrap()
}
fn handle(&self) {}
}
struct AnotherJob;
impl Job for AnotherJob {
fn schedule(&self) -> Schedule {
Schedule::from_str("0 * * * * *").unwrap()
}
fn handle(&self) {}
}
#[test]
fn create_job() {
let some_job = SomeJob;
assert_eq!(some_job.handle(), ());
}
#[test]
fn test_adding_jobs_to_runner() {
let some_job = SomeJob;
let another_job = AnotherJob;
let runner = Runner::new()
.add(Box::new(some_job))
.add(Box::new(another_job));
assert_eq!(runner.jobs_to_run(), 2);
}
#[test]
fn test_jobs_are_empty_after_runner_starts() {
let some_job = SomeJob;
let another_job = AnotherJob;
let runner = Runner::new()
.add(Box::new(some_job))
.add(Box::new(another_job))
.run();
assert_eq!(runner.jobs_to_run(), 0);
}
#[test]
fn test_stopping_the_runner() {
let some_job = SomeJob;
let another_job = AnotherJob;
let runner = Runner::new()
.add(Box::new(some_job))
.add(Box::new(another_job))
.run();
assert_eq!(runner.stop(), ());
}
}