TurnKeeper
TurnKeeper is a flexible, asynchronous recurring job scheduler for Rust built on the Tokio runtime. It allows scheduling tasks based on various time specifications, handles retries with exponential backoff or fixed delays, supports job cancellation and updates, and provides observability through metrics and state querying.
It uses a central coordinator task and a configurable pool of worker tasks, communicating via efficient asynchronous channels.
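The coordinator/worker split can be illustrated with a std-only sketch. It swaps Tokio tasks and async channels for OS threads and `std::sync::mpsc`, and `Dispatch` and `run_pool` are hypothetical names, not TurnKeeper internals. The "coordinator" feeds job instances into a bounded channel, and a fixed number of workers pull from it and report each outcome.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;

/// A job instance handed from the coordinator to a worker (hypothetical).
struct Dispatch {
    job_id: u64,
    work: Box<dyn FnOnce() -> bool + Send>,
}

/// Runs `jobs` on `max_workers` threads and returns (job_id, succeeded) pairs,
/// sorted by job_id so the result does not depend on thread scheduling.
fn run_pool(max_workers: usize, jobs: Vec<Dispatch>) -> Vec<(u64, bool)> {
    assert!(max_workers > 0, "max_workers must be > 0");
    // Bounded dispatch channel of capacity 1: `send` blocks while one
    // undelivered instance is already buffered.
    let (dispatch_tx, dispatch_rx): (SyncSender<Dispatch>, Receiver<Dispatch>) =
        mpsc::sync_channel(1);
    let dispatch_rx = Arc::new(Mutex::new(dispatch_rx));
    let (result_tx, result_rx) = mpsc::channel();

    let workers: Vec<_> = (0..max_workers)
        .map(|_| {
            let rx = Arc::clone(&dispatch_rx);
            let results = result_tx.clone();
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement,
                // so it is held only while receiving, not while running the job.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(job) => {
                        let ok = (job.work)();
                        results.send((job.job_id, ok)).unwrap();
                    }
                    Err(_) => break, // channel closed: no more work
                }
            })
        })
        .collect();
    drop(result_tx); // only the workers' clones keep the result channel open

    // "Coordinator": hand out every instance, then close the channel.
    for job in jobs {
        dispatch_tx.send(job).unwrap();
    }
    drop(dispatch_tx);

    for worker in workers {
        worker.join().unwrap();
    }
    let mut outcomes: Vec<(u64, bool)> = result_rx.iter().collect();
    outcomes.sort();
    outcomes
}
```

A capacity-1 dispatch channel like this one mirrors the default `job_dispatch_buffer_size` of 1: the coordinator can get at most one undelivered instance ahead of the workers.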
Features
- Flexible Scheduling: Schedule jobs using:
  - Multiple `(Weekday, NaiveTime)` pairs (interpreted as UTC) via `from_week_day`.
  - Standard CRON expressions (interpreted as UTC) via `from_cron` (requires the `cron_schedule` feature).
  - Fixed intervals (e.g., every 5 minutes) via `from_interval`.
  - One-time execution at a specific `DateTime<Utc>` via `from_once`.
  - No automatic scheduling via `never` (requires an explicit trigger or an initial run time).
- Configurable Retries: Set the maximum number of retry attempts for failed jobs, with exponential backoff (default) or fixed delays via `with_fixed_retry_delay`.
- Concurrent Execution: Run multiple jobs concurrently using a configurable worker pool (`max_workers`).
- Flexible Scheduling Backend: Choose between:
  - `BinaryHeap` (standard library): Minimal dependencies. Cancellation checks occur lazily, when a job is next to run. Does not support job updates (`update_job`).
  - `HandleBased` (`priority-queue` crate): Supports proactive removal of cancelled jobs (O(log n)). Required for job updates (`update_job`). Adds a dependency (requires the `priority_queue_handle_based` feature, enabled by default).
- Asynchronous API: Designed for integration into `tokio`-based applications. Provides async (`add_job_async`), blocking (`add_job`), and non-blocking (`try_add_job`) submission methods.
- Non-Blocking Submission: `try_add_job` signals backpressure: if the internal buffer is full, it returns `Err(SubmitError::StagingFull)`.
- Job Management:
  - Request cancellation of job lineages (`cancel_job`). The operation is idempotent.
  - Update existing job schedules or retry configurations (`update_job`; requires the `HandleBased` PQ).
  - Manually trigger a job to run immediately (`trigger_job_now`).
- Observability:
  - Query job details (`get_job_details`) and list summaries (`list_all_jobs`). See `JobDetails`, which includes the `Schedule` type.
  - Retrieve internal metrics snapshots (`get_metrics_snapshot`). See `MetricsSnapshot` for available counters and gauges.
  - Integrates with the `tracing` ecosystem for detailed logging.
- Helper Macro: The `job_fn!` macro simplifies creating job execution functions.
- Job Context: (Optional `job_context` feature) Access the job lineage ID and instance ID within the execution function via task-locals.
- Graceful & Forced Shutdown: Control scheduler termination with optional timeouts.
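The two retry modes differ only in how the delay before each retry is computed. The following is a minimal sketch, assuming a doubling backoff with a base delay and a cap; TurnKeeper's actual base, multiplier, cap, and any jitter are not stated in this README, and `RetryDelay` is a hypothetical type.

```rust
use std::time::Duration;

/// Hypothetical retry-delay policy; not TurnKeeper's API.
enum RetryDelay {
    /// Doubling backoff: base * 2^(attempt - 1), capped at `max`.
    Exponential { base: Duration, max: Duration },
    /// The same delay before every retry (cf. `with_fixed_retry_delay`).
    Fixed(Duration),
}

impl RetryDelay {
    /// Delay before retry number `attempt` (1-based). Returns None once
    /// `attempt` exceeds `max_retries`, i.e. the instance has failed for good.
    fn delay_for(&self, attempt: u32, max_retries: u32) -> Option<Duration> {
        if attempt == 0 || attempt > max_retries {
            return None;
        }
        Some(match self {
            RetryDelay::Fixed(d) => *d,
            RetryDelay::Exponential { base, max } => {
                // 2^(attempt - 1); shifts of 32 or more saturate instead of overflowing.
                let factor = 1u32.checked_shl(attempt - 1).unwrap_or(u32::MAX);
                base.saturating_mul(factor).min(*max)
            }
        })
    }
}
```

With a 1 s base and a 60 s cap, retries 1, 2, 3 wait 1 s, 2 s, 4 s, and the tenth retry is clamped to 60 s; a fixed policy returns the same delay every time.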
Installation
Add TurnKeeper and its core dependencies to your Cargo.toml. Select features as needed.
[dependencies]
turnkeeper = { version = "1.1.0", features = ["full"] } # Use "full" or select features individually
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } # Or features needed by your app
chrono = { version = "0.4", features = ["serde"] } # If using serde feature
uuid = { version = "1", features = ["v4", "serde"] } # If using serde feature
# Optional, but recommended for logging/debugging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Optional dependencies controlled by features:
# cron = { version = "0.12", optional = true } # Needed by "cron_schedule" feature
# priority-queue = { version = "2", optional = true } # Needed by "priority_queue_handle_based" feature (default)
# serde = { version = "1.0", features = ["derive"], optional = true } # Needed by "serde" feature
TurnKeeper's features:
- `full`: Enables `default`, `cron_schedule`, and `serde`.
- `default`: Enables `job_context` and `priority_queue_handle_based`.
- `job_context`: Enables task-local job context access.
- `priority_queue_handle_based`: Enables the `HandleBased` priority queue (required for `update_job`).
- `cron_schedule`: Enables `TKJobRequest::from_cron` (requires the `cron` dependency).
- `serde`: Enables Serde support for query result types (`JobDetails`, `JobSummary`, `MetricsSnapshot`) and some internal types (requires the `serde` dependency).
Quick Start Example
use ;
use ;
use std::time::Duration as StdDuration;
use ;
use std::sync::Arc;
use ; // Use tracing macros
async
Configuration (SchedulerBuilder)
Use TurnKeeper::builder() to configure the scheduler before starting it via .build():
- `.max_workers(usize)`: Required. Sets the maximum number of concurrently running jobs (must be > 0).
- `.priority_queue(PriorityQueueType)`: Optional. Choose between `BinaryHeap` and `HandleBased` (default; requires the `priority_queue_handle_based` feature). See the `PriorityQueueType` docs for functional differences.
- `.staging_buffer_size(usize)`: Optional. Size of the incoming job submission buffer. Default: 128.
- `.command_buffer_size(usize)`: Optional. Size of the internal command buffer (queries, etc.). Default: 128.
- `.job_dispatch_buffer_size(usize)`: Optional. Size of the coordinator-to-worker dispatch channel. Must be >= 1. Default: 1 (provides backpressure).
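The required/optional split and the documented defaults can be captured in a small validation sketch. `SchedulerConfig` and `ConfigError` are hypothetical stand-ins written for illustration, not TurnKeeper's `SchedulerBuilder`; only the defaults (128, 128, 1) and the two constraints (`max_workers > 0`, `job_dispatch_buffer_size >= 1`) come from the list above.

```rust
/// Hypothetical mirror of the builder settings above; not TurnKeeper's type.
#[derive(Debug, PartialEq)]
struct SchedulerConfig {
    max_workers: Option<usize>, // required, must be > 0
    staging_buffer_size: usize,
    command_buffer_size: usize,
    job_dispatch_buffer_size: usize, // must be >= 1
}

#[derive(Debug, PartialEq)]
enum ConfigError {
    MissingMaxWorkers,
    ZeroMaxWorkers,
    ZeroDispatchBuffer,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        SchedulerConfig {
            max_workers: None,
            staging_buffer_size: 128,
            command_buffer_size: 128,
            job_dispatch_buffer_size: 1,
        }
    }
}

impl SchedulerConfig {
    fn max_workers(mut self, n: usize) -> Self {
        self.max_workers = Some(n);
        self
    }

    fn job_dispatch_buffer_size(mut self, n: usize) -> Self {
        self.job_dispatch_buffer_size = n;
        self
    }

    /// Checks the documented constraints before anything is started.
    fn validate(self) -> Result<Self, ConfigError> {
        match self.max_workers {
            None => return Err(ConfigError::MissingMaxWorkers),
            Some(0) => return Err(ConfigError::ZeroMaxWorkers),
            Some(_) => {}
        }
        if self.job_dispatch_buffer_size == 0 {
            return Err(ConfigError::ZeroDispatchBuffer);
        }
        Ok(self)
    }
}
```

Consuming `self` in each setter gives the same chained style as the builder calls listed above, and failing in `validate` keeps an invalid configuration from ever reaching the point where tasks would be spawned.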
Defining Jobs (TKJobRequest)
Create a TKJobRequest using specific constructors:
- `from_week_day(...)`: Takes `name`, `Vec<(Weekday, NaiveTime)>` (schedule), `max_retries`. Schedule times are interpreted as UTC.
- `from_cron(...)`: Takes `name`, `&str` (cron expression), `max_retries`. Requires the `cron_schedule` feature. The expression is interpreted as UTC.
- `from_interval(...)`: Takes `name`, `std::time::Duration` (interval), `max_retries`. Each interval starts after the previous scheduled/run time.
- `from_once(...)`: Takes `name`, `DateTime<Utc>` (run time), `max_retries`.
- `never(...)`: Takes `name`, `max_retries`. The job has no automatic schedule.
- `with_fixed_retry_delay(...)`: Alternative constructor that takes a `Schedule` and a `StdDuration` for fixed retry delays.
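To make the UTC semantics of `from_week_day` concrete, here is a std-only sketch of finding the next run for a set of `(weekday, time)` pairs. It replaces chrono's `Weekday`/`NaiveTime` with plain integers (weekday 0 = Monday, time as seconds since midnight) over Unix timestamps; `weekday_utc` and `next_weekly_run` are hypothetical helpers, not necessarily how TurnKeeper computes it.

```rust
const SECS_PER_DAY: i64 = 86_400;
const SECS_PER_WEEK: i64 = 7 * SECS_PER_DAY;

/// Weekday of a Unix timestamp in UTC, with 0 = Monday ... 6 = Sunday.
/// 1970-01-01 (day 0) was a Thursday, hence the +3 offset.
fn weekday_utc(ts: i64) -> i64 {
    (ts.div_euclid(SECS_PER_DAY) + 3).rem_euclid(7)
}

/// Earliest timestamp strictly after `now` matching any (weekday, seconds-of-day) slot.
/// Slots use 0 = Monday and 0..86_400 for the time of day.
fn next_weekly_run(now: i64, slots: &[(i64, i64)]) -> Option<i64> {
    let today_start = now.div_euclid(SECS_PER_DAY) * SECS_PER_DAY;
    let today = weekday_utc(now);
    slots
        .iter()
        .map(|&(weekday, secs_of_day)| {
            let days_ahead = (weekday - today).rem_euclid(7);
            let candidate = today_start + days_ahead * SECS_PER_DAY + secs_of_day;
            // A slot that is already past (or exactly now) this week moves to next week.
            if candidate <= now {
                candidate + SECS_PER_WEEK
            } else {
                candidate
            }
        })
        .min()
}
```

Starting from 1970-01-01T00:00:00Z (a Thursday), a Thursday 01:00 slot fires 3600 s later, while a Monday 00:00 slot fires four days later.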
Use .with_initial_run_time(DateTime<Utc>) to set a specific first execution time. This overrides the schedule calculation for the first run and is required for Schedule::Never jobs to run without a manual trigger.
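The precedence rule for `with_initial_run_time` can be sketched as a small function: an explicit initial time wins, otherwise the schedule decides, and a `Never` schedule without one yields no automatic run. `SketchSchedule`, `first_run`, and `next_run` are hypothetical, and whether TurnKeeper's first interval run happens one interval after submission is an assumption made here.

```rust
/// Simplified, hypothetical stand-in for the `Schedule` enum (Unix seconds).
enum SketchSchedule {
    Interval(i64), // period in seconds
    Once(i64),     // absolute run time
    Never,
}

/// First run time for a newly added job.
fn first_run(schedule: &SketchSchedule, now: i64, initial_run_time: Option<i64>) -> Option<i64> {
    // An explicit initial run time overrides the schedule for the first run only.
    if let Some(t) = initial_run_time {
        return Some(t);
    }
    match schedule {
        SketchSchedule::Interval(period) => Some(now + period), // assumption: one period from now
        SketchSchedule::Once(at) => Some(*at),
        SketchSchedule::Never => None, // waits for a manual trigger
    }
}

/// Next run after an instance that was scheduled for `last`.
fn next_run(schedule: &SketchSchedule, last: i64) -> Option<i64> {
    match schedule {
        SketchSchedule::Interval(period) => Some(last + period),
        SketchSchedule::Once(_) | SketchSchedule::Never => None,
    }
}
```

For a five-minute interval, an initial run time shifts only the first run; later runs follow from it, while a `Never` job with an initial run time runs exactly once.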
The schedule type itself is defined by the Schedule enum.
Job Function (BoxedExecFn)
The function executed by the worker must match the BoxedExecFn type alias:
use std::pin::Pin;
use std::future::Future;
type BoxedExecFn = ;
- It must be an `async` function or closure returning a `Pin<Box<dyn Future>>`. The `job_fn!` macro simplifies this.
- The `Future` must resolve to `bool` (`true` = success, `false` = logical failure).
- The function/closure and the `Future` must be `Send + Sync + 'static`. Use `Arc` for shared state captured by closures.
- Panics within the function are caught and treated as failures by the scheduler, triggering retries if configured.
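The alias body did not survive above, so the following uses an assumed shape consistent with these rules (a shared closure returning a pinned, boxed `Future<Output = bool>`), not TurnKeeper's definition. It also illustrates the panic rule: a panic while running the job is caught and reported as `false`. The tiny `block_on` executor is a stand-in for a Tokio worker and only suits futures that do not wait on I/O or timers.

```rust
use std::future::Future;
use std::panic::{self, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Assumed shapes, not TurnKeeper's definitions.
type JobFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
type BoxedExecFn = Box<dyn Fn() -> JobFuture + Send + Sync + 'static>;

/// Waker that does nothing; `block_on` simply re-polls in a loop.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Runs one instance; a panic inside the job counts as a failure (`false`).
fn run_instance(job: &BoxedExecFn) -> bool {
    panic::catch_unwind(AssertUnwindSafe(|| block_on(job()))).unwrap_or(false)
}

/// Builds a job that updates shared state through an `Arc` and succeeds.
fn counting_job(counter: Arc<AtomicUsize>) -> BoxedExecFn {
    Box::new(move || {
        let counter = Arc::clone(&counter);
        let fut: JobFuture = Box::pin(async move {
            counter.fetch_add(1, Ordering::SeqCst);
            true
        });
        fut
    })
}

fn explode() -> bool {
    panic!("job failed unexpectedly")
}

/// Builds a job whose future panics when polled.
fn panicking_job() -> BoxedExecFn {
    Box::new(|| {
        let fut: JobFuture = Box::pin(async { explode() });
        fut
    })
}
```

Cloning the `Arc` inside the closure, rather than moving the counter into the future directly, is what lets the same `Fn` be invoked for every scheduled instance.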
API Highlights
See the API Reference Documentation on docs.rs for full details.
- `add_job_async` / `try_add_job` / `add_job`: Submit jobs using `TKJobRequest`; returns `Result<TKJobId, SubmitError>`. IDs (`TKJobId`) are generated before the request is sent to the coordinator.
- `cancel_job`: Request lineage cancellation by `TKJobId`.
- `update_job`: Update the schedule/retries of an existing job (`HandleBased` PQ required).
- `trigger_job_now`: Manually run a job instance now.
- `get_job_details` / `list_all_jobs`: Query job status by `TKJobId`, or list all jobs. Returns details including the `Schedule`.
- `get_metrics_snapshot`: Get performance counters and gauges. Includes distinct counts for lineage cancellations vs. discarded instances.
- `shutdown_graceful` / `shutdown_force`: Control termination with an optional timeout.
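Blocking versus non-blocking submission, and the fact that an ID exists before the coordinator sees the job, can be sketched with a bounded `std::sync::mpsc::sync_channel` as the staging buffer. `Submitter`, `JobId`, and this `SubmitError` enum are hypothetical stand-ins (TurnKeeper uses `TKJobId` and its own error type); only the `StagingFull` outcome mirrors the documented behavior.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

type JobId = u64; // hypothetical stand-in for `TKJobId`

#[derive(Debug, PartialEq)]
enum SubmitError {
    StagingFull,  // buffer full: the caller should back off and retry
    ShuttingDown, // the receiving side is gone
}

struct Submitter {
    next_id: AtomicU64,
    staging: SyncSender<(JobId, String)>,
}

impl Submitter {
    fn new(capacity: usize) -> (Self, Receiver<(JobId, String)>) {
        let (tx, rx) = sync_channel(capacity);
        (Submitter { next_id: AtomicU64::new(1), staging: tx }, rx)
    }

    /// Non-blocking: allocate the ID first, then offer the request to the buffer.
    fn try_add_job(&self, name: &str) -> Result<JobId, SubmitError> {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        match self.staging.try_send((id, name.to_string())) {
            Ok(()) => Ok(id),
            Err(TrySendError::Full(_)) => Err(SubmitError::StagingFull),
            Err(TrySendError::Disconnected(_)) => Err(SubmitError::ShuttingDown),
        }
    }

    /// Blocking: wait for space in the buffer instead of failing.
    fn add_job(&self, name: &str) -> Result<JobId, SubmitError> {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.staging
            .send((id, name.to_string()))
            .map(|()| id)
            .map_err(|_| SubmitError::ShuttingDown)
    }
}
```

In this sketch a rejected submission still consumes an ID: with a buffer of two, the third `try_add_job` fails with `StagingFull`, and the next accepted job receives ID 4.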
Cancellation & Updates
- `cancel_job` marks a job lineage (`TKJobId`) for cancellation.
  - If using `PriorityQueueType::HandleBased` (default), the scheduler attempts to proactively remove the currently scheduled instance from the queue (O(log n)).
  - If using `PriorityQueueType::BinaryHeap`, the scheduled instance is only discarded when it reaches the front of the queue and is checked before dispatch.
- `update_job` allows changing the `Schedule` and `max_retries` of a non-cancelled job. Requires the `HandleBased` PQ type. If the schedule changes, the next instance is rescheduled accordingly.
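The lazy `BinaryHeap` behavior can be sketched with `std::collections::BinaryHeap` plus a set of cancelled lineages: cancelling only records the request, and the check happens when the instance reaches the front. `LazyQueue` and its methods are hypothetical, not TurnKeeper's internals.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

type JobId = u64;

/// Hypothetical min-queue of (run_at, job_id) with lazy cancellation.
#[derive(Default)]
struct LazyQueue {
    heap: BinaryHeap<Reverse<(u64, JobId)>>, // Reverse turns the max-heap into a min-heap
    cancelled: HashSet<JobId>,
    discarded: u64, // instances dropped at dispatch time because their lineage was cancelled
}

impl LazyQueue {
    fn schedule(&mut self, run_at: u64, id: JobId) {
        self.heap.push(Reverse((run_at, id)));
    }

    /// Only records the request; the heap is left untouched. Idempotent.
    fn cancel(&mut self, id: JobId) {
        self.cancelled.insert(id);
    }

    /// Pops the next instance due at or before `now`, skipping cancelled lineages.
    fn pop_due(&mut self, now: u64) -> Option<(u64, JobId)> {
        while let Some(&Reverse((run_at, id))) = self.heap.peek() {
            if run_at > now {
                return None; // the earliest entry is not due yet
            }
            self.heap.pop();
            if self.cancelled.contains(&id) {
                self.discarded += 1; // cancellation is noticed only here
                continue;
            }
            return Some((run_at, id));
        }
        None
    }
}
```

Here the cancelled entry occupies the heap until its run time arrives; a handle-based queue avoids that by removing the entry at cancel time, at the cost of the extra dependency.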
License
This project is licensed under the Mozilla Public License Version 2.0 (LICENSE or https://opensource.org/licenses/MPL-2.0).