cloudiful-scheduler 0.4.1

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
mod support;

use chrono::Utc;
use scheduler::{
    CoordinatedClaim, CoordinatedLeaseConfig, CoordinatedPendingTrigger, CoordinatedRuntimeState,
    CoordinatedStateStore, ExecutionGuardRenewal, ExecutionGuardScope, ExecutionLease,
    InMemoryStateStore, Job, JobState, OverlapPolicy, Schedule, Scheduler, SchedulerConfig,
    SchedulerEvent, SchedulerObserver, Task,
};
use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use support::shanghai_after;

#[derive(Clone, Default)]
struct RecordingObserver {
    events: Arc<Mutex<Vec<SchedulerEvent>>>,
}

impl RecordingObserver {
    fn snapshot(&self) -> Vec<SchedulerEvent> {
        self.events.lock().unwrap().clone()
    }
}

impl SchedulerObserver for RecordingObserver {
    fn on_event(&self, event: &SchedulerEvent) {
        self.events.lock().unwrap().push(event.clone());
    }
}

#[derive(Clone)]
struct FakeCoordinatedStore {
    inner: Arc<Mutex<FakeCoordinatedStoreState>>,
}

#[derive(Clone)]
struct FakeCoordinatedStoreState {
    runtime: CoordinatedRuntimeState,
    inflight: Option<FakeInflight>,
}

#[derive(Clone)]
struct FakeInflight {
    _trigger: CoordinatedPendingTrigger,
    _resource_id: String,
    lease: ExecutionLease,
    expires_at: Instant,
}

impl FakeCoordinatedStore {
    fn new(state: JobState) -> Self {
        Self {
            inner: Arc::new(Mutex::new(FakeCoordinatedStoreState {
                runtime: CoordinatedRuntimeState {
                    state,
                    revision: 0,
                    paused: false,
                },
                inflight: None,
            })),
        }
    }
}

impl CoordinatedStateStore for FakeCoordinatedStore {
    type Error = Infallible;

    async fn load_or_initialize(
        &self,
        _job_id: &str,
        _initial_state: JobState,
    ) -> Result<CoordinatedRuntimeState, Self::Error> {
        Ok(self.inner.lock().unwrap().runtime.clone())
    }

    async fn save_state(
        &self,
        _job_id: &str,
        revision: u64,
        state: &JobState,
    ) -> Result<bool, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        if inner.runtime.revision != revision || inner.inflight.is_some() {
            return Ok(false);
        }
        inner.runtime.revision += 1;
        inner.runtime.state = state.clone();
        Ok(true)
    }

    async fn reclaim_inflight(
        &self,
        _job_id: &str,
        _resource_id: &str,
        _lease_config: CoordinatedLeaseConfig,
    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
        Ok(None)
    }

    async fn claim_trigger(
        &self,
        job_id: &str,
        resource_id: &str,
        revision: u64,
        trigger: CoordinatedPendingTrigger,
        next_state: &JobState,
        lease_config: CoordinatedLeaseConfig,
    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        if inner.runtime.paused || inner.runtime.revision != revision || inner.inflight.is_some() {
            return Ok(None);
        }

        inner.runtime.revision += 1;
        inner.runtime.state = next_state.clone();
        let lease = ExecutionLease::new(
            job_id.to_string(),
            resource_id.to_string(),
            ExecutionGuardScope::Occurrence,
            Some(trigger.scheduled_at),
            "claim-token",
            "claim-lease",
        );
        inner.inflight = Some(FakeInflight {
            _trigger: trigger.clone(),
            _resource_id: resource_id.to_string(),
            expires_at: Instant::now() + lease_config.ttl,
            lease: lease.clone(),
        });

        Ok(Some(CoordinatedClaim {
            state: inner.runtime.clone(),
            trigger,
            lease,
            replayed: false,
        }))
    }

    async fn renew(
        &self,
        lease: &ExecutionLease,
        lease_config: CoordinatedLeaseConfig,
    ) -> Result<ExecutionGuardRenewal, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        let Some(inflight) = inner.inflight.as_mut() else {
            return Ok(ExecutionGuardRenewal::Lost);
        };
        if inflight.lease.token != lease.token {
            return Ok(ExecutionGuardRenewal::Lost);
        }
        inflight.expires_at = Instant::now() + lease_config.ttl;
        Ok(ExecutionGuardRenewal::Renewed)
    }

    async fn complete(
        &self,
        _job_id: &str,
        revision: u64,
        lease: &ExecutionLease,
        state: &JobState,
    ) -> Result<bool, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        if inner.runtime.revision != revision {
            return Ok(false);
        }
        if inner
            .inflight
            .as_ref()
            .map(|value| value.lease.token.as_str())
            != Some(lease.token.as_str())
        {
            return Ok(false);
        }
        inner.runtime.revision += 1;
        inner.runtime.state = state.clone();
        inner.inflight = None;
        Ok(true)
    }

    async fn delete(&self, _job_id: &str) -> Result<(), Self::Error> {
        Ok(())
    }

    async fn pause(&self, _job_id: &str) -> Result<bool, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        let changed = !inner.runtime.paused;
        inner.runtime.paused = true;
        Ok(changed)
    }

    async fn resume(&self, _job_id: &str) -> Result<bool, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        let changed = inner.runtime.paused;
        inner.runtime.paused = false;
        Ok(changed)
    }
}

