TurnKeeper
TurnKeeper is a flexible, asynchronous recurring job scheduler for Rust built on Tokio. It allows scheduling tasks based on various time specifications, handles retries with exponential backoff, supports job cancellation, and provides observability through metrics and state querying.
It uses a central coordinator task and a configurable pool of worker tasks, communicating via efficient asynchronous channels.
Features
- Flexible Scheduling: Schedule jobs using:
- Multiple
(Weekday, NaiveTime)pairs (UTC) viafrom_week_day. - Standard CRON expressions (UTC interpretation) via
from_cron. Requires thecroncrate. - Fixed intervals (e.g., every 5 minutes) via
from_interval. - One-time execution at a specific
DateTime<Utc>viafrom_once. - No automatic scheduling via
never(requires explicit trigger or initial run time).
- Multiple
- Configurable Retries: Set maximum retry attempts for failed jobs with exponential backoff.
- Concurrent Execution: Run multiple jobs concurrently using a configurable worker pool (
max_workers). - Flexible Scheduling Backend: Choose between:
BinaryHeap(Standard Library): Minimal dependencies, cancellation checks occur when job is next to run.HandleBased(priority-queuecrate): Supports proactive cancellation removal (O(log n)), enables efficient job updates (adds dependency).
- Asynchronous API: Designed for integration into
tokio-based applications. - Non-Blocking Submission:
try_add_jobprovides backpressure signaling if the internal buffer is full, returningErr(SubmitError::StagingFull). - Job Cancellation: Request cancellation of job lineages via
cancel_job. Operation is idempotent. - Observability:
- Query job details (
get_job_details) and list summaries (list_all_jobs). SeeJobDetailswhich includes theScheduletype. - Retrieve internal metrics snapshots (
get_metrics_snapshot). SeeMetricsSnapshotfor available counters/gauges. - Integrates with the
tracingecosystem for detailed logging.
- Query job details (
- Graceful & Forced Shutdown: Control scheduler termination with optional timeouts.
Installation
Add TurnKeeper and its core dependencies to your Cargo.toml:
[]
= "0.1.0" # TODO: Replace with the actual desired version
= { = "1", = ["rt-multi-thread", "macros", "time"] } # Tokio is required
= { = "0.4", = ["serde"] } # For time/date handling & Serde support in job types
= { = "1", = ["v4", "serde"] } # For job IDs & Serde support
# Required for Cron scheduling and potentially other features
= "0.12" # Or latest compatible version
# Optional, but recommended for logging/debugging
= "0.1"
= { = "0.3", = ["env-filter"] }
# Required if using PriorityQueueType::HandleBased (default)
= "1.3" # Or latest compatible version
# Optional for serde support in job detail/summary structs
= { = "1.0", = ["derive"], = true }
[]
= []
# Enable this feature if you need JobDetails/JobSummary serialization
= ["dep:serde", "chrono/serde", "uuid/serde"]
Ensure you have the necessary Tokio features enabled for your application.
Add the cron crate.
Enable the serde feature if you need serialization for query results.
Quick Start Example
use ;
use ;
use Duration as StdDuration;
use ;
use Arc;
use ; // Use tracing macros
async
Configuration (SchedulerBuilder)
Use TurnKeeper::builder() to configure the scheduler before starting it via .build():
.max_workers(usize): Required. Sets the maximum number of concurrently running jobs (must be > 0)..priority_queue(PriorityQueueType): Optional. Choose betweenBinaryHeapandHandleBased(default). SeePriorityQueueTypedocs for functional differences..staging_buffer_size(usize): Optional. Size of the incoming job submission buffer. Default: 128..command_buffer_size(usize): Optional. Size of the internal command buffer (queries, etc.). Default: 128..job_dispatch_buffer_size(usize): Optional. Size of the coordinator-to-worker dispatch channel. Must be >= 1. Default: 1 (provides backpressure).
Defining Jobs (RecurringJobRequest)
Create a RecurringJobRequest using specific constructors:
from_week_day(...): Takesname,Vec<(Weekday, NaiveTime)>(schedule),max_retries. Schedule is UTC.from_cron(...): Takesname,&str(cron expression),max_retries. Requirescroncrate.from_interval(...): Takesname,std::time::Duration(interval),max_retries. Interval starts after the previous scheduled/run time.from_once(...): Takesname,DateTime<Utc>(run time),max_retries.never(...): Takesname,max_retries. Job has no automatic schedule.
Use .with_initial_run_time(DateTime<Utc>) to set a specific first execution time. This overrides the schedule calculation for the first run and is required for Schedule::Never jobs to run at all.
The schedule type itself is defined by the Schedule enum.
Job Function (BoxedExecFn)
The function executed by the worker must match the BoxedExecFn type alias:
use Pin;
use Future;
type BoxedExecFn = ;
- It must be an
asyncfunction or closure returning aPin<Box<dyn Future>>. - The
Futuremust resolve tobool(true= success,false= logical failure). - The function/closure and the
Futuremust beSend + Sync + 'static. UseArcfor shared state captured by closures. - Panics within the function are caught and treated as failures by the scheduler.
API Highlights
See the API Reference Documentation (or link to docs.rs) for full details.
add_job_async/try_add_job: Submit jobs usingRecurringJobRequest, returnsResult<RecurringJobId, SubmitError>.cancel_job: Request lineage cancellation byRecurringJobId.get_job_details/list_all_jobs: Query job status byRecurringJobIdor list all. Returns details including theSchedule.get_metrics_snapshot: Get performance counters and gauges.shutdown_graceful/shutdown_force: Control termination with optional timeout.
Cancellation & Updates
cancel_jobmarks a job lineage (RecurringJobId) for cancellation.- If using
PriorityQueueType::HandleBased, the scheduler attempts to proactively remove the currently scheduled instance from the queue (O log n). - If using
PriorityQueueType::BinaryHeap, the scheduled instance is only discarded when it reaches the front of the queue and is checked before dispatch. - Updating job parameters (schedule, retries, function) after submission is not directly supported in this version.
HandleBasedprovides a foundation for potential future implementation.
License
This project is licensed under the Mozilla Public License Version 2.0 (LICENSE-MPL-2.0 or https://opensource.org/licenses/MPL-2.0).