cloudiful-scheduler 0.4.2

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::error::ExecutionGuardErrorKind;
use chrono::{DateTime, Utc};
use std::convert::Infallible;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

/// Scope tag carried by an [`ExecutionSlot`] and by the [`ExecutionLease`] issued for it.
///
/// `Occurrence` is the default variant.
// NOTE(review): presumably `Occurrence` guards a single scheduled run while
// `Resource` guards the resource regardless of schedule time — confirm against
// the guard backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutionGuardScope {
    /// Scope set by [`ExecutionSlot::new`] and [`ExecutionSlot::for_occurrence`];
    /// those slots always carry a `scheduled_at` timestamp.
    #[default]
    Occurrence,
    /// Scope set by [`ExecutionSlot::for_resource`]; those slots carry no
    /// `scheduled_at` timestamp.
    Resource,
}

/// Request handed to [`ExecutionGuard::acquire`], naming the job and resource to guard.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionSlot {
    /// Job identifier.
    pub job_id: String,
    /// Resource identifier. [`ExecutionSlot::new`] sets this to the same value as `job_id`.
    pub resource_id: String,
    /// Scope of the guard being requested.
    pub scope: ExecutionGuardScope,
    /// Scheduled time of the run: `Some` for slots built with `new` or
    /// `for_occurrence`, `None` for slots built with `for_resource`.
    pub scheduled_at: Option<DateTime<Utc>>,
}

impl ExecutionSlot {
    /// Creates an occurrence-scoped slot in which the job id doubles as the resource id.
    ///
    /// Produces the same value as `for_occurrence(job_id, job_id, scheduled_at)`.
    pub fn new(job_id: impl Into<String>, scheduled_at: DateTime<Utc>) -> Self {
        let resource_id: String = job_id.into();
        Self {
            // The job id is a copy of the resource id; both fields hold the same text.
            job_id: resource_id.clone(),
            resource_id,
            scope: ExecutionGuardScope::Occurrence,
            scheduled_at: Some(scheduled_at),
        }
    }

    /// Creates an occurrence-scoped slot for `resource_id`, recording `scheduled_at`.
    pub fn for_occurrence(
        job_id: impl Into<String>,
        resource_id: impl Into<String>,
        scheduled_at: DateTime<Utc>,
    ) -> Self {
        let job_id: String = job_id.into();
        let resource_id: String = resource_id.into();
        Self {
            job_id,
            resource_id,
            scope: ExecutionGuardScope::Occurrence,
            scheduled_at: Some(scheduled_at),
        }
    }

    /// Creates a resource-scoped slot; no scheduled time is recorded.
    pub fn for_resource(job_id: impl Into<String>, resource_id: impl Into<String>) -> Self {
        let job_id: String = job_id.into();
        let resource_id: String = resource_id.into();
        Self {
            job_id,
            resource_id,
            scope: ExecutionGuardScope::Resource,
            scheduled_at: None,
        }
    }
}

/// Lease returned inside [`ExecutionGuardAcquire::Acquired`] and passed back to
/// [`ExecutionGuard::renew`] and [`ExecutionGuard::release`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionLease {
    /// Job identifier.
    pub job_id: String,
    /// Resource identifier.
    pub resource_id: String,
    /// Scope under which the lease was granted.
    pub scope: ExecutionGuardScope,
    /// Scheduled time of the run, if any.
    pub scheduled_at: Option<DateTime<Utc>>,
    /// Guard-defined token. [`NoopExecutionGuard`] leaves it empty.
    // NOTE(review): presumably an ownership token a backend checks on renew/release — confirm.
    pub token: String,
    /// Guard-defined key. [`NoopExecutionGuard`] leaves it empty.
    // NOTE(review): presumably the storage key under which the lease is held — confirm.
    pub lease_key: String,
}

