tokio-cron-scheduler 0.9.3

Schedule tasks on tokio using cron-like notation, run them once at a given instant, or repeat them at a fixed interval. Tasks can optionally be persisted using PostgreSQL or NATS.
Documentation
use crate::context::Context;
#[cfg(not(feature = "has_bytes"))]
use crate::job::job_data::JobState;
#[cfg(feature = "has_bytes")]
use crate::job::job_data_prost::JobState;
use crate::job::to_code::JobCode;
use crate::job_scheduler::JobsSchedulerLocked;
use crate::JobSchedulerError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::RwLock;
use tracing::error;
use uuid::Uuid;

/// Stateless component that listens for job activations and runs the
/// corresponding job code.
///
/// It carries no fields; all of the state it works with is handed to it
/// through `init`.
#[derive(Debug, Default)]
pub struct JobRunner {}

impl JobRunner {
    async fn listen_for_activations(
        job_code: Arc<RwLock<Box<dyn JobCode + Send + Sync>>>,
        mut rx: Receiver<Uuid>,
        tx_notify: Sender<(Uuid, JobState)>,
        job_scheduler: JobsSchedulerLocked,
    ) {
        loop {
            let val = rx.recv().await;
            if let Err(e) = val {
                error!("Error receiving {:?}", e);
                break;
            }
            let uuid = val.unwrap();
            {
                let tx = tx_notify.clone();
                tokio::spawn(async move {
                    if let Err(e) = tx.send((uuid, JobState::Started)) {
                        error!("Error sending error listening for activation {:?}", e);
                    }
                });
            }
            let mut w = job_code.write().await;
            let code = w.get(uuid).await;
            match code {
                Ok(Some(job)) => {
                    let mut job = job.write().await;
                    let v = (job)(uuid, job_scheduler.clone());
                    let tx = tx_notify.clone();
                    tokio::spawn(async move {
                        v.await;
                        if let Err(e) = tx.send((uuid, JobState::Done)) {
                            error!("Error sending spawned task {:?}", e);
                        }
                    });
                }
                _ => {
                    error!("Error getting {:?} from job code", uuid);
                    continue;
                }
            }
        }
    }

    pub fn init(
        &mut self,
        context: &Context,
        job_scheduler: JobsSchedulerLocked,
    ) -> Pin<Box<dyn Future<Output = Result<(), JobSchedulerError>> + Send>> {
        let job_code = context.job_code.clone();
        let notify_tx = context.notify_tx.clone();
        let job_activation_rx = context.job_activation_tx.subscribe();

        Box::pin(async move {
            tokio::spawn(JobRunner::listen_for_activations(
                job_code,
                job_activation_rx,
                notify_tx,
                job_scheduler,
            ));
            Ok(())
        })
    }
}