# cloudiful-scheduler
`cloudiful-scheduler` is a single-job async scheduling library for background ingestion tasks.
The published package name is `cloudiful-scheduler`, while the Rust library import stays `scheduler`.
Links:
- GitHub: <https://github.com/cloudiful/scheduler>
- docs.rs: <https://docs.rs/cloudiful-scheduler>
- crates.io: <https://crates.io/crates/cloudiful-scheduler>
Version `0.4.1` exposes:
- explicit schedules via `Schedule::Interval`, `Schedule::AtTimes`, or `Schedule::Cron`
- job-level execution windows via `JobTimeWindow`
- missed-run handling via `MissedRunPolicy`
- overlap control via `OverlapPolicy`
- persistent job state via `StateStore`
- optional distributed per-occurrence execution leases via `ExecutionGuard`
- optional terminal-state cleanup via `TerminalStatePolicy`
- optional runtime observation via `SchedulerObserver`
- bounded execution history via `SchedulerReport`
- pause / resume control via `SchedulerHandle`
Optional features:
- `valkey-guard` adds `ValkeyExecutionGuard` for Valkey-backed execution leases keyed by `job_id + scheduled_at`.
- `valkey-store` adds `ValkeyStateStore` for Valkey-backed state persistence.
- `ValkeyStateStore::resilient(...)` permanently downgrades to an in-process mirror after connection-class failures.
The scheduler is responsible only for deciding when to trigger work. Domain-specific recovery, cursor logic, and idempotent refresh commands stay in the caller.
State recovery is keyed only by `job_id`. `StateStore` does not provide distributed locking or leader election by itself; use `ExecutionGuard` when multiple scheduler instances may see the same trigger.
## Add the crate
```toml
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.4.1" }
chrono = "0.4"
chrono-tz = "0.10"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
```
Enable Valkey-backed state persistence:
```toml
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.4.1", features = ["valkey-store"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
```
Enable Valkey-backed execution leases:
```toml
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.4.1", features = ["valkey-guard"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
```
If you need to consume a tagged GitHub release directly:
```toml
[dependencies]
scheduler = { package = "cloudiful-scheduler", git = "https://github.com/cloudiful/scheduler.git", tag = "v0.4.1" }
```
## Core concepts
- `Scheduler::new(config, store)` creates the runtime.
- `Scheduler::with_observer(config, store, observer)` attaches structured runtime events.
- `Scheduler::with_log_observer(config, store)` adapts runtime events to the `log` crate.
- `Scheduler::with_execution_guard(config, store, guard)` adds distributed per-occurrence mutual exclusion.
- `Scheduler::with_observer_and_execution_guard(config, store, observer, guard)` combines both.
- `Task::from_async(task)` defines an async task from the full `TaskContext`.
- `Task::from_sync(task)` defines a lightweight synchronous task from the full `TaskContext`.
- `Task::from_blocking(task)` defines a blocking synchronous task via `tokio::task::spawn_blocking`.
- `Job::without_deps(job_id, schedule, task)` defines a job with no injected dependencies.
- `Job::new(job_id, schedule, deps, task)` defines a job with explicitly injected `deps`.
- `Job::with_time_window(window)` restricts execution to local weekdays and time segments.
- `Scheduler::run(job)` runs until the schedule finishes or a control handle requests cancel or shutdown.
- `SchedulerHandle::cancel()` stops the scheduler while it is waiting for the next trigger.
- `SchedulerHandle::shutdown()` stops accepting new work and waits for the current run to finish.
- `SchedulerHandle::pause().await` stops future triggers without interrupting the current run.
- `SchedulerHandle::resume().await` wakes the scheduler immediately and resumes with the configured missed-run policy.
Dependency injection in this crate is explicit: you pass a dependency value at job construction time. The scheduler does not auto-resolve arbitrary function parameters.
## Example: async task without dependencies
```rust
use std::time::Duration;
use scheduler::{InMemoryStateStore, Job, Schedule, Scheduler, SchedulerConfig, Task};
#[tokio::main]
async fn main() {
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let job = Job::without_deps(
        "refresh-cache",
        Schedule::Interval(Duration::from_secs(5)),
        Task::from_async(|_| async move {
            // Call your async command here.
            Ok(())
        }),
    )
    .with_max_runs(1);
    let report = scheduler.run(job).await.unwrap();
    println!("history length: {}", report.history.len());
}
```
## Example: observe runtime events
```rust
use std::time::Duration;
use scheduler::{InMemoryStateStore, Job, LogObserver, Schedule, Scheduler, SchedulerConfig, Task};
#[tokio::main]
async fn main() {
    let scheduler = Scheduler::with_observer(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        LogObserver,
    );
    let job = Job::without_deps(
        "observer-demo",
        Schedule::Interval(Duration::from_secs(5)),
        Task::from_async(|_| async move { Ok(()) }),
    )
    .with_max_runs(1);
    let _ = scheduler.run(job).await.unwrap();
}
```
## Example: cron schedule
Cron expressions use the standard 5-field form: minute, hour, day-of-month, month, day-of-week.
The expression is evaluated in `SchedulerConfig::timezone`.
```rust
use scheduler::{CronSchedule, InMemoryStateStore, Job, Schedule, Scheduler, SchedulerConfig, Task};
#[tokio::main]
async fn main() {
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let job = Job::without_deps(
        "market-open-check",
        Schedule::Cron(CronSchedule::parse("*/5 9-15 * * Mon-Fri").unwrap()),
        Task::from_async(|_| async move {
            // Call your async command here.
            Ok(())
        }),
    )
    .with_max_runs(1);
    let report = scheduler.run(job).await.unwrap();
    println!("history length: {}", report.history.len());
}
```
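To see what `*/5 9-15 * * Mon-Fri` matches field by field, here is a std-only sketch that expands a single numeric cron field (`*`, `n`, `a-b`, `*/step`, `a-b/step`, and comma lists) into the values it selects. It illustrates standard 5-field cron syntax only; it is not the crate's `CronSchedule` parser, `expand_field` is a hypothetical helper, and named values such as `Mon-Fri` are left out for brevity.

```rust
/// Expand one numeric cron field into the sorted, de-duplicated values it matches.
/// Supports `*`, `n`, `a-b`, `*/step`, `a-b/step`, and comma-separated lists.
fn expand_field(field: &str, min: u32, max: u32) -> Result<Vec<u32>, String> {
    let mut values = Vec::new();
    for part in field.split(',') {
        // Split an optional `/step` suffix off the range part.
        let (range, step) = match part.split_once('/') {
            Some((r, s)) => (r, s.parse::<u32>().map_err(|_| format!("bad step: {s}"))?),
            None => (part, 1),
        };
        if step == 0 {
            return Err("step must be positive".to_string());
        }
        // `*` covers the whole field; otherwise parse `a-b` or a single value.
        let (start, end) = if range == "*" {
            (min, max)
        } else if let Some((a, b)) = range.split_once('-') {
            let a = a.parse::<u32>().map_err(|_| format!("bad value: {a}"))?;
            let b = b.parse::<u32>().map_err(|_| format!("bad value: {b}"))?;
            (a, b)
        } else {
            let v = range.parse::<u32>().map_err(|_| format!("bad value: {range}"))?;
            (v, v)
        };
        if start < min || end > max || start > end {
            return Err(format!("out of range: {part}"));
        }
        values.extend((start..=end).step_by(step as usize));
    }
    values.sort_unstable();
    values.dedup();
    Ok(values)
}

fn main() {
    // Minute field `*/5` and hour field `9-15` from the example above.
    println!("{:?}", expand_field("*/5", 0, 59));
    println!("{:?}", expand_field("9-15", 0, 23));
}
```

A cron trigger fires when every field matches the local time in `SchedulerConfig::timezone`, so this expression fires every five minutes from 09:00 through 15:55 on weekdays.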
## Example: job execution window
```rust
use chrono::{NaiveTime, TimeDelta, Utc, Weekday};
use chrono_tz::Asia::Shanghai;
use scheduler::{
    InMemoryStateStore, Job, JobTimeWindow, Schedule, Scheduler, SchedulerConfig, Task,
    TimeWindowSegment,
};
#[tokio::main]
async fn main() {
    // Allow runs only on weekdays, 09:00-12:00 and 13:00-18:00 Shanghai time.
    let window = JobTimeWindow {
        timezone: Some(Shanghai),
        weekdays: vec![Weekday::Mon, Weekday::Tue, Weekday::Wed, Weekday::Thu, Weekday::Fri],
        segments: vec![
            TimeWindowSegment::new(
                NaiveTime::from_hms_opt(9, 0, 0).unwrap(),
                NaiveTime::from_hms_opt(12, 0, 0).unwrap(),
            ),
            TimeWindowSegment::new(
                NaiveTime::from_hms_opt(13, 0, 0).unwrap(),
                NaiveTime::from_hms_opt(18, 0, 0).unwrap(),
            ),
        ],
    };
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let job = Job::without_deps(
        "windowed-job",
        Schedule::AtTimes(vec![Utc::now().with_timezone(&Shanghai) + TimeDelta::seconds(5)]),
        Task::from_async(|_| async move { Ok(()) }),
    )
    .with_time_window(window);
    let report = scheduler.run(job).await.unwrap();
    // An occurrence outside the window is consumed and skipped; the reason is reported here.
    println!("skip reason: {:?}", report.last_skip_reason);
}
```
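The window check itself is simple: an occurrence is allowed when its local weekday is listed and its local time falls inside at least one segment. The std-only sketch below models that with minutes since midnight. `Window`, `Segment`, `allows`, and `hm` are hypothetical names, and treating each segment as half-open `[start, end)` is an assumption, since this README does not say whether the end bound is inclusive.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Weekday { Mon, Tue, Wed, Thu, Fri, Sat, Sun }

/// A local time-of-day range in minutes since midnight.
/// Assumption: half-open, so `end` itself is outside the segment.
struct Segment { start: u32, end: u32 }

struct Window { weekdays: Vec<Weekday>, segments: Vec<Segment> }

impl Window {
    /// `weekday` and `minute_of_day` must already be converted to the window's timezone.
    fn allows(&self, weekday: Weekday, minute_of_day: u32) -> bool {
        self.weekdays.contains(&weekday)
            && self.segments.iter().any(|s| s.start <= minute_of_day && minute_of_day < s.end)
    }
}

/// Convert hours and minutes to minutes since midnight.
fn hm(h: u32, m: u32) -> u32 { h * 60 + m }

fn main() {
    // Weekdays 09:00-12:00 and 13:00-18:00, mirroring the example above.
    let window = Window {
        weekdays: vec![Weekday::Mon, Weekday::Tue, Weekday::Wed, Weekday::Thu, Weekday::Fri],
        segments: vec![
            Segment { start: hm(9, 0), end: hm(12, 0) },
            Segment { start: hm(13, 0), end: hm(18, 0) },
        ],
    };
    for (day, minute) in [(Weekday::Mon, hm(10, 30)), (Weekday::Mon, hm(12, 30)), (Weekday::Sat, hm(10, 0))] {
        // An occurrence that fails this check is skipped, not deferred to the next opening.
        println!("{day:?} {minute}: {}", window.allows(day, minute));
    }
}
```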
## Example: task with RunContext
```rust
use chrono::{TimeDelta, Utc};
use chrono_tz::Asia::Shanghai;
use scheduler::{
    InMemoryStateStore, Job, MissedRunPolicy, OverlapPolicy, Schedule, Scheduler,
    SchedulerConfig, Task,
};
#[tokio::main]
async fn main() {
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let job = Job::without_deps(
        "refresh-a-shares",
        Schedule::AtTimes(vec![
            Utc::now().with_timezone(&Shanghai) + TimeDelta::seconds(5),
            Utc::now().with_timezone(&Shanghai) + TimeDelta::seconds(10),
        ]),
        Task::from_async(|context| async move {
            println!("scheduled for {}", context.run.scheduled_at);
            // Call your idempotent refresh command here.
            Ok(())
        }),
    )
    .with_missed_run_policy(MissedRunPolicy::CatchUpOnce)
    .with_overlap_policy(OverlapPolicy::Forbid);
    let report = scheduler.run(job).await.unwrap();
    println!("final state: {:?}", report.state);
    println!("history length: {}", report.history.len());
}
```
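The example above sets `OverlapPolicy::Forbid`. The scheduling semantics describe three overlap policies: `Forbid` skips triggers while a run is active, `QueueOne` keeps at most one pending trigger, and `AllowParallel` spawns overlapping runs. The std-only sketch below models the decision each one makes when a trigger fires. `Decision` and `on_trigger` are hypothetical names, and dropping a `QueueOne` trigger when the single slot is already taken is an assumption drawn from "keeps at most one pending trigger".

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OverlapPolicy { Forbid, QueueOne, AllowParallel }

#[derive(Debug, PartialEq, Eq)]
enum Decision { RunNow, Skip, Queue, SpawnParallel }

/// Decide what to do with a trigger, given whether a run is active
/// and whether one trigger is already queued behind it.
fn on_trigger(policy: OverlapPolicy, run_active: bool, queued: bool) -> Decision {
    if !run_active {
        return Decision::RunNow;
    }
    match policy {
        // Forbid: triggers arriving during an active run are skipped.
        OverlapPolicy::Forbid => Decision::Skip,
        // QueueOne: keep at most one pending trigger; extras are skipped.
        OverlapPolicy::QueueOne if queued => Decision::Skip,
        OverlapPolicy::QueueOne => Decision::Queue,
        // AllowParallel: start another run alongside the active one.
        OverlapPolicy::AllowParallel => Decision::SpawnParallel,
    }
}

fn main() {
    for policy in [OverlapPolicy::Forbid, OverlapPolicy::QueueOne, OverlapPolicy::AllowParallel] {
        println!("{policy:?}: {:?}", on_trigger(policy, true, false));
    }
}
```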
## Example: injected dependencies
Use `deps` to carry any number of business parameters as a struct or tuple.
```rust
use std::sync::Arc;
use chrono::{TimeDelta, Utc};
use chrono_tz::Asia::Shanghai;
use scheduler::{InMemoryStateStore, Job, Schedule, Scheduler, SchedulerConfig, Task, TaskContext};
#[derive(Debug)]
struct RefreshDeps {
    market: &'static str,
}
#[tokio::main]
async fn main() {
    let scheduler = Scheduler::new(SchedulerConfig::default(), InMemoryStateStore::new());
    let job = Job::new(
        "refresh-market",
        // One run 5 seconds from now; an empty `AtTimes` list would exit without running.
        Schedule::AtTimes(vec![Utc::now().with_timezone(&Shanghai) + TimeDelta::seconds(5)]),
        RefreshDeps { market: "A-share" },
        Task::from_async(|context: TaskContext<RefreshDeps>| async move {
            // `deps` is shared as an `Arc`, so cloning it is cheap.
            let deps: Arc<RefreshDeps> = context.deps.clone();
            println!("market: {}", deps.market);
            println!("scheduled for {}", context.run.scheduled_at);
            Ok(())
        }),
    );
    let report = scheduler.run(job).await.unwrap();
    println!("history length: {}", report.history.len());
}
```
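The explicit injection pattern can be modeled in plain Rust: the job owns the dependency value, wraps it in an `Arc` once at construction time, and hands a cheap clone to each run through a context. This is a std-only sketch; `DemoJob`, `DemoContext`, and `run_once` are hypothetical names, not the crate's API, although the `deps: Arc<D>` field mirrors how `context.deps` is used above.

```rust
use std::sync::Arc;

/// Hypothetical per-run context: the injected deps plus a run counter.
struct DemoContext<D> {
    deps: Arc<D>,
    run_index: u32,
}

/// Hypothetical job: the dependency value is supplied once, at construction time.
struct DemoJob<D, F> {
    deps: Arc<D>,
    task: F,
}

impl<D, F> DemoJob<D, F>
where
    F: Fn(DemoContext<D>) -> String,
{
    fn new(deps: D, task: F) -> Self {
        // Wrap the dependency in an `Arc` once; every run shares it.
        Self { deps: Arc::new(deps), task }
    }

    /// Each run gets an `Arc` clone, never a fresh copy of `D`.
    fn run_once(&self, run_index: u32) -> String {
        (self.task)(DemoContext { deps: Arc::clone(&self.deps), run_index })
    }
}

struct RefreshDeps {
    market: &'static str,
}

fn main() {
    let job = DemoJob::new(RefreshDeps { market: "A-share" }, |ctx: DemoContext<RefreshDeps>| {
        format!("run {} for {}", ctx.run_index, ctx.deps.market)
    });
    println!("{}", job.run_once(1));
    println!("{}", job.run_once(2));
}
```

Because nothing is resolved by parameter name, the task's signature stays an ordinary closure over one typed context; adding a parameter means adding a field to the deps struct.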
## Example: recover state across restarts
Share the same store instance across scheduler instances to resume from the stored `next_run_at`.
`pause()` and `resume()` depend on the scheduler mode in the same way: legacy schedulers pause only the local instance, while coordinated schedulers persist a shared paused flag per `job_id`.
```rust
use std::sync::Arc;
use chrono::{TimeDelta, Utc};
use chrono_tz::Asia::Shanghai;
use scheduler::{InMemoryStateStore, Job, Schedule, Scheduler, SchedulerConfig, Task};
#[tokio::main]
async fn main() {
    let store = Arc::new(InMemoryStateStore::new());
    // Compute the planned time once so both instances describe the same schedule.
    let run_at = Utc::now().with_timezone(&Shanghai) + TimeDelta::seconds(30);
    // First instance: persists state for `resume-me` into the shared store.
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let job = Job::without_deps(
        "resume-me",
        Schedule::AtTimes(vec![run_at.clone()]),
        Task::from_async(|_| async move { Ok(()) }),
    );
    let _ = scheduler.run(job).await;
    // Second instance, same store and `job_id`: resumes from the stored `next_run_at`.
    let scheduler = Scheduler::new(SchedulerConfig::default(), store.clone());
    let job = Job::without_deps(
        "resume-me",
        Schedule::AtTimes(vec![run_at]),
        Task::from_async(|_| async move { Ok(()) }),
    );
    let _ = scheduler.run(job).await;
}
```
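Recovery works because state is keyed only by `job_id`: a new scheduler that runs a job with the same id reads back the stored `next_run_at` instead of starting the schedule over. The std-only sketch below models that lookup with a `HashMap`. `DemoStore`, `DemoState`, and `resolve_next_run` are hypothetical, timestamps are plain seconds, and `None` simply stands for a finished job here.

```rust
use std::collections::HashMap;

/// Hypothetical persisted state: only the field relevant to resumption.
#[derive(Clone, Debug, PartialEq)]
struct DemoState { next_run_at: Option<u64> }

#[derive(Default)]
struct DemoStore { states: HashMap<String, DemoState> }

impl DemoStore {
    fn load(&self, job_id: &str) -> Option<DemoState> { self.states.get(job_id).cloned() }
    fn save(&mut self, job_id: &str, state: DemoState) { self.states.insert(job_id.to_string(), state); }
}

/// Resume from the stored `next_run_at` if this `job_id` was seen before;
/// otherwise start from the schedule's first planned time.
fn resolve_next_run(store: &DemoStore, job_id: &str, first_planned: u64) -> Option<u64> {
    match store.load(job_id) {
        Some(state) => state.next_run_at,
        None => Some(first_planned),
    }
}

fn main() {
    let mut store = DemoStore::default();
    // First instance: nothing stored yet, so it starts from the schedule.
    println!("{:?}", resolve_next_run(&store, "resume-me", 100));
    store.save("resume-me", DemoState { next_run_at: Some(130) });
    // Second instance, same store and job_id: resumes from the stored value.
    println!("{:?}", resolve_next_run(&store, "resume-me", 200));
}
```

Nothing in this lookup prevents two live instances from loading the same state at once, which is why the README pairs `StateStore` with a separate `ExecutionGuard` when several schedulers may see the same trigger.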
## Example: Valkey-backed state
This persists `JobState` in a single key per `job_id`. It improves restart recovery, but does not by itself provide distributed locking across multiple scheduler instances.
The current Rust client still uses the `redis://` URI scheme for RESP servers, including Valkey.
By default, keys are written under the `scheduler:valkey:job-state:` prefix. Loads also check the legacy `scheduler:job-state:` prefix so existing persisted state can still be resumed.
For connection-class failures, `ValkeyStateStore::resilient(...)` downgrades to an in-memory mirror. Codec/data errors still fail the run.
```rust
use std::time::Duration;
use scheduler::{Job, Schedule, Scheduler, SchedulerConfig, Task, ValkeyStateStore};
#[tokio::main]
async fn main() {
    let store = ValkeyStateStore::new("redis://127.0.0.1/").await.unwrap();
    let scheduler = Scheduler::new(SchedulerConfig::default(), store);
    let job = Job::without_deps(
        "refresh-cache",
        Schedule::Interval(Duration::from_secs(5)),
        Task::from_async(|_| async move { Ok(()) }),
    )
    .with_max_runs(1);
    let report = scheduler.run(job).await.unwrap();
    println!("next run: {:?}", report.state.next_run_at);
}
```
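The key layout and legacy fallback described above can be sketched with a `HashMap` standing in for Valkey. The two prefixes come from this README; the assumption that a key is exactly `prefix + job_id`, and the helper names `state_key` and `load_with_fallback`, are illustrative and not taken from the crate.

```rust
use std::collections::HashMap;

const PREFIX: &str = "scheduler:valkey:job-state:";
const LEGACY_PREFIX: &str = "scheduler:job-state:";

/// Assumption: the key is the prefix followed directly by the job id.
fn state_key(prefix: &str, job_id: &str) -> String {
    format!("{prefix}{job_id}")
}

/// Read the current key first, then fall back to the legacy key so state
/// written under the old prefix can still be resumed.
fn load_with_fallback<'a>(kv: &'a HashMap<String, String>, job_id: &str) -> Option<&'a String> {
    kv.get(&state_key(PREFIX, job_id))
        .or_else(|| kv.get(&state_key(LEGACY_PREFIX, job_id)))
}

fn main() {
    let mut kv = HashMap::new();
    kv.insert(state_key(LEGACY_PREFIX, "old-job"), "legacy-state".to_string());
    kv.insert(state_key(PREFIX, "refresh-cache"), "current-state".to_string());
    println!("{:?}", load_with_fallback(&kv, "old-job"));
    println!("{:?}", load_with_fallback(&kv, "refresh-cache"));
    println!("{:?}", load_with_fallback(&kv, "missing"));
}
```

Checking the current prefix first means that once state has been rewritten under the new prefix, a stale legacy key for the same job is ignored.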
## Example: Valkey-backed execution guard
This keeps `StateStore` and distributed mutual exclusion separate. The guard lease key is based on `job_id + scheduled_at`, so `OverlapPolicy::AllowParallel` can still run different occurrences concurrently.
```rust
use std::time::Duration;
use scheduler::{
    InMemoryStateStore, Job, Schedule, Scheduler, SchedulerConfig, Task, ValkeyExecutionGuard,
    ValkeyLeaseConfig,
};
#[tokio::main]
async fn main() {
    let guard = ValkeyExecutionGuard::new(
        "redis://127.0.0.1/",
        ValkeyLeaseConfig {
            ttl: Duration::from_secs(30),
            renew_interval: Duration::from_secs(10),
        },
    )
    .await
    .unwrap();
    let scheduler = Scheduler::with_execution_guard(
        SchedulerConfig::default(),
        InMemoryStateStore::new(),
        guard,
    );
    let job = Job::without_deps(
        "refresh-cache",
        Schedule::Interval(Duration::from_secs(5)),
        Task::from_async(|_| async move { Ok(()) }),
    )
    .with_max_runs(1);
    let report = scheduler.run(job).await.unwrap();
    println!("history length: {}", report.history.len());
}
```
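Because leases are keyed by `job_id + scheduled_at`, two instances racing on the same occurrence contend for one key, while different occurrences of the same job never block each other. This single-process, std-only sketch models that with a `HashMap` and explicit TTL expiry. `LeaseTable`, `try_acquire`, and `renew` are hypothetical names and times are plain seconds; a real distributed guard would also need the acquire to be atomic on the server, which this sketch does not attempt. Renewing well inside the TTL, as `renew_interval: 10s` against `ttl: 30s` does above, keeps a live holder's lease from expiring.

```rust
use std::collections::HashMap;

/// Lease key: one per trigger occurrence, not one per job.
type LeaseKey = (String, u64); // (job_id, scheduled_at in seconds)

struct Lease { owner: String, expires_at: u64 }

#[derive(Default)]
struct LeaseTable { leases: HashMap<LeaseKey, Lease> }

impl LeaseTable {
    /// Acquire the lease for one occurrence unless another owner holds an unexpired one.
    fn try_acquire(&mut self, job_id: &str, scheduled_at: u64, owner: &str, now: u64, ttl: u64) -> bool {
        let key = (job_id.to_string(), scheduled_at);
        if let Some(lease) = self.leases.get(&key) {
            if lease.expires_at > now && lease.owner != owner {
                return false;
            }
        }
        self.leases.insert(key, Lease { owner: owner.to_string(), expires_at: now + ttl });
        true
    }

    /// Extend an unexpired lease, but only for its current owner.
    fn renew(&mut self, job_id: &str, scheduled_at: u64, owner: &str, now: u64, ttl: u64) -> bool {
        match self.leases.get_mut(&(job_id.to_string(), scheduled_at)) {
            Some(lease) if lease.owner == owner && lease.expires_at > now => {
                lease.expires_at = now + ttl;
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let mut table = LeaseTable::default();
    let ttl = 30;
    // Two instances race on the same occurrence: only one wins.
    println!("{}", table.try_acquire("refresh-cache", 1_000, "node-a", 0, ttl));
    println!("{}", table.try_acquire("refresh-cache", 1_000, "node-b", 0, ttl));
    // A different occurrence of the same job has its own lease.
    println!("{}", table.try_acquire("refresh-cache", 1_005, "node-b", 0, ttl));
    // Renewing before the TTL runs out keeps node-a's lease alive.
    println!("{}", table.renew("refresh-cache", 1_000, "node-a", 10, ttl));
}
```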
## Integration test
The Valkey integration tests are marked `#[ignore]` so normal CI stays hermetic. Run them explicitly against a reachable server:
```bash
SCHEDULER_VALKEY_URL=redis://127.0.0.1:6379/ cargo test --features valkey-store --test valkey_store -- --ignored
```
For the execution guard integration tests:
```bash
SCHEDULER_VALKEY_URL=redis://127.0.0.1:6379/ cargo test --features valkey-guard --test execution_guard -- --ignored
```
In Gitea Actions, set the `SCHEDULER_VALKEY_URL` secret to enable the external Valkey integration test step in `.gitea/workflows/ci.yml`.
## Scheduling semantics
- `Schedule::AtTimes` never executes immediately on startup. The first run waits until the first planned time.
- `Schedule::AtTimes(Vec::new())` is a valid no-op schedule and exits without running.
- `Schedule::Interval` schedules the first run at `now + interval`.
- `Schedule::Cron` evaluates a standard 5-field expression in `SchedulerConfig::timezone`.
- `max_runs` applies to interval schedules, explicit `AtTimes` schedules, and cron schedules.
- `with_max_runs(0)` exits immediately without running any task.
- `Task::from_sync` is for lightweight synchronous logic. Use `Task::from_blocking` for blocking I/O or CPU-heavy synchronous work.
- `MissedRunPolicy::Skip` drops missed occurrences and waits for the next future trigger.
- `MissedRunPolicy::CatchUpOnce` runs one immediate compensating execution for missed occurrences.
- `MissedRunPolicy::ReplayAll` replays each missed occurrence in schedule order.
- `OverlapPolicy::Forbid` skips triggers while a run is active.
- `OverlapPolicy::QueueOne` keeps at most one pending trigger while a run is active.
- `OverlapPolicy::AllowParallel` spawns overlapping runs.
- `JobTimeWindow` is checked at execution time; outside-window occurrences are consumed, skipped, and reported as `outside_time_window`.
- `SchedulerConfig::timezone` is forwarded to `RunContext`, drives cron evaluation, and does not rewrite absolute `AtTimes` timestamps.
- State recovery is keyed by `job_id`; restarting with the same `job_id` resumes from the stored `next_run_at`.
- `ExecutionGuard` is keyed by trigger occurrence, not only by `job_id`, so different `scheduled_at` values can acquire separate leases.
- Corrupted persisted interval state with `next_run_at = None` is repaired automatically if the job is not actually terminal.
- `SchedulerConfig::terminal_state_policy = Delete` removes persisted terminal state once the job finishes.
- `ResilientStateStore` masks connection-class failures by switching permanently to its in-process mirror; degradation can be observed via `SchedulerObserver`.
- `StateStore` and `ExecutionGuard` are intentionally separate: state persistence does not imply distributed mutual exclusion.
- Jobs without `JobTimeWindow` keep the existing behavior.
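
The three `MissedRunPolicy` variants differ only in what happens to occurrences whose planned time has already passed, for example after a restart or a `resume()`. The std-only sketch below models an interval schedule with timestamps in seconds. `plan_missed` and `Plan` are hypothetical, and recording the single `CatchUpOnce` run under the latest missed time is an assumption, since this README does not say which `scheduled_at` that compensating run carries.

```rust
#[derive(Clone, Copy, Debug)]
enum MissedRunPolicy { Skip, CatchUpOnce, ReplayAll }

#[derive(Debug, PartialEq, Eq)]
struct Plan {
    /// Missed scheduled times to execute immediately, in schedule order.
    run_now: Vec<u64>,
    /// The first planned time that is not in the past.
    next_planned: u64,
}

/// Given the stored `next_run_at`, a fixed interval, and the current time,
/// decide which missed occurrences of an interval schedule to execute now.
fn plan_missed(policy: MissedRunPolicy, next_run_at: u64, interval: u64, now: u64) -> Plan {
    assert!(interval > 0, "interval must be positive");
    // Collect every planned time strictly before `now`.
    let mut missed = Vec::new();
    let mut t = next_run_at;
    while t < now {
        missed.push(t);
        t += interval;
    }
    let run_now = match policy {
        // Skip: drop all missed occurrences and wait for the next future trigger.
        MissedRunPolicy::Skip => Vec::new(),
        // CatchUpOnce: one compensating run (assumed to use the latest missed time).
        MissedRunPolicy::CatchUpOnce => missed.last().copied().into_iter().collect(),
        // ReplayAll: every missed occurrence, in schedule order.
        MissedRunPolicy::ReplayAll => missed,
    };
    Plan { run_now, next_planned: t }
}

fn main() {
    // Planned every 10s starting at t=100; the process comes back at t=135.
    for policy in [MissedRunPolicy::Skip, MissedRunPolicy::CatchUpOnce, MissedRunPolicy::ReplayAll] {
        println!("{policy:?}: {:?}", plan_missed(policy, 100, 10, 135));
    }
}
```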