//! cloudiful-scheduler 0.3.4
//!
//! Integration tests for the single-job async scheduling library's optional
//! Valkey-backed state store. See the crate documentation for details.
#![cfg(feature = "valkey-store")]

use chrono::{TimeDelta, Utc};
use redis::{AsyncCommands, Client};
use scheduler::{
    Job, JobState, Schedule, Scheduler, SchedulerConfig, StateStore, Task, ValkeyStateStore,
};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;

// Key prefix the store writes under by default (exercised via `ValkeyStateStore::new`).
const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:job-state:";
// Prefix used by older releases; `load` is expected to fall back to it for
// pre-existing state (see `valkey_store_reads_legacy_default_prefix`).
const LEGACY_DEFAULT_KEY_PREFIX: &str = "scheduler:job-state:";

/// Reads the Valkey connection URL from `SCHEDULER_VALKEY_URL`, if it is set.
fn valkey_url() -> Option<String> {
    match std::env::var("SCHEDULER_VALKEY_URL") {
        Ok(url) => Some(url),
        Err(_) => None,
    }
}

/// Produces a test-unique identifier from the process id and a nanosecond
/// timestamp, so concurrent or repeated test runs never collide on keys.
fn unique_id() -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before unix epoch")
        .as_nanos();
    let pid = std::process::id();
    format!("scheduler-it-{pid}-{nanos}")
}

