//! cloudiful-scheduler 0.4.2
//!
//! Single-job async scheduling library for background work with optional
//! Valkey-backed state.
#[path = "support/time.rs"]
mod time_support;

use chrono::{TimeDelta, Utc};
use scheduler::{
    InMemoryStateStore, Job, JobState, NoopObserver, Schedule, Scheduler, SchedulerConfig,
    SchedulerEvent, SchedulerObserver, SchedulerStopReason, StateLoadSource, StateStore, Task,
    TerminalStatePolicy,
};
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use time_support::shanghai_after;
use tokio::sync::mpsc;

/// Test observer that records every `SchedulerEvent` it receives so the
/// tests below can assert on the full event stream after a run.
#[derive(Clone, Default)]
struct RecordingObserver {
    // Arc<Mutex<..>> so the clone handed to the scheduler and the copy kept
    // by the test share one event log across threads.
    events: Arc<Mutex<Vec<SchedulerEvent>>>,
}

impl RecordingObserver {
    /// Returns a copy of every event recorded so far.
    fn snapshot(&self) -> Vec<SchedulerEvent> {
        let guard = self.events.lock().unwrap();
        guard.clone()
    }
}

impl SchedulerObserver for RecordingObserver {
    /// Appends a clone of the event to the shared log.
    fn on_event(&self, event: &SchedulerEvent) {
        let mut log = self.events.lock().unwrap();
        log.push(event.clone());
    }
}

// End-to-end check of the observer event stream for a job whose persisted
// state is repairable. The assertions below expect the scheduler to emit
// StateRepaired, a StateLoaded with source Repaired, one non-catch-up
// trigger, a completed run (bumping trigger_count 2 -> 3), and finally a
// Shutdown stop.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn observer_receives_repair_trigger_and_completion_events() {
    // Seed persisted state with trigger_count = 2 but next_run_at = None and
    // a stale last_error — presumably the missing next_run_at is what makes
    // the state "repairable"; the StateRepaired assertion below relies on it.
    let store = Arc::new(InMemoryStateStore::new());
    store
        .save(&JobState {
            job_id: "observer-repair".to_string(),
            trigger_count: 2,
            last_run_at: Some(Utc::now() - TimeDelta::seconds(10)),
            last_success_at: Some(Utc::now() - TimeDelta::seconds(9)),
            next_run_at: None,
            last_error: Some("stale".to_string()),
        })
        .await
        .unwrap();

    let observer = RecordingObserver::default();
    let scheduler = Arc::new(Scheduler::with_observer(
        SchedulerConfig::default(),
        store.clone(),
        observer.clone(),
    ));
    let handle = scheduler.handle();
    // Counts task invocations; the channel signals the shutdown task after
    // the first run so exactly one invocation is observed.
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let (tx, mut rx) = mpsc::channel::<()>(1);

    // Run the scheduler on a separate task so the test can trigger shutdown
    // concurrently while `run` is blocked on the job loop.
    let task = {
        let scheduler = scheduler.clone();
        tokio::spawn(async move {
            scheduler
                .run(Job::without_deps(
                    "observer-repair",
                    Schedule::Interval(Duration::from_millis(80)),
                    Task::from_async(move |_| {
                        let tx = tx.clone();
                        let seen = seen.clone();
                        async move {
                            seen.fetch_add(1, Ordering::SeqCst);
                            // Best-effort notify; ignore a closed/full channel.
                            let _ = tx.send(()).await;
                            Ok(())
                        }
                    }),
                ))
                .await
                .unwrap()
        })
    };

    // Request shutdown as soon as the first task run has signalled.
    let shutdown_handle = handle.clone();
    tokio::spawn(async move {
        let _ = rx.recv().await;
        shutdown_handle.shutdown();
    });

    // Wait for the scheduler loop to exit, then inspect the recorded events.
    let _report = task.await.unwrap();
    let events = observer.snapshot();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    // Repair of the seeded state preserved the stored trigger_count of 2.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::StateRepaired { job_id, trigger_count, .. }
                if job_id == "observer-repair" && *trigger_count == 2
        )
    }));
    // The load must be reported as coming from the repaired state.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::StateLoaded { job_id, source, .. }
                if job_id == "observer-repair" && *source == StateLoadSource::Repaired
        )
    }));
    // The single trigger is a regular interval tick, not a catch-up.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::TriggerEmitted { job_id, catch_up, .. }
                if job_id == "observer-repair" && !catch_up
        )
    }));
    // One successful run bumps the persisted count from 2 to 3.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::RunCompleted { job_id, trigger_count, .. }
                if job_id == "observer-repair" && *trigger_count == 3
        )
    }));
    // The stop event must attribute the exit to the explicit shutdown call.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::SchedulerStopped { job_id, reason, .. }
                if job_id == "observer-repair" && *reason == SchedulerStopReason::Shutdown
        )
    }));
}

// With TerminalStatePolicy::Delete, a job that reaches its terminal state
// (here via max_runs = 1) should have its persisted state removed and a
// TerminalStateDeleted event emitted, followed by a Terminal stop reason.
#[tokio::test]
async fn terminal_state_policy_delete_removes_state_and_emits_event() {
    let store = Arc::new(InMemoryStateStore::new());
    let observer = RecordingObserver::default();
    let scheduler = Scheduler::with_observer(
        SchedulerConfig {
            terminal_state_policy: TerminalStatePolicy::Delete,
            ..SchedulerConfig::default()
        },
        store.clone(),
        observer.clone(),
    );

    // shanghai_after(20) comes from support/time.rs — presumably a single
    // fire time shortly after "now" in Asia/Shanghai; verify against the
    // helper. max_runs(1) makes the job terminal after one execution.
    let report = scheduler
        .run(
            Job::without_deps(
                "cleanup-terminal",
                Schedule::AtTimes(vec![shanghai_after(20)]),
                Task::from_async(|_| async { Ok(()) }),
            )
            .with_max_runs(1),
        )
        .await
        .unwrap();

    let persisted = store.load("cleanup-terminal").await.unwrap();
    let events = observer.snapshot();

    // The run happened once, and the Delete policy removed the stored state.
    assert_eq!(report.state.trigger_count, 1);
    assert!(persisted.is_none());
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::TerminalStateDeleted { job_id, trigger_count }
                if job_id == "cleanup-terminal" && *trigger_count == 1
        )
    }));
    // The stop must be attributed to reaching the terminal state, not shutdown.
    assert!(events.iter().any(|event| {
        matches!(
            event,
            SchedulerEvent::SchedulerStopped { job_id, reason, .. }
                if job_id == "cleanup-terminal" && *reason == SchedulerStopReason::Terminal
        )
    }));
}

/// The no-op observer must silently accept any event; this merely exercises
/// the call path to make sure it neither panics nor has side effects.
#[test]
fn noop_observer_accepts_events_without_side_effects() {
    let event = SchedulerEvent::SchedulerStopped {
        job_id: "noop".to_string(),
        trigger_count: 0,
        reason: SchedulerStopReason::Terminal,
    };
    NoopObserver.on_event(&event);
}