impl ExecutionLease {
    /// Assembles a lease from its parts, converting every string-like argument
    /// into an owned `String`.
    pub fn new(
        job_id: impl Into<String>,
        resource_id: impl Into<String>,
        scope: ExecutionGuardScope,
        scheduled_at: Option<DateTime<Utc>>,
        token: impl Into<String>,
        lease_key: impl Into<String>,
    ) -> Self {
        // Conversions run in the same order as the fields are declared.
        let job_id: String = job_id.into();
        let resource_id: String = resource_id.into();
        let token: String = token.into();
        let lease_key: String = lease_key.into();

        Self {
            job_id,
            resource_id,
            scope,
            scheduled_at,
            token,
            lease_key,
        }
    }
}

/// Successful outcome of [`ExecutionGuard::acquire`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionGuardAcquire {
    /// The guard granted the slot; the lease identifies the grant.
    Acquired(ExecutionLease),
    /// No lease was granted.
    // NOTE(review): presumably because another holder currently owns the slot — confirm.
    Contended,
}

/// Successful outcome of [`ExecutionGuard::renew`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionGuardRenewal {
    /// The lease was renewed.
    Renewed,
    /// The lease was not renewed.
    // NOTE(review): presumably the lease expired or was taken over — confirm.
    Lost,
}

/// Asynchronous guard that grants, renews and releases [`ExecutionLease`]s.
///
/// All three operations return `Send` futures. Implementations are provided
/// in this file for [`NoopExecutionGuard`] and for `Arc<T>` where `T` is
/// itself a guard.
pub trait ExecutionGuard {
    /// Error type reported by `acquire`, `renew` and `release`.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Attempts to obtain a lease for `slot`.
    ///
    /// Resolves to [`ExecutionGuardAcquire::Acquired`] with the lease, or to
    /// [`ExecutionGuardAcquire::Contended`] when no lease is granted.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the guard itself fails.
    fn acquire(
        &self,
        slot: ExecutionSlot,
    ) -> impl Future<Output = Result<ExecutionGuardAcquire, Self::Error>> + Send;

    /// Attempts to renew `lease`.
    ///
    /// Resolves to [`ExecutionGuardRenewal::Renewed`] or
    /// [`ExecutionGuardRenewal::Lost`].
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the guard itself fails.
    fn renew(
        &self,
        lease: &ExecutionLease,
    ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;

    /// Releases `lease`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the guard itself fails.
    fn release(
        &self,
        lease: &ExecutionLease,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Maps a guard error to an [`ExecutionGuardErrorKind`].
    ///
    /// The default implementation ignores the error and returns
    /// `ExecutionGuardErrorKind::Unknown`.
    fn classify_error(_error: &Self::Error) -> ExecutionGuardErrorKind
    where
        Self: Sized,
    {
        ExecutionGuardErrorKind::Unknown
    }

    /// Returns an optional renewal interval for `lease`.
    ///
    /// The default implementation returns `None`.
    // NOTE(review): presumably `None` tells the caller not to renew periodically — confirm
    // against the scheduler loop that consumes this value.
    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
        None
    }
}

/// Stateless [`ExecutionGuard`] that grants every `acquire`, reports every
/// `renew` as renewed, and does nothing on `release`. It never fails.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopExecutionGuard;

impl ExecutionGuard for NoopExecutionGuard {
    type Error = Infallible;

    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
        Ok(ExecutionGuardAcquire::Acquired(ExecutionLease::new(
            slot.job_id,
            slot.resource_id,
            slot.scope,
            slot.scheduled_at,
            "",
            "",
        )))
    }

    async fn renew(&self, _lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
        Ok(ExecutionGuardRenewal::Renewed)
    }

    async fn release(&self, _lease: &ExecutionLease) -> Result<(), Self::Error> {
        Ok(())
    }
}

impl<T> ExecutionGuard for Arc<T>
where
    T: ExecutionGuard + Send + Sync,
{
    type Error = T::Error;

    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
        self.as_ref().acquire(slot).await
    }

    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
        self.as_ref().renew(lease).await
    }

    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
        self.as_ref().release(lease).await
    }

    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
    where
        Self: Sized,
    {
        T::classify_error(error)
    }

    fn renew_interval(&self, lease: &ExecutionLease) -> Option<Duration> {
        self.as_ref().renew_interval(lease)
    }
}