/// Builds a fully-populated `JobState` fixture for round-trip tests.
///
/// All timestamps derive from a single `Utc::now()` call so the fixture is
/// internally consistent: `last_success_at` is exactly 1s after
/// `last_run_at`, and `next_run_at` exactly 10s after it. (The original
/// called `Utc::now()` three times, so the offsets drifted slightly.)
fn fixture_state(job_id: &str) -> JobState {
    let now = Utc::now();
    JobState {
        job_id: job_id.to_string(),
        trigger_count: 3,
        last_run_at: Some(now),
        last_success_at: Some(now + TimeDelta::seconds(1)),
        next_run_at: Some(now + TimeDelta::seconds(10)),
        last_error: Some("integration".to_string()),
    }
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn valkey_store_round_trips_state() {
    // Save a fixture state through the store, load it back by job id, and
    // check the round trip is lossless. Uses a unique prefix so parallel or
    // repeated runs never see each other's keys.
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let prefix = format!("scheduler:test:valkey-store:{}:", unique_id());
    let store = ValkeyStateStore::with_prefix(&url, prefix.clone())
        .await
        .expect("failed to connect to valkey");
    // Separate raw client connection, used only for cleanup at the end.
    let client = Client::open(url).expect("invalid valkey url");
    let mut conn = client
        .get_multiplexed_async_connection()
        .await
        .expect("failed to get valkey connection");
    let job_id = unique_id();
    let state = fixture_state(&job_id);

    store.save(&state).await.expect("save failed");
    let round_tripped = store.load(&job_id).await.expect("load failed");

    assert_eq!(round_tripped, Some(state));

    // Remove the test key so repeated runs don't accumulate state in Valkey.
    let _: usize = conn
        .del(format!("{prefix}{job_id}"))
        .await
        .expect("cleanup failed");
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn valkey_store_reads_legacy_default_prefix() {
    // Seed a state payload under the old default prefix only, then verify a
    // store created with the current default (`new`) still loads it.
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let store = ValkeyStateStore::new(&url)
        .await
        .expect("failed to connect to valkey");
    let client = Client::open(url).expect("invalid valkey url");
    let mut conn = client
        .get_multiplexed_async_connection()
        .await
        .expect("failed to get valkey connection");
    let job_id = unique_id();
    let state = fixture_state(&job_id);
    let payload = serde_json::to_string(&state).expect("failed to serialize job state");
    let legacy_key = format!("{LEGACY_DEFAULT_KEY_PREFIX}{job_id}");
    let default_key = format!("{DEFAULT_KEY_PREFIX}{job_id}");

    let _: () = conn
        .set(&legacy_key, payload)
        .await
        .expect("failed to seed legacy key");

    let loaded = store.load(&job_id).await.expect("load failed");

    assert_eq!(loaded, Some(state));

    // Delete both keys: the load may have migrated the state to the default
    // prefix, so the cleanup covers either location.
    for (key, message) in [
        (legacy_key, "legacy cleanup failed"),
        (default_key, "default cleanup failed"),
    ] {
        let _: usize = conn.del(key).await.expect(message);
    }
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn scheduler_restores_state_across_instances_using_valkey() {
    // End-to-end restart test: a first Scheduler fires the first of two
    // scheduled times and is shut down; a second Scheduler pointed at the
    // same Valkey prefix resumes from the persisted state and fires only the
    // remaining time, so the cumulative trigger_count reaches 2.
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    // Unique prefix isolates this run from concurrent/previous test runs.
    let prefix = format!("scheduler:test:restore:{}:", unique_id());
    let client = Client::open(url.clone()).expect("invalid valkey url");
    let mut connection = client
        .get_multiplexed_async_connection()
        .await
        .expect("failed to get valkey connection");
    let job_id = unique_id();
    let key = format!("{prefix}{job_id}");
    let scheduler_one = Scheduler::new(
        SchedulerConfig::default(),
        ValkeyStateStore::with_prefix(&url, prefix.clone())
            .await
            .expect("failed to connect to valkey"),
    );
    let handle = scheduler_one.handle();
    // The task signals on this channel after its first invocation so the
    // watcher task below can shut scheduler_one down before the second fire.
    let (tx, mut rx) = mpsc::channel::<()>(1);
    // Shared invocation counter observed across both scheduler instances.
    let invocations = Arc::new(AtomicUsize::new(0));
    let seen = invocations.clone();
    // Two fire times close together: the first for scheduler_one, the second
    // left over for scheduler_two after the simulated restart.
    let first_at = Utc::now() + TimeDelta::milliseconds(30);
    let second_at = Utc::now() + TimeDelta::milliseconds(120);
    // NOTE(review): times are converted to a non-UTC zone, presumably to
    // exercise timezone handling in Schedule::AtTimes — confirm.
    let times = vec![
        first_at.with_timezone(&chrono_tz::Asia::Shanghai),
        second_at.with_timezone(&chrono_tz::Asia::Shanghai),
    ];

    let job = Job::without_deps(
        job_id.clone(),
        Schedule::AtTimes(times.clone()),
        Task::from_async(move |_| {
            let tx = tx.clone();
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                let _ = tx.send(()).await;
                // NOTE(review): the sleep presumably keeps the invocation
                // alive long enough for the shutdown signal to be processed
                // while this run is still in flight — confirm.
                tokio::time::sleep(Duration::from_millis(20)).await;
                Ok(())
            }
        }),
    );

    // Watcher: shut scheduler_one down as soon as the task reports one run.
    let shutdown_handle = handle.clone();
    tokio::spawn(async move {
        let _ = rx.recv().await;
        shutdown_handle.shutdown();
    });

    let first_report = scheduler_one.run(job).await.expect("first run failed");

    // Exactly one invocation happened before the shutdown took effect.
    assert_eq!(invocations.load(Ordering::SeqCst), 1);
    assert_eq!(first_report.history.len(), 1);

    // Second instance reuses the same URL and prefix, so it should load the
    // state scheduler_one persisted and run only the remaining fire time.
    let scheduler_two = Scheduler::new(
        SchedulerConfig::default(),
        ValkeyStateStore::with_prefix(&url, prefix.clone())
            .await
            .expect("failed to reconnect to valkey"),
    );
    let seen = invocations.clone();
    let job = Job::without_deps(
        job_id.clone(),
        Schedule::AtTimes(times),
        Task::from_async(move |_| {
            let seen = seen.clone();
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }),
    );

    let second_report = scheduler_two.run(job).await.expect("second run failed");

    // One more invocation (two in total), one history entry for this
    // instance, and a cumulative trigger_count of 2 restored via Valkey.
    assert_eq!(invocations.load(Ordering::SeqCst), 2);
    assert_eq!(second_report.history.len(), 1);
    assert_eq!(second_report.state.trigger_count, 2);
    // Both scheduled times have fired, so nothing further is scheduled.
    assert!(second_report.state.next_run_at.is_none());

    let _: usize = connection.del(key).await.expect("cleanup failed");
}