// cloudiful-scheduler 0.3.5
//
// Single-job async scheduling library for background work with optional Valkey-backed state.
// Documentation
mod support;

use chrono::Utc;
use scheduler::{
    InMemoryStateStore, Job, RunContext, Schedule, Scheduler, SchedulerConfig, Task, TaskContext,
};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use support::{RefreshDeps, shanghai_after};

/// An async task that ignores its context is still invoked by the scheduler.
///
/// Builds a dependency-free 20 ms interval job configured with `with_max_runs(1)`,
/// runs it, and checks that the shared counter was bumped once and that the
/// report carries one history entry and a trigger count of one.
#[tokio::test]
async fn async_task_without_context_runs() {
    let invocations = Arc::new(AtomicUsize::new(0));
    // The task gets its own handle; `invocations` stays with the test for the assertions.
    let task_counter = Arc::clone(&invocations);
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let interval_job = Job::without_deps(
        "async-no-context",
        Schedule::Interval(Duration::from_millis(20)),
        Task::from_async(move |_| {
            // Clone per invocation so the returned future owns everything it touches.
            let counter = Arc::clone(&task_counter);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    );
    let job = interval_job.with_max_runs(1);

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
    assert_eq!(report.state.trigger_count, 1);
}

/// A synchronous task that ignores its context is invoked by the scheduler.
///
/// Uses a dependency-free 20 ms interval job configured with `with_max_runs(1)`
/// and checks that the counter was incremented once and that the report holds
/// a single history entry.
#[tokio::test]
async fn sync_task_without_context_runs() {
    let invocations = Arc::new(AtomicUsize::new(0));
    // Handle moved into the task closure; the original stays here for the assertion.
    let task_counter = Arc::clone(&invocations);
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let interval_job = Job::without_deps(
        "sync-no-context",
        Schedule::Interval(Duration::from_millis(20)),
        Task::from_sync(move |_| {
            task_counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }),
    );
    let job = interval_job.with_max_runs(1);

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
}

/// The run context handed to an async task carries the scheduled instant.
///
/// Schedules a single `AtTimes` entry derived from `shanghai_after(70)` and
/// checks, both inside the task and in the resulting report, that
/// `scheduled_at` equals that instant expressed in UTC.
#[tokio::test]
async fn async_task_with_run_context_receives_scheduled_time() {
    // UTC form of the planned instant; this is what the assertions compare against.
    let planned = shanghai_after(70).with_timezone(&Utc);
    // The schedule itself is given the same instant in the Asia/Shanghai zone.
    let fire_at = planned.with_timezone(&chrono_tz::Asia::Shanghai);
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let job = Job::without_deps(
        "async-with-run",
        Schedule::AtTimes(vec![fire_at]),
        Task::from_async(move |context: TaskContext<()>| async move {
            let run_context: RunContext = context.run;
            assert_eq!(run_context.scheduled_at, planned);
            Ok(())
        }),
    );

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(report.history.len(), 1);
    assert_eq!(report.history[0].scheduled_at, planned);
}

/// An async task can read the dependencies injected through `Job::new`.
///
/// Supplies a `RefreshDeps` value labelled `"deps-only"`, has the task verify the
/// label and bump the deps' counter, then checks that the report shows one
/// history entry and a trigger count of one.
#[tokio::test]
async fn async_task_with_injected_deps_runs() {
    let injected = RefreshDeps {
        label: "deps-only",
        seen: AtomicUsize::new(0),
    };
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let interval_job = Job::new(
        "async-with-deps",
        Schedule::Interval(Duration::from_millis(20)),
        injected,
        Task::from_async(|context: TaskContext<RefreshDeps>| async move {
            let deps = &context.deps;
            assert_eq!(deps.label, "deps-only");
            deps.seen.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }),
    );
    let job = interval_job.with_max_runs(1);

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(report.history.len(), 1);
    assert_eq!(report.state.trigger_count, 1);
}

/// An async task sees both the run context and the injected dependencies.
///
/// Schedules a single `AtTimes` entry derived from `shanghai_after(60)` with a
/// `RefreshDeps` value labelled `"context"`. The task asserts the scheduled
/// instant and the label, bumps the deps' counter, and the test then checks for
/// one history entry in the report.
#[tokio::test]
async fn async_task_with_full_context_runs() {
    // UTC form of the planned instant, compared against inside the task.
    let planned = shanghai_after(60).with_timezone(&Utc);
    // Same instant in the Asia/Shanghai zone, as handed to the schedule.
    let fire_at = planned.with_timezone(&chrono_tz::Asia::Shanghai);
    let injected = RefreshDeps {
        label: "context",
        seen: AtomicUsize::new(0),
    };
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let job = Job::new(
        "async-with-context",
        Schedule::AtTimes(vec![fire_at]),
        injected,
        Task::from_async(move |context: TaskContext<RefreshDeps>| async move {
            assert_eq!(context.run.scheduled_at, planned);
            let deps = &context.deps;
            assert_eq!(deps.label, "context");
            deps.seen.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }),
    );

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(report.history.len(), 1);
}

/// A task built with `Task::from_blocking` is invoked by the scheduler.
///
/// The task sleeps the calling thread for 10 ms before bumping the counter. The
/// job is a dependency-free 20 ms interval job configured with `with_max_runs(1)`;
/// the test checks for one invocation and one history entry.
#[tokio::test]
async fn blocking_task_runs() {
    let invocations = Arc::new(AtomicUsize::new(0));
    // Handle moved into the task closure; the original stays here for the assertion.
    let task_counter = Arc::clone(&invocations);
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());

    let interval_job = Job::without_deps(
        "blocking-task",
        Schedule::Interval(Duration::from_millis(20)),
        Task::from_blocking(move |_| {
            // Deliberate thread-blocking sleep: this is the blocking work under test.
            std::thread::sleep(Duration::from_millis(10));
            task_counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }),
    );
    let job = interval_job.with_max_runs(1);

    let outcome = scheduler.run(job).await;
    let report = outcome.unwrap();

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(report.history.len(), 1);
}