cloudiful-scheduler
cloudiful-scheduler is a single-job async scheduling library for background ingestion tasks.
The published package name is cloudiful-scheduler, while the Rust library import stays scheduler.
Links:
- GitHub: https://github.com/cloudiful/scheduler
- docs.rs: https://docs.rs/cloudiful-scheduler
- crates.io: https://crates.io/crates/cloudiful-scheduler
Version 0.3.5 exposes:
- explicit schedules via Schedule::Interval, Schedule::AtTimes, or Schedule::Cron
- job-level execution windows via JobTimeWindow
- missed-run handling via MissedRunPolicy
- overlap control via OverlapPolicy
- persistent job state via StateStore
- optional distributed per-occurrence execution leases via ExecutionGuard
- optional terminal-state cleanup via TerminalStatePolicy
- optional runtime observation via SchedulerObserver
- bounded execution history via SchedulerReport
Optional features:
- valkey-guard adds ValkeyExecutionGuard for Valkey-backed execution leases keyed by job_id + scheduled_at.
- valkey-store adds ValkeyStateStore for Valkey-backed state persistence. ValkeyStateStore::resilient(...) permanently downgrades to an in-process mirror after connection-class failures.
The scheduler is responsible only for deciding when to trigger work. Domain-specific recovery, cursor logic, and idempotent refresh commands stay in the caller.
State recovery is keyed only by job_id. StateStore does not provide distributed locking or leader election by itself; use ExecutionGuard when multiple scheduler instances may see the same trigger.
Add the crate
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.3.5" }
chrono = "0.4"
chrono-tz = "0.10"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
Enable Valkey-backed state persistence:
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.3.5", features = ["valkey-store"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
Enable Valkey-backed execution leases:
[dependencies]
scheduler = { package = "cloudiful-scheduler", version = "0.3.5", features = ["valkey-guard"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
If you need to consume a tagged GitHub release directly:
[dependencies]
scheduler = { package = "cloudiful-scheduler", git = "https://github.com/cloudiful/scheduler.git", tag = "v0.3.5" }
Core concepts
- Scheduler::new(config, store) creates the runtime.
- Scheduler::with_observer(config, store, observer) attaches structured runtime events.
- Scheduler::with_log_observer(config, store) adapts runtime events to the log crate.
- Scheduler::with_execution_guard(config, store, guard) adds distributed per-occurrence mutual exclusion.
- Scheduler::with_observer_and_execution_guard(config, store, observer, guard) combines both.
- Task::from_async(task) defines an async task from the full TaskContext.
- Task::from_sync(task) defines a lightweight synchronous task from the full TaskContext.
- Task::from_blocking(task) defines a blocking synchronous task via tokio::task::spawn_blocking.
- Job::without_deps(job_id, schedule, task) defines a task with no injected dependencies.
- Job::new(job_id, schedule, deps, task) defines a task with explicit injected deps.
- Job::with_time_window(window) restricts execution to local weekdays and time segments.
- Scheduler::run(job) runs until the schedule finishes or a control handle requests cancel or shutdown.
- SchedulerHandle::cancel() stops while waiting.
- SchedulerHandle::shutdown() stops accepting new work and waits for the current run to finish.
- SchedulerHandle::pause().await stops future triggers without interrupting the current run.
- SchedulerHandle::resume().await wakes the scheduler immediately and resumes with the configured missed-run policy.
Dependency injection in this crate is explicit: you pass a dependency value at job construction time. The scheduler does not auto-resolve arbitrary function parameters.
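To make "explicit" concrete, here is a minimal standalone sketch of the pattern. It does not use the crate's API: SketchJob, Deps, and run_once are hypothetical names chosen for illustration, and the real Job and Task types may be shaped differently.

```rust
/// Hypothetical dependency bundle; any struct or tuple would do.
#[derive(Debug)]
pub struct Deps {
    pub source: String,
    pub batch_size: usize,
}

/// Hypothetical stand-in for a job: it stores the dependency value it was
/// constructed with and hands a reference to the task on every run.
pub struct SketchJob<D, F>
where
    F: Fn(&D) -> String,
{
    pub job_id: String,
    deps: D,
    task: F,
}

impl<D, F> SketchJob<D, F>
where
    F: Fn(&D) -> String,
{
    /// The caller passes the dependencies in; nothing is resolved implicitly.
    pub fn new(job_id: &str, deps: D, task: F) -> Self {
        Self {
            job_id: job_id.to_string(),
            deps,
            task,
        }
    }

    /// Run the task once with the stored dependencies.
    pub fn run_once(&self) -> String {
        (self.task)(&self.deps)
    }
}
```

The task only ever sees what was handed over at construction time, which is the property the paragraph above describes.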
Example: async task without dependencies
use Duration;
use ;
async
Example: observe runtime events
use Duration;
use ;
async
Example: cron schedule
Cron expressions use the standard 5-field form: minute, hour, day-of-month, month, day-of-week.
The expression is evaluated in SchedulerConfig::timezone.
use ;
async
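As a reminder of the 5-field layout, the sketch below splits an expression into its named fields. It is standalone and only checks the field count; it does not validate ranges or steps and is not how the crate parses cron. CronFields and split_cron are hypothetical names.

```rust
/// The five fields of a standard cron expression, kept as raw strings.
#[derive(Debug, PartialEq)]
pub struct CronFields<'a> {
    pub minute: &'a str,
    pub hour: &'a str,
    pub day_of_month: &'a str,
    pub month: &'a str,
    pub day_of_week: &'a str,
}

/// Split on whitespace and require exactly five fields.
/// Six-field (seconds) or shorter expressions are rejected.
pub fn split_cron(expr: &str) -> Option<CronFields<'_>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != 5 {
        return None;
    }
    Some(CronFields {
        minute: parts[0],
        hour: parts[1],
        day_of_month: parts[2],
        month: parts[3],
        day_of_week: parts[4],
    })
}
```

For example, "*/15 9-17 * * 1-5" splits into minute "*/15", hour "9-17", and day-of-week "1-5", while a 6-field expression is rejected.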
Example: job execution window
use ;
use Shanghai;
use ;
let window = JobTimeWindow ;
let scheduler = new;
let job = without_deps
.with_time_window;
let report = scheduler.run.await.unwrap;
println!;
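The window check itself can be pictured with a small standalone sketch. This is not the crate's JobTimeWindow: SketchWindow, Weekday, and decide are hypothetical, times are minutes since local midnight, and treating segments as half-open [start, end) is an assumption.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Weekday { Mon, Tue, Wed, Thu, Fri, Sat, Sun }

/// Hypothetical window: allowed weekdays plus [start, end) segments,
/// both bounds expressed as minutes since local midnight.
pub struct SketchWindow {
    pub weekdays: Vec<Weekday>,
    pub segments: Vec<(u32, u32)>,
}

impl SketchWindow {
    /// True when the day is allowed and the minute falls in some segment.
    pub fn allows(&self, day: Weekday, minute_of_day: u32) -> bool {
        self.weekdays.contains(&day)
            && self
                .segments
                .iter()
                .any(|&(start, end)| start <= minute_of_day && minute_of_day < end)
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum Decision {
    Run,
    /// The occurrence is consumed but the task is not executed.
    SkipOutsideTimeWindow,
}

/// Decide at execution time; a job without a window always runs.
pub fn decide(window: Option<&SketchWindow>, day: Weekday, minute_of_day: u32) -> Decision {
    match window {
        Some(w) if !w.allows(day, minute_of_day) => Decision::SkipOutsideTimeWindow,
        _ => Decision::Run,
    }
}
```

The check happens when the occurrence fires rather than when it is planned, so an occurrence that lands outside the window is used up instead of being deferred.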
Example: task with RunContext
use ;
use Shanghai;
use ;
async
Example: injected dependencies
Use deps to carry any number of business parameters as a struct or tuple.
use Arc;
use ;
async
Example: recover state across restarts
Share the same store instance across scheduler instances to resume from next_run_at.
pause()/resume() follow the same split: legacy schedulers pause only the local instance, while coordinated schedulers persist a shared paused flag per job_id.
use Arc;
use ;
use Shanghai;
use ;
async
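The recovery rule (look up by job_id, resume from next_run_at) can be sketched without the crate. SketchStore, SketchState, and first_trigger are hypothetical; timestamps are plain integers, and the real StateStore is presumably async and carries more fields.

```rust
use std::collections::HashMap;

/// Hypothetical persisted state: only the next planned trigger time.
#[derive(Clone, Debug, PartialEq)]
pub struct SketchState {
    pub next_run_at: u64,
}

/// Hypothetical in-memory store keyed only by job_id.
#[derive(Default)]
pub struct SketchStore {
    states: HashMap<String, SketchState>,
}

impl SketchStore {
    pub fn save(&mut self, job_id: &str, state: SketchState) {
        self.states.insert(job_id.to_string(), state);
    }

    pub fn load(&self, job_id: &str) -> Option<SketchState> {
        self.states.get(job_id).cloned()
    }
}

/// Resume from the stored next_run_at if this job_id has state;
/// otherwise start from the freshly computed first trigger.
pub fn first_trigger(store: &SketchStore, job_id: &str, fresh_start: u64) -> u64 {
    store
        .load(job_id)
        .map(|state| state.next_run_at)
        .unwrap_or(fresh_start)
}
```

Because the lookup uses job_id alone, two jobs that share an id also share state, which is why the id should be stable and unique per logical job.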
Example: Valkey-backed state
This persists JobState in a single key per job_id. It improves restart recovery, but does not by itself provide distributed locking across multiple scheduler instances.
The current Rust client still uses the redis:// URI scheme for RESP servers, including Valkey.
By default, keys are written under the scheduler:valkey:job-state: prefix. Loads also check the legacy scheduler:job-state: prefix so existing persisted state can still be resumed.
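The prefix fallback can be pictured as follows. This is a standalone sketch over a HashMap, not the Valkey client code; the function names are hypothetical, and checking the current prefix before the legacy one is an assumption about lookup order.

```rust
use std::collections::HashMap;

pub const CURRENT_PREFIX: &str = "scheduler:valkey:job-state:";
pub const LEGACY_PREFIX: &str = "scheduler:job-state:";

/// Key used for writes: always the current prefix.
pub fn write_key(job_id: &str) -> String {
    format!("{}{}", CURRENT_PREFIX, job_id)
}

/// Keys consulted on load, in the assumed order: current, then legacy.
pub fn load_candidates(job_id: &str) -> [String; 2] {
    [write_key(job_id), format!("{}{}", LEGACY_PREFIX, job_id)]
}

/// Return the first stored value found among the candidate keys.
pub fn load_with_fallback<'a>(
    kv: &'a HashMap<String, String>,
    job_id: &str,
) -> Option<&'a String> {
    load_candidates(job_id).iter().find_map(|key| kv.get(key))
}
```

State written under the legacy prefix stays readable, while every new write lands under the current prefix.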
For connection-class failures, ValkeyStateStore::resilient(...) downgrades to an in-memory mirror. Codec/data errors still fail the run.
use Duration;
use ;
async
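The downgrade behavior described above (connection-class failures switch to the mirror for good, codec errors still surface) might look roughly like this. Everything here is a hypothetical, synchronous stand-in: Backend, ResilientSketch, and ScriptedBackend are not crate types, and updating the mirror only after a successful or masked write is an assumption.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum StoreError { Connection, Codec }

pub trait Backend {
    fn save(&mut self, job_id: &str, value: &str) -> Result<(), StoreError>;
}

pub struct ResilientSketch<B: Backend> {
    primary: B,
    mirror: HashMap<String, String>,
    degraded: bool,
}

impl<B: Backend> ResilientSketch<B> {
    pub fn new(primary: B) -> Self {
        Self { primary, mirror: HashMap::new(), degraded: false }
    }

    pub fn is_degraded(&self) -> bool { self.degraded }
    pub fn primary(&self) -> &B { &self.primary }

    pub fn mirrored(&self, job_id: &str) -> Option<&str> {
        self.mirror.get(job_id).map(String::as_str)
    }

    pub fn save(&mut self, job_id: &str, value: &str) -> Result<(), StoreError> {
        if !self.degraded {
            match self.primary.save(job_id, value) {
                Ok(()) => {}
                // Connection-class failure: downgrade permanently, mask the error.
                Err(StoreError::Connection) => self.degraded = true,
                // Codec/data errors are not masked.
                Err(other) => return Err(other),
            }
        }
        self.mirror.insert(job_id.to_string(), value.to_string());
        Ok(())
    }
}

/// Test double that replays a fixed list of results and counts calls.
pub struct ScriptedBackend {
    pub script: Vec<Result<(), StoreError>>,
    pub calls: usize,
}

impl Backend for ScriptedBackend {
    fn save(&mut self, _job_id: &str, _value: &str) -> Result<(), StoreError> {
        self.calls += 1;
        if self.script.is_empty() { Ok(()) } else { self.script.remove(0) }
    }
}
```

Once degraded, the sketch never calls the primary again, which matches the "permanently" wording; a process restart would be the only way back to the primary.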
Example: Valkey-backed execution guard
This keeps StateStore and distributed mutual exclusion separate. The guard lease key is based on job_id + scheduled_at, so OverlapPolicy::AllowParallel can still run different occurrences concurrently.
use Duration;
use ;
async
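Why per-occurrence keys allow parallel occurrences can be seen in a small in-memory sketch. SketchGuard and lease_key are hypothetical, the "job_id:scheduled_at" key format is an assumption, and a real guard would also need lease expiry, which is omitted here.

```rust
use std::collections::HashSet;

/// Hypothetical lease key: one key per (job_id, scheduled_at) pair.
pub fn lease_key(job_id: &str, scheduled_at_unix: i64) -> String {
    format!("{}:{}", job_id, scheduled_at_unix)
}

/// In-memory stand-in for a distributed guard.
#[derive(Default)]
pub struct SketchGuard {
    held: HashSet<String>,
}

impl SketchGuard {
    /// Returns true only for the first caller to claim this occurrence.
    pub fn try_acquire(&mut self, job_id: &str, scheduled_at_unix: i64) -> bool {
        self.held.insert(lease_key(job_id, scheduled_at_unix))
    }

    /// Give the lease back so the occurrence can be claimed again.
    pub fn release(&mut self, job_id: &str, scheduled_at_unix: i64) {
        self.held.remove(&lease_key(job_id, scheduled_at_unix));
    }
}
```

Two instances racing on the same occurrence see one winner, while two different occurrences of the same job never contend with each other.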
Integration test
The Valkey integration tests are marked ignored so normal CI stays hermetic. Run them explicitly with a reachable server:
SCHEDULER_VALKEY_URL=redis://127.0.0.1:6379/
For the execution guard integration tests:
SCHEDULER_VALKEY_URL=redis://127.0.0.1:6379/
In Gitea Actions, set the SCHEDULER_VALKEY_URL secret to enable the external Valkey integration test step in .gitea/workflows/ci.yml.
Scheduling semantics
- Schedule::AtTimes never executes immediately on startup. The first run waits until the first planned time.
- Schedule::AtTimes(Vec::new()) is a valid no-op schedule and exits without running.
- Schedule::Interval schedules the first run at now + interval.
- Schedule::Cron evaluates a standard 5-field expression in SchedulerConfig::timezone.
- max_runs applies to interval schedules, explicit AtTimes schedules, and cron schedules.
- with_max_runs(0) exits immediately without running any task.
- Task::from_sync is for lightweight synchronous logic. Use Task::from_blocking for blocking I/O or CPU-heavy synchronous work.
- MissedRunPolicy::Skip drops missed occurrences and waits for the next future trigger.
- MissedRunPolicy::CatchUpOnce runs one immediate compensating execution for missed occurrences.
- MissedRunPolicy::ReplayAll replays each missed occurrence in schedule order.
- OverlapPolicy::Forbid skips triggers while a run is active.
- OverlapPolicy::QueueOne keeps at most one pending trigger while a run is active.
- OverlapPolicy::AllowParallel spawns overlapping runs.
- JobTimeWindow is checked at execution time; outside-window occurrences are consumed, skipped, and reported as outside_time_window.
- SchedulerConfig::timezone is forwarded to RunContext, drives cron evaluation, and does not rewrite absolute AtTimes timestamps.
- State recovery is keyed by job_id; restarting with the same job_id resumes from the stored next_run_at.
- ExecutionGuard is keyed by trigger occurrence, not only by job_id, so different scheduled_at values can acquire separate leases.
- Corrupted persisted interval state with next_run_at = None is repaired automatically if the job is not actually terminal.
- SchedulerConfig::terminal_state_policy = Delete removes persisted terminal state once the job finishes.
- ResilientStateStore masks connection-class failures by switching permanently to its in-process mirror; degradation can be observed via SchedulerObserver.
- StateStore and ExecutionGuard are intentionally separate: state persistence does not imply distributed mutual exclusion.
- Jobs without JobTimeWindow keep the existing behavior.
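The missed-run and overlap rules above can be condensed into a standalone sketch. The enums mirror the policy names but are hypothetical stand-ins, not the crate's types; timestamps are plain integers, and counting an occurrence as missed when it is at or before now is an assumption.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MissedPolicy { Skip, CatchUpOnce, ReplayAll }

/// Interval occurrences already due at `now`, in schedule order.
pub fn missed_occurrences(next_run_at: u64, interval: u64, now: u64) -> Vec<u64> {
    assert!(interval > 0, "interval must be positive");
    let mut due = Vec::new();
    let mut t = next_run_at;
    while t <= now {
        due.push(t);
        t += interval;
    }
    due
}

/// How many executions start right away for `missed` overdue occurrences.
pub fn immediate_runs(policy: MissedPolicy, missed: usize) -> usize {
    match policy {
        MissedPolicy::Skip => 0,
        MissedPolicy::CatchUpOnce => missed.min(1),
        MissedPolicy::ReplayAll => missed,
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Overlap { Forbid, QueueOne, AllowParallel }

#[derive(Debug, PartialEq, Eq)]
pub enum TriggerAction { Start, Queue, Discard }

/// What happens to a trigger, given whether a run is active and how many
/// triggers are already pending.
pub fn on_trigger(policy: Overlap, run_active: bool, pending: usize) -> TriggerAction {
    if !run_active {
        return TriggerAction::Start;
    }
    match policy {
        Overlap::Forbid => TriggerAction::Discard,
        Overlap::QueueOne if pending == 0 => TriggerAction::Queue,
        Overlap::QueueOne => TriggerAction::Discard,
        Overlap::AllowParallel => TriggerAction::Start,
    }
}
```

With next_run_at = 100, a 60-unit interval, and now = 250, three occurrences are overdue (100, 160, 220): Skip starts none, CatchUpOnce starts one, and ReplayAll starts three.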