cloudiful-scheduler 0.3.5

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::execution_guard::ExecutionGuardScope;
use crate::model::RunStatus;
use crate::store::StoreOperation;
use chrono::{DateTime, Utc};
use log::{debug, info, warn};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateLoadSource {
    New,
    Restored,
    Repaired,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerStopReason {
    Cancelled,
    Shutdown,
    Terminal,
    ChannelClosed,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchedulerEvent {
    StateLoaded {
        job_id: String,
        trigger_count: u32,
        next_run_at: Option<DateTime<Utc>>,
        source: StateLoadSource,
    },
    StateRepaired {
        job_id: String,
        trigger_count: u32,
        previous_next_run_at: Option<DateTime<Utc>>,
        repaired_next_run_at: Option<DateTime<Utc>>,
    },
    TriggerEmitted {
        job_id: String,
        scheduled_at: DateTime<Utc>,
        catch_up: bool,
        trigger_count: u32,
    },
    RunCompleted {
        job_id: String,
        scheduled_at: DateTime<Utc>,
        catch_up: bool,
        trigger_count: u32,
        status: RunStatus,
        error: Option<String>,
    },
    ExecutionGuardAcquired {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
    },
    ExecutionGuardContended {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
    },
    ExecutionGuardRenewed {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
        renewal_count: u32,
    },
    ExecutionGuardRenewFailed {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
        renewal_count: u32,
        failed_renewal_count: u32,
        error: String,
    },
    ExecutionGuardLost {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
        renewal_count: u32,
        failed_renewal_count: u32,
    },
    ExecutionGuardReleased {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
    },
    ExecutionGuardReleaseFailed {
        job_id: String,
        resource_id: String,
        scope: ExecutionGuardScope,
        lease_key: String,
        scheduled_at: Option<DateTime<Utc>>,
        catch_up: bool,
        trigger_count: u32,
        error: String,
    },
    StoreDegraded {
        job_id: String,
        operation: StoreOperation,
        error: String,
    },
    TerminalStateDeleted {
        job_id: String,
        trigger_count: u32,
    },
    SchedulerStopped {
        job_id: String,
        trigger_count: u32,
        reason: SchedulerStopReason,
    },
}

pub trait SchedulerObserver: Send + Sync + 'static {
    fn on_event(&self, event: &SchedulerEvent);
}

#[derive(Debug, Clone, Copy, Default)]
pub struct NoopObserver;

impl SchedulerObserver for NoopObserver {
    fn on_event(&self, _event: &SchedulerEvent) {}
}

#[derive(Debug, Clone, Copy, Default)]
pub struct LogObserver;

impl SchedulerObserver for LogObserver {
    fn on_event(&self, event: &SchedulerEvent) {
        match event {
            SchedulerEvent::StateLoaded {
                job_id,
                trigger_count,
                next_run_at,
                source,
            } => info!(
                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
                job_id, source, trigger_count, next_run_at
            ),
            SchedulerEvent::StateRepaired {
                job_id,
                trigger_count,
                previous_next_run_at,
                repaired_next_run_at,
            } => warn!(
                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
            ),
            SchedulerEvent::TriggerEmitted {
                job_id,
                scheduled_at,
                catch_up,
                trigger_count,
            } => debug!(
                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
                job_id, scheduled_at, catch_up, trigger_count
            ),
            SchedulerEvent::RunCompleted {
                job_id,
                scheduled_at,
                catch_up,
                trigger_count,
                status,
                error,
            } => debug!(
                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
                job_id, scheduled_at, catch_up, trigger_count, status, error
            ),
            SchedulerEvent::ExecutionGuardAcquired {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
            } => debug!(
                "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
            ),
            SchedulerEvent::ExecutionGuardContended {
                job_id,
                resource_id,
                scope,
                scheduled_at,
                catch_up,
                trigger_count,
            } => debug!(
                "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
                job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
            ),
            SchedulerEvent::ExecutionGuardRenewed {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count,
            } => debug!(
                "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count
            ),
            SchedulerEvent::ExecutionGuardRenewFailed {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count,
                failed_renewal_count,
                error,
            } => warn!(
                "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count,
                failed_renewal_count,
                error
            ),
            SchedulerEvent::ExecutionGuardLost {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count,
                failed_renewal_count,
            } => warn!(
                "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                renewal_count,
                failed_renewal_count
            ),
            SchedulerEvent::ExecutionGuardReleased {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
            } => debug!(
                "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
            ),
            SchedulerEvent::ExecutionGuardReleaseFailed {
                job_id,
                resource_id,
                scope,
                lease_key,
                scheduled_at,
                catch_up,
                trigger_count,
                error,
            } => warn!(
                "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
            ),
            SchedulerEvent::StoreDegraded {
                job_id,
                operation,
                error,
            } => warn!(
                "scheduler store degraded job_id={} operation={:?} error={}",
                job_id, operation, error
            ),
            SchedulerEvent::TerminalStateDeleted {
                job_id,
                trigger_count,
            } => info!(
                "scheduler deleted terminal state job_id={} trigger_count={}",
                job_id, trigger_count
            ),
            SchedulerEvent::SchedulerStopped {
                job_id,
                trigger_count,
                reason,
            } => info!(
                "scheduler stopped job_id={} trigger_count={} reason={:?}",
                job_id, trigger_count, reason
            ),
        }
    }
}