fn core_event_kinds(events: &[SchedulerEvent]) -> Vec<&'static str> {
    events
        .iter()
        .filter_map(|event| match event {
            SchedulerEvent::StateLoaded { .. } => Some("state_loaded"),
            SchedulerEvent::TriggerEmitted { .. } => Some("trigger_emitted"),
            SchedulerEvent::RunCompleted { .. } => Some("run_completed"),
            SchedulerEvent::SchedulerStopped { .. } => Some("scheduler_stopped"),
            _ => None,
        })
        .collect()
}

#[tokio::test]
async fn legacy_and_coordinated_share_core_event_sequence() {
    let when = shanghai_after(20);

    let legacy_observer = RecordingObserver::default();
    let legacy_scheduler = Scheduler::with_observer(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        legacy_observer.clone(),
    );
    let legacy_report = legacy_scheduler
        .run(
            Job::without_deps(
                "runtime-reuse-legacy",
                Schedule::AtTimes(vec![when]),
                Task::from_async(|_| async { Ok(()) }),
            )
            .with_overlap_policy(OverlapPolicy::Forbid)
            .with_max_runs(1),
        )
        .await
        .unwrap();

    let coordinated_observer = RecordingObserver::default();
    let coordinated_scheduler = Scheduler::with_observer_and_coordinated_state_store(
        SchedulerConfig::default(),
        FakeCoordinatedStore::new(JobState::new(
            "runtime-reuse-coordinated",
            Some(when.with_timezone(&Utc)),
        )),
        coordinated_observer.clone(),
        CoordinatedLeaseConfig {
            ttl: Duration::from_secs(1),
            renew_interval: Duration::from_millis(50),
        },
    );
    let coordinated_report = coordinated_scheduler
        .run(
            Job::without_deps(
                "runtime-reuse-coordinated",
                Schedule::AtTimes(vec![when]),
                Task::from_async(|_| async { Ok(()) }),
            )
            .with_overlap_policy(OverlapPolicy::Forbid)
            .with_max_runs(1),
        )
        .await
        .unwrap();

    assert_eq!(legacy_report.history.len(), 1);
    assert_eq!(coordinated_report.history.len(), 1);
    assert_eq!(
        core_event_kinds(&legacy_observer.snapshot()),
        vec!["state_loaded", "trigger_emitted", "run_completed", "scheduler_stopped"]
    );
    assert_eq!(
        core_event_kinds(&coordinated_observer.snapshot()),
        vec!["state_loaded", "trigger_emitted", "run_completed", "scheduler_stopped"]
    );
    let coordinated_events = coordinated_observer.snapshot();
    assert!(coordinated_events.iter().any(|event| matches!(event, SchedulerEvent::ExecutionGuardAcquired { .. })));
    assert!(coordinated_events.iter().any(|event| matches!(event, SchedulerEvent::ExecutionGuardReleased { .. })));
}