cloudiful-scheduler 0.4.2

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
#![cfg(feature = "valkey-store")]

#[path = "support/valkey_runtime.rs"]
mod valkey_runtime;
#[path = "support/valkey_state_fixtures.rs"]
mod valkey_state_fixtures;

use chrono::{TimeDelta, Utc};
use redis::AsyncCommands;
use scheduler::{Job, Schedule, Scheduler, SchedulerConfig, StateStore, Task, ValkeyStateStore};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use valkey_runtime::{connection, unique_id, valkey_url};
use valkey_state_fixtures::fixture_state;
use tokio::sync::mpsc;

const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:job-state:";
const LEGACY_DEFAULT_KEY_PREFIX: &str = "scheduler:job-state:";

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn valkey_store_round_trips_state() {
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let prefix = format!("scheduler:test:valkey-store:{}:", unique_id("scheduler-it"));
    let store = ValkeyStateStore::with_prefix(&url, prefix.clone())
        .await
        .expect("failed to connect to valkey");
    let mut connection = connection(&url).await;
    let job_id = unique_id("scheduler-it");
    let key = format!("{prefix}{job_id}");
    let state = fixture_state(&job_id);

    store.save(&state).await.expect("save failed");
    let loaded = store.load(&job_id).await.expect("load failed");

    assert_eq!(loaded, Some(state));

    let _: usize = connection.del(key).await.expect("cleanup failed");
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn valkey_store_reads_legacy_default_prefix() {
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let store = ValkeyStateStore::new(&url)
        .await
        .expect("failed to connect to valkey");
    let mut connection = connection(&url).await;
    let job_id = unique_id("scheduler-it");
    let legacy_key = format!("{LEGACY_DEFAULT_KEY_PREFIX}{job_id}");
    let default_key = format!("{DEFAULT_KEY_PREFIX}{job_id}");
    let state = fixture_state(&job_id);
    let payload = serde_json::to_string(&state).expect("failed to serialize job state");

    let _: () = connection
        .set(&legacy_key, payload)
        .await
        .expect("failed to seed legacy key");

    let loaded = store.load(&job_id).await.expect("load failed");

    assert_eq!(loaded, Some(state));

    let _: usize = connection
        .del(legacy_key)
        .await
        .expect("legacy cleanup failed");
    let _: usize = connection
        .del(default_key)
        .await
        .expect("default cleanup failed");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn scheduler_restores_state_across_instances_using_valkey() {
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let prefix = format!("scheduler:test:restore:{}:", unique_id("scheduler-it"));
    let mut connection = connection(&url).await;
    let job_id = unique_id("scheduler-it");
    let key = format!("{prefix}{job_id}");
    let scheduler_one = Scheduler::new(
        SchedulerConfig::default(),
        ValkeyStateStore::with_prefix(&url, prefix.clone())
            .await
            .expect("failed to connect to valkey"),
    );
    let handle = scheduler_one.handle();
    let (tx, mut rx) = mpsc::channel::<()>(1);
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    let first_at = Utc::now() + TimeDelta::milliseconds(30);
    let second_at = Utc::now() + TimeDelta::milliseconds(120);
    let times = vec![
        first_at.with_timezone(&chrono_tz::Asia::Shanghai),
        second_at.with_timezone(&chrono_tz::Asia::Shanghai),
    ];

    let job = Job::without_deps(
        job_id.clone(),
        Schedule::AtTimes(times.clone()),
        Task::from_async(move |_| {
            let tx = tx.clone();
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                let _ = tx.send(()).await;
                tokio::time::sleep(Duration::from_millis(20)).await;
                Ok(())
            }
        }),
    );

    let shutdown_handle = handle.clone();
    tokio::spawn(async move {
        let _ = rx.recv().await;
        shutdown_handle.shutdown();
    });

    let first_report = scheduler_one.run(job).await.expect("first run failed");

    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(first_report.history.len(), 1);

    let scheduler_two = Scheduler::new(
        SchedulerConfig::default(),
        ValkeyStateStore::with_prefix(&url, prefix.clone())
            .await
            .expect("failed to reconnect to valkey"),
    );
    let seen = invocations.clone();
    let job = Job::without_deps(
        job_id.clone(),
        Schedule::AtTimes(times),
        Task::from_async(move |_| {
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    );

    let second_report = scheduler_two.run(job).await.expect("second run failed");

    assert_eq!(invocations.load(Ordering::SeqCst), 2);
    assert_eq!(second_report.history.len(), 1);
    assert_eq!(second_report.state.trigger_count, 2);
    assert!(second_report.state.next_run_at.is_none());

    let _: usize = connection.del(key).await.expect("cleanup failed");
}