extern crate chrono;
extern crate cron;
pub use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
pub use cron::Schedule;
use lazy_static::lazy_static;
use log::{debug, error, info};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
lazy_static! {
    /// Global, lazily initialised registry of the job ids that are currently
    /// marked as running. It is shared by every runner task in the process and
    /// guarded by a standard-library `RwLock`.
    pub static ref TRACKER: RwLock<Tracker> = RwLock::new(Tracker::default());
}
/// A unit of work that can be registered with a [`Runner`] and executed
/// according to a cron [`Schedule`].
#[async_trait]
pub trait Job: Send + Sync {
    /// Whether the job is enabled. Defaults to `true`; an inactive job is
    /// never reported as due by [`Job::should_run`].
    fn is_active(&self) -> bool {
        true
    }

    /// Whether the job may be started while its id is still marked as running.
    /// Defaults to `false`.
    fn allow_parallel_runs(&self) -> bool {
        false
    }

    /// The cron schedule of the job, or `None` when it has none (such a job is
    /// never due).
    fn schedule(&self) -> Option<Schedule>;

    /// Performs the job's work.
    async fn handle(&mut self);

    /// Returns `true` when the job is active, has a schedule, and the next
    /// upcoming occurrence of that schedule is at most 100 milliseconds away.
    fn should_run(&self) -> bool {
        if !self.is_active() {
            return false;
        }

        let schedule = match self.schedule() {
            Some(schedule) => schedule,
            None => return false,
        };

        // Only the very next occurrence matters for the decision.
        let next_occurrence = schedule.upcoming(Utc).next();
        next_occurrence.map_or(false, |at| at - Utc::now() <= Duration::milliseconds(100))
    }

    /// Returns the current time in UTC.
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }
}
/// Keeps the ids of the jobs that are currently marked as running.
pub struct Tracker(Vec<usize>);

impl Default for Tracker {
    /// Same as [`Tracker::new`]: a tracker with no ids.
    fn default() -> Self {
        Tracker::new()
    }
}

impl Tracker {
    /// Creates a tracker that holds no ids.
    pub fn new() -> Self {
        Tracker(Vec::new())
    }

    /// Reports whether `id` is currently marked as running.
    pub fn running(&self, id: &usize) -> bool {
        self.0.iter().any(|tracked| tracked == id)
    }

    /// Marks `id` as running unless it already is, and returns how many ids
    /// are tracked afterwards.
    pub fn start(&mut self, id: &usize) -> usize {
        let already_tracked = self.running(id);
        if !already_tracked {
            self.0.push(*id);
        }
        self.0.len()
    }

