cloudiful-scheduler 0.4.2

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
#![cfg(feature = "valkey-store")]

#[path = "support/valkey_cleanup.rs"]
mod valkey_cleanup;
#[path = "support/valkey_runtime.rs"]
mod valkey_runtime;
#[path = "support/valkey_state_fixtures.rs"]
mod valkey_state_fixtures;

use chrono::Utc;
use scheduler::{
    CoordinatedLeaseConfig, CoordinatedPendingTrigger, CoordinatedStateStore,
    ExecutionGuardRenewal, JobState, ValkeyCoordinatedStateStore,
};
use std::time::Duration;
use valkey_cleanup::delete_matching_prefix;
use valkey_runtime::{connection, unique_id, valkey_url};
use valkey_state_fixtures::fixture_state;

fn lease_config() -> CoordinatedLeaseConfig {
    CoordinatedLeaseConfig {
        ttl: Duration::from_millis(40),
        renew_interval: Duration::from_millis(10),
    }
}

async fn cleanup(
    connection: &mut redis::aio::MultiplexedConnection,
    state_prefix: &str,
    execution_prefix: &str,
) {
    delete_matching_prefix(connection, state_prefix).await;
    delete_matching_prefix(connection, execution_prefix).await;
}

async fn new_store(
    test_name: &str,
) -> (
    String,
    String,
    ValkeyCoordinatedStateStore,
    redis::aio::MultiplexedConnection,
) {
    let url = valkey_url().expect("SCHEDULER_VALKEY_URL must be set");
    let state_prefix = format!("scheduler:test:{test_name}:state:{}:", unique_id(test_name));
    let execution_prefix = format!("scheduler:test:{test_name}:exec:{}:", unique_id(test_name));
    let store = ValkeyCoordinatedStateStore::with_prefixes(
        &url,
        state_prefix.clone(),
        execution_prefix.clone(),
    )
    .await
    .expect("failed to connect to valkey");
    let connection = connection(&url).await;
    (state_prefix, execution_prefix, store, connection)
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn save_state_script_updates_state_when_not_inflight() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-save").await;
    let job_id = unique_id("coord-save-job");
    let initial = fixture_state(&job_id);
    let mut next = initial.clone();
    next.last_error = Some("updated".to_string());

    let runtime = store
        .load_or_initialize(&job_id, initial)
        .await
        .expect("load_or_initialize failed");
    let saved = store
        .save_state(&job_id, runtime.revision, &next)
        .await
        .expect("save_state failed");

    assert!(saved);

    let loaded = store
        .load_or_initialize(&job_id, JobState::new(&job_id, None))
        .await
        .expect("reload failed");
    assert_eq!(loaded.state, next);
    assert_eq!(loaded.revision, 1);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn claim_trigger_script_claims_occurrence_in_valkey() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-claim").await;
    let job_id = unique_id("coord-claim-job");
    let initial = fixture_state(&job_id);
    let trigger = CoordinatedPendingTrigger {
        scheduled_at: Utc::now(),
        catch_up: false,
        trigger_count: 1,
    };

    let runtime = store
        .load_or_initialize(&job_id, initial.clone())
        .await
        .expect("load_or_initialize failed");
    let claim = store
        .claim_trigger(
            &job_id,
            "resource",
            runtime.revision,
            trigger.clone(),
            &initial,
            lease_config(),
        )
        .await
        .expect("claim_trigger failed")
        .expect("expected claim");

    assert!(!claim.replayed);
    assert_eq!(claim.trigger, trigger);
    assert_eq!(claim.state.revision, 1);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn reclaim_inflight_script_reclaims_expired_occurrence_from_valkey() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-reclaim").await;
    let job_id = unique_id("coord-reclaim-job");
    let initial = fixture_state(&job_id);
    let trigger = CoordinatedPendingTrigger {
        scheduled_at: Utc::now(),
        catch_up: false,
        trigger_count: 1,
    };
    let config = CoordinatedLeaseConfig {
        ttl: Duration::from_millis(20),
        renew_interval: Duration::from_millis(5),
    };

    let runtime = store
        .load_or_initialize(&job_id, initial.clone())
        .await
        .expect("load_or_initialize failed");
    let claim = store
        .claim_trigger(
            &job_id,
            "resource",
            runtime.revision,
            trigger.clone(),
            &initial,
            config,
        )
        .await
        .expect("claim_trigger failed")
        .expect("expected claim");
    assert!(!claim.replayed);

    tokio::time::sleep(Duration::from_millis(25)).await;

    let replay = store
        .reclaim_inflight(&job_id, "resource", config)
        .await
        .expect("reclaim_inflight failed")
        .expect("expected replay");

    assert!(replay.replayed);
    assert_eq!(replay.trigger, trigger);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn renew_lease_script_extends_live_occurrence_lease() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-renew").await;
    let job_id = unique_id("coord-renew-job");
    let initial = fixture_state(&job_id);
    let trigger = CoordinatedPendingTrigger {
        scheduled_at: Utc::now(),
        catch_up: false,
        trigger_count: 1,
    };
    let config = lease_config();

    let runtime = store
        .load_or_initialize(&job_id, initial.clone())
        .await
        .expect("load_or_initialize failed");
    let claim = store
        .claim_trigger(
            &job_id,
            "resource",
            runtime.revision,
            trigger,
            &initial,
            config,
        )
        .await
        .expect("claim_trigger failed")
        .expect("expected claim");

    let renewed = store
        .renew(&claim.lease, config)
        .await
        .expect("renew failed");

    assert_eq!(renewed, ExecutionGuardRenewal::Renewed);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn complete_script_clears_inflight_and_commits_state() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-complete").await;
    let job_id = unique_id("coord-complete-job");
    let initial = fixture_state(&job_id);
    let trigger = CoordinatedPendingTrigger {
        scheduled_at: Utc::now(),
        catch_up: false,
        trigger_count: 1,
    };
    let config = lease_config();

    let runtime = store
        .load_or_initialize(&job_id, initial.clone())
        .await
        .expect("load_or_initialize failed");
    let claim = store
        .claim_trigger(
            &job_id,
            "resource",
            runtime.revision,
            trigger,
            &initial,
            config,
        )
        .await
        .expect("claim_trigger failed")
        .expect("expected claim");
    let mut final_state = initial.clone();
    final_state.last_error = Some("completed".to_string());

    let completed = store
        .complete(&job_id, claim.state.revision, &claim.lease, &final_state)
        .await
        .expect("complete failed");

    assert!(completed);

    let loaded = store
        .load_or_initialize(&job_id, JobState::new(&job_id, None))
        .await
        .expect("reload failed");
    assert_eq!(loaded.state, final_state);
    assert_eq!(loaded.revision, 2);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn pause_script_marks_runtime_as_paused() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-pause").await;
    let job_id = unique_id("coord-pause-job");

    store
        .load_or_initialize(&job_id, fixture_state(&job_id))
        .await
        .expect("load_or_initialize failed");

    let changed = store.pause(&job_id).await.expect("pause failed");
    let unchanged = store.pause(&job_id).await.expect("second pause failed");

    assert!(changed);
    assert!(!unchanged);

    let loaded = store
        .load_or_initialize(&job_id, JobState::new(&job_id, None))
        .await
        .expect("reload failed");
    assert!(loaded.paused);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}

#[tokio::test]
#[ignore = "requires SCHEDULER_VALKEY_URL pointing to a reachable Valkey server"]
async fn resume_script_unpauses_runtime() {
    let (state_prefix, execution_prefix, store, mut connection) = new_store("coord-resume").await;
    let job_id = unique_id("coord-resume-job");

    store
        .load_or_initialize(&job_id, fixture_state(&job_id))
        .await
        .expect("load_or_initialize failed");
    store.pause(&job_id).await.expect("pause failed");

    let changed = store.resume(&job_id).await.expect("resume failed");
    let unchanged = store.resume(&job_id).await.expect("second resume failed");

    assert!(changed);
    assert!(!unchanged);

    let loaded = store
        .load_or_initialize(&job_id, JobState::new(&job_id, None))
        .await
        .expect("reload failed");
    assert!(!loaded.paused);

    cleanup(&mut connection, &state_prefix, &execution_prefix).await;
}