rexecutor 0.1.0

A robust job processing library
Documentation
use std::{hash::Hash, marker::PhantomData, ops::Sub, time::Duration};

use chrono::{DateTime, TimeDelta, TimeZone, Utc};
use cron::Schedule;
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::sync::CancellationToken;

use crate::{backend::Backend, executor::Executor, job::uniqueness_criteria::UniquenessCriteria};

pub(crate) struct CronRunner<B, E>
where
    B: Backend,
    E: Executor + 'static,
    E::Data: Send + DeserializeOwned,
    E::Metadata: Serialize + DeserializeOwned,
{
    backend: B,
    data: E::Data,
    schedule: Schedule,
    _executor: PhantomData<E>,
}

impl<B, E> CronRunner<B, E>
where
    B: Backend + Send + 'static + Sync + Clone,
    E: Executor + 'static + Sync + Send,
    E::Data: Send + Serialize + DeserializeOwned + Hash + Clone + Sync,
    E::Metadata: Serialize + DeserializeOwned + Send + Sync,
{
    pub(crate) fn new(backend: B, schedule: Schedule, data: E::Data) -> Self {
        Self {
            backend,
            schedule,
            data,
            _executor: PhantomData,
        }
    }

    pub(crate) fn spawn(
        self,
        timezone: impl TimeZone + Send + 'static,
        cancellation_token: CancellationToken,
    ) {
        tokio::spawn({
            async move {
                loop {
                    let next = self
                        .schedule
                        .upcoming(timezone.clone())
                        .next()
                        .expect("No future scheduled time for cron job")
                        .to_utc();
                    let delay = next
                        .sub(Utc::now())
                        .sub(TimeDelta::milliseconds(10))
                        .to_std()
                        .unwrap_or(Duration::ZERO);
                    tokio::select! {
                        _ = tokio::time::sleep(delay) => {
                            self.enqueue_job(next).await;
                            let delay = next - Utc::now();
                            if delay > TimeDelta::zero() {
                                tokio::time::sleep(delay.to_std().unwrap()).await;
                            }
                        },
                        _ = cancellation_token.cancelled() => {
                            tracing::debug!("Shutting down cron scheduler for {}", E::NAME);
                            break;
                        },
                    }
                }
            }
        });
    }

    async fn enqueue_job(&self, scheduled_at: DateTime<Utc>) {
        let criteria = UniquenessCriteria::by_executor()
            .and_within(TimeDelta::zero())
            .and_key(&self.data)
            .and_executor();

        let _ = E::builder()
            .schedule_at(scheduled_at)
            .with_data(self.data.clone())
            .unique(criteria)
            .enqueue_to_backend(&self.backend)
            .await
            .inspect_err(|err| {
                tracing::error!(?err, "Failed to enqueue cron job {} with {err}", E::NAME);
            });
    }
}