deschuler 0.4.1

An async, tokio based scheduling library for rust with a built-in cron builder.
Documentation
use std::sync::Arc;
use std::time::Duration;

use chrono::SubsecRound;
use croner::Cron;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::error;

use crate::scheduler::job::Job;
use crate::scheduler::tokio_scheduler::config::TokioSchedulerConfig;
use crate::scheduler::Scheduler;

pub mod config;

type JobHandle = Box<dyn FnOnce() -> JoinHandle<()> + Send + Sync>;

#[derive(Debug)]
pub struct TokioScheduler {
    config: TokioSchedulerConfig,
    sender: Sender<JobHandle>,
    worker_sender: watch::Sender<bool>,
}

impl TokioScheduler {
    pub fn new(config: TokioSchedulerConfig) -> Self {
        let (sender, receiver) = channel(config.channel_size);
        let (worker_sender, worker_receiver) = watch::channel(false);

        let scheduler = Self {
            config,
            sender,
            worker_sender,
        };

        Self::start_worker(worker_receiver, receiver);

        scheduler
    }
}

impl Default for TokioScheduler {
    fn default() -> Self {
        Self::new(TokioSchedulerConfig::default())
    }
}

impl Scheduler for TokioScheduler {
    fn start(&mut self) {
        self.worker_sender.send(true).unwrap();
    }

    fn schedule_job(&mut self, cron: Cron, task: Arc<Job>) {
        let config = self.config.clone();
        let sender = self.sender.clone();
        let builder_config = config.builder_config.clone();
        tokio::spawn(async move {
            loop {
                if task.is_interrupted() {
                    break;
                }

                let now = builder_config.get_now().round_subsecs(0);
                match cron.find_next_occurrence(&now, false) {
                    Ok(next) => {
                        let _now = now.to_rfc3339();
                        let _next = next.to_rfc3339();
                        let duration = next.signed_duration_since(now).num_seconds();
                        let duration = Duration::from_secs(duration as u64);
                        sleep(duration).await;
                        let job = task.get_job();

                        if task.is_interrupted() {
                            break;
                        }

                        let job_handle = Box::new(move || {
                            tokio::spawn(async move {
                                job(now).await;
                            })
                        });

                        if let Err(err) = sender.send(job_handle).await {
                            error!("Error while sending job: {:?}", err);
                        }
                    }
                    Err(err) => {
                        error!("Error while finding next occurrence: {:?}", err);
                        break;
                    }
                }
            }
        });
    }

    fn stop(&mut self) {
        self.worker_sender.send(false).unwrap();
    }
}

impl TokioScheduler {
    fn start_worker(worker_receiver: watch::Receiver<bool>, mut receiver: Receiver<JobHandle>) {
        tokio::spawn(async move {
            loop {
                match worker_receiver.has_changed() {
                    Ok(bool) => match receiver.recv().await {
                        Some(job) => {
                            if bool {
                                Self::handle_task(job)
                            }
                        }
                        None => break,
                    },
                    Err(err) => {
                        error!("The worker has experienced an unexpected error: {:?}", err);
                        break;
                    }
                }
            }
        });
    }

    fn handle_task(job: JobHandle) {
        tokio::spawn(async move { job().await });
    }
}