    /// Removes `id` from the running set if present, and returns how many ids
    /// are tracked afterwards.
    pub fn stop(&mut self, id: &usize) -> usize {
        if let Some(index) = self.0.iter().position(|tracked| tracked == id) {
            self.0.remove(index);
        }
        self.0.len()
    }
}
pub struct Runner {
pub jobs: Vec<Box<dyn Job>>,
pub thread: Option<JoinHandle<()>>,
pub running: bool,
pub tx: Option<UnboundedSender<Result<(), ()>>>,
pub working: Arc<AtomicBool>,
}
impl Default for Runner {
fn default() -> Self {
Self::new()
}
}
impl Runner {
pub fn new() -> Self {
Runner {
jobs: vec![],
thread: None,
running: false,
tx: None,
working: Arc::new(AtomicBool::new(false)),
}
}
#[allow(clippy::should_implement_trait)]
pub fn add(mut self, job: Box<dyn Job>) -> Self {
if !self.running {
self.jobs.push(job);
}
self
}
pub fn jobs_to_run(&self) -> usize {
self.jobs.len()
}
pub async fn run(self) -> Self {
if self.jobs.is_empty() {
return self;
}
let working = Arc::new(AtomicBool::new(false));
let (thread, tx) = spawn(self, working.clone()).await;
Self {
thread,
jobs: vec![],
running: true,
tx,
working,
}
}
pub async fn stop(&mut self) {
if !self.running {
return;
}
if let Some(thread) = self.thread.take() {
if let Some(tx) = &self.tx {
match tx.send(Ok(())) {
Ok(_) => (),
Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
};
}
thread.abort()
}
}
pub fn is_running(&self) -> bool {
self.running
}
pub fn is_working(&self) -> bool {
self.working.load(Ordering::Relaxed)
}
}
/// Spawns the tokio task that polls the runner's jobs and executes the due ones.
///
/// The task takes ownership of `runner.jobs` and loops until a message arrives
/// on the returned sender's channel, sleeping 100 milliseconds between passes.
/// On each pass every job is checked in registration order; a job is started
/// when [`Job::should_run`] is true and either it allows parallel runs or its
/// id is not marked as running in the global [`TRACKER`]. The id is the job's
/// position in the job list. Jobs are awaited one at a time inside the task.
///
/// `working` is raised before a job is handled; afterwards it is set to
/// whether any id is still tracked as running.
///
/// Returns the task handle and the stop-signal sender; both are always `Some`.
async fn spawn(
    runner: Runner,
    working: Arc<AtomicBool>,
) -> (
    Option<JoinHandle<()>>,
    Option<UnboundedSender<Result<(), ()>>>,
) {
    let (tx, mut rx): (
        UnboundedSender<Result<(), ()>>,
        UnboundedReceiver<Result<(), ()>>,
    ) = unbounded_channel();

    let handler = tokio::spawn(async move {
        let mut jobs = runner.jobs;
        loop {
            if rx.try_recv().is_ok() {
                info!("Stopping the cron runner thread");
                break;
            }

            for (id, job) in jobs.iter_mut().enumerate() {
                if !job.should_run() {
                    continue;
                }

                // A poisoned lock is treated as "already running", so the job
                // is skipped unless it allows parallel runs. The read guard is
                // released at the end of this statement, before any `.await`.
                let may_start = job.allow_parallel_runs()
                    || match TRACKER.read() {
                        Ok(tracker) => !tracker.running(&id),
                        Err(_) => false,
                    };
                if !may_start {
                    continue;
                }

                if let Ok(mut tracker) = TRACKER.write() {
                    tracker.start(&id);
                }

                // Human-facing job number used in the log lines (1-based).
                let no = id + 1;
                debug!(
                    "START: cron-job-thread-{} --- {}",
                    no,
                    Utc::now().format("%H:%M:%S%.f")
                );

                working.store(true, Ordering::Relaxed);
                job.handle().await;

                let others_still_running = match TRACKER.write() {
                    Ok(mut tracker) => tracker.stop(&id) != 0,
                    Err(_) => false,
                };
                working.store(others_still_running, Ordering::Relaxed);

                // Take a fresh timestamp: reusing the one captured before the
                // job ran would log the start time as the finish time.
                debug!(
                    "FINISH: cron-job-thread-{} --- {}",
                    no,
                    Utc::now().format("%H:%M:%S%.f")
                );
            }

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    });

    (Some(handler), Some(tx))
}
#[cfg(test)]
mod tests {
    use super::{Job, Runner};
    use async_trait::async_trait;
    use cron::Schedule;
    use std::str::FromStr;

    /// Cron expression shared by both test jobs.
    const TEST_SCHEDULE: &str = "0 * * * * *";

    /// First no-op job used as a fixture.
    struct SomeJob;

    #[async_trait]
    impl Job for SomeJob {
        fn schedule(&self) -> Option<Schedule> {
            Some(Schedule::from_str(TEST_SCHEDULE).unwrap())
        }

        async fn handle(&mut self) {}
    }

    /// Second no-op job used as a fixture.
    struct AnotherJob;

    #[async_trait]
    impl Job for AnotherJob {
        fn schedule(&self) -> Option<Schedule> {
            Some(Schedule::from_str(TEST_SCHEDULE).unwrap())
        }

        async fn handle(&mut self) {}
    }

    /// Builds a runner that has both fixture jobs registered but is not started.
    fn runner_with_two_jobs() -> Runner {
        Runner::new()
            .add(Box::new(SomeJob))
            .add(Box::new(AnotherJob))
    }

    #[tokio::test]
    async fn create_job() {
        let mut job = SomeJob;
        // The handler returns unit, so completing without a panic is the check.
        job.handle().await;
    }

    #[tokio::test]
    async fn test_adding_jobs_to_runner() {
        let runner = runner_with_two_jobs();
        assert_eq!(runner.jobs_to_run(), 2);
    }

    #[tokio::test]
    async fn test_jobs_are_empty_after_runner_starts() {
        let runner = runner_with_two_jobs().run().await;
        assert_eq!(runner.jobs_to_run(), 0);
    }

    #[tokio::test]
    async fn test_stopping_the_runner() {
        let mut runner = runner_with_two_jobs().run().await;
        // Stopping returns unit, so completing without a panic is the check.
        runner.stop().await;
    }
}