TurnKeeper: A Flexible Recurring Job Scheduler
Provides a flexible scheduler for running recurring tasks based on weekday/time schedules, CRON expressions, or fixed intervals, with support for retries, configurable scheduling mechanisms, metrics, querying, and cancellation.
§Features
- Schedule jobs using:
  - Multiple (Weekday, NaiveTime) pairs (UTC).
  - Standard CRON expressions (UTC interpretation, requires the cron_schedule feature).
  - Fixed intervals (e.g., every 5 minutes).
  - One-time execution at a specific DateTime<Utc>.
- Configurable maximum retry attempts with exponential backoff.
- Choice of scheduling backend via the builder:
  - BinaryHeap: Standard library, lazy cancellation check.
  - HandleBased: Supports proactive cancellation removal and future job updates (requires the priority_queue_handle_based feature).
- Non-blocking job submission (try_add_job) with backpressure signaling.
- Asynchronous job submission (add_job_async).
- Blocking job submission (add_job).
- Query job details (JobDetails) and list summaries (JobSummary).
- Built-in metrics collection (queryable snapshot using MetricsSnapshot).
- Graceful and forced shutdown procedures (with optional timeout).
- Cancellation of job lineages.
- Optional task-local job context (JobContext) for execution functions (requires the job_context feature).
- Optional Serde support for public types (requires the serde_support feature).
§Usage
use turnkeeper::{
    TurnKeeper,
    job::{TKJobRequest, Schedule}, // Import Schedule if using directly
    scheduler::PriorityQueueType
};
use chrono::{NaiveTime, Weekday, Duration as ChronoDuration, Utc};
use std::time::Duration as StdDuration;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use uuid::Uuid; // Import Uuid if storing IDs

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Basic tracing setup (optional)
    // tracing_subscriber::fmt().with_env_filter("warn,turnkeeper=info").init();

    println!("Building scheduler...");
    let scheduler = TurnKeeper::builder()
        .max_workers(2) // Example: 2 concurrent jobs
        .priority_queue(PriorityQueueType::HandleBased) // Assumes 'priority_queue_handle_based' feature
        .build()?;
    println!("Scheduler built.");

    let job_counter = Arc::new(AtomicUsize::new(0));
    let job_id_store = Arc::new(parking_lot::Mutex::new(None::<Uuid>));

    // --- Add a job (Example using WeekdayTimes via helper) ---
    let mut job_req = TKJobRequest::from_week_day(
        "Weekday Job",
        vec![(Weekday::Mon, NaiveTime::from_hms_opt(9, 0, 0).unwrap())], // Monday 9 AM UTC
        3 // Max retries
    );
    // Schedule the *first* run immediately for demonstration
    job_req.with_initial_run_time(Utc::now() + ChronoDuration::seconds(1));

    // --- Add an interval job ---
    let interval_req = TKJobRequest::from_interval(
        "Interval Job",
        StdDuration::from_secs(30), // Run every 30 seconds
        1 // Max retries
    );
    // First run will be calculated as Now + Interval by the scheduler.

    // --- Add a CRON job (runs every minute) ---
    // Requires the `cron_schedule` feature to be enabled
    #[cfg(feature = "cron_schedule")]
    let cron_req = TKJobRequest::from_cron(
        "Cron Job",
        "0 * * * * * *", // Every minute at second 0 (adjust as needed)
        0 // No retries
    );

    let counter_clone = job_counter.clone();
    let id_store_clone = job_id_store.clone();

    // Job function must be async and match the BoxedExecFn signature
    let exec_fn = move || {
        let current_count = counter_clone.fetch_add(1, Ordering::SeqCst);
        println!("Weekday Job running! Count: {}", current_count + 1);

        // --- Optional: Access Job Context (requires `job_context` feature) ---
        #[cfg(feature = "job_context")]
        {
            use turnkeeper::try_get_current_job_context;
            if let Some(ctx) = try_get_current_job_context() {
                println!(" Context: Job {}, Instance {}", ctx.tk_job_id, ctx.instance_id);
            }
        }
        // --- End Optional Context Access ---

        // Must return a Pinned Future resolving to bool (true=success)
        Box::pin(async move {
            tokio::time::sleep(StdDuration::from_millis(50)).await; // Simulate work
            true // Indicate success
        }) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
    };

    println!("Submitting Weekday job...");
    match scheduler.try_add_job(job_req.clone(), exec_fn) { // Clone exec_fn if reused
        Ok(job_id) => {
            println!("Weekday Job submitted successfully with ID: {}", job_id);
            let mut locked_id = id_store_clone.lock();
            *locked_id = Some(job_id); // Store the ID for later use (cancellation)
        },
        Err(e) => {
            eprintln!("Failed to submit weekday job initially: {:?}", e);
            // Handle staging buffer full error (e.g., retry later)
        }
    }

    // Submit the interval job
    match scheduler.add_job_async(interval_req, || Box::pin(async {
        println!("Interval Job Executing!"); true
    })).await {
        Ok(id) => println!("Interval Job submitted with ID: {}", id),
        Err(e) => eprintln!("Failed to submit interval job: {:?}", e),
    }

    // Submit the CRON job (only if feature is enabled)
    #[cfg(feature = "cron_schedule")]
    match scheduler.add_job_async(cron_req, || Box::pin(async {
        println!("Cron Job Executing!"); true
    })).await {
        Ok(id) => println!("Cron Job submitted with ID: {}", id),
        Err(e) => eprintln!("Failed to submit cron job: {:?}", e),
    }

    // --- Wait for jobs to potentially run ---
    tokio::time::sleep(StdDuration::from_secs(5)).await;

    // --- Query Metrics ---
    println!("Querying metrics...");
    match scheduler.get_metrics_snapshot().await {
        Ok(metrics) => println!("Current Metrics: {:#?}", metrics), // Pretty print metrics
        Err(e) => eprintln!("Failed to get metrics: {:?}", e),
    }

    // --- Cancel Job (by Lineage ID) ---
    let maybe_job_id = *job_id_store.lock();
    if let Some(job_id_to_cancel) = maybe_job_id {
        println!("Requesting cancellation for job {}...", job_id_to_cancel);
        match scheduler.cancel_job(job_id_to_cancel).await {
            Ok(()) => println!("Cancellation requested successfully."),
            Err(e) => eprintln!("Failed to cancel job {}: {:?}", job_id_to_cancel, e),
        }
    } else {
        println!("Job ID not stored, skipping cancellation.");
    }

    // --- Query Details of a specific job ---
    // (Replace with a known ID from submitting interval/cron job if needed)
    // if let Ok(details) = scheduler.get_job_details(known_interval_job_id).await {
    //     println!("Interval Job Details: {:#?}", details);
    // }

    // --- Shutdown ---
    println!("Requesting graceful shutdown...");
    // Pass an optional timeout for shutdown completion
    match scheduler.shutdown_graceful(Some(StdDuration::from_secs(10))).await {
        Ok(()) => println!("Scheduler shut down complete."),
        Err(e) => eprintln!("Shutdown failed: {:?}", e),
    }

    Ok(())
}
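The example above requires a job function to return a pinned, boxed future that resolves to `bool`. The following standard-library-only sketch isolates that closure shape so it can be tried without a scheduler. The alias `ExecFn`, the `NoopWake` waker, the `block_on` helper, and `make_counting_job` are hypothetical stand-ins introduced for illustration; real code would use the crate's `BoxedExecFn` (whose exact bounds may differ) and run on Tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical local alias mirroring the shape described above;
// the crate's own `BoxedExecFn` may carry different bounds.
type JobFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
type ExecFn = Box<dyn Fn() -> JobFuture + Send + Sync>;

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, for illustration only (use Tokio in practice).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Builds a job function that bumps a shared counter and reports success.
fn make_counting_job(counter: Arc<AtomicUsize>) -> ExecFn {
    Box::new(move || -> JobFuture {
        // Clone per invocation so each returned future owns its own handle.
        let counter = counter.clone();
        Box::pin(async move {
            counter.fetch_add(1, Ordering::SeqCst);
            true // true = success
        })
    })
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let job = make_counting_job(counter.clone());
    let first = block_on(job());
    let second = block_on(job());
    println!("results: {} {}, runs: {}", first, second, counter.load(Ordering::SeqCst));
}
```

Because the closure is `Fn` rather than `FnOnce`, it can be called once per scheduled run, which is why shared state is cloned inside the closure rather than moved into a single future.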
§Configuration
Use the SchedulerBuilder to configure the scheduler:
- max_workers: Set the concurrency limit.
- priority_queue: Choose between PriorityQueueType::BinaryHeap and PriorityQueueType::HandleBased. See the PriorityQueueType docs for details. HandleBased requires the priority_queue_handle_based feature.
- staging_buffer_size, command_buffer_size, job_dispatch_buffer_size: Configure internal channel capacities.
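The buffer sizes above bound internal channels, and a full staging buffer is what non-blocking submission reports as backpressure. As a rough illustration of that behavior, the sketch below uses `std::sync::mpsc::sync_channel` as a stand-in for the scheduler's channels. The `SubmitOutcome` enum and `try_submit` function are hypothetical and do not reflect TurnKeeper's `SubmitError` or internals.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Hypothetical outcome type for this sketch (not TurnKeeper's `SubmitError`).
#[derive(Debug, PartialEq)]
enum SubmitOutcome {
    Accepted,
    BufferFull,
    Closed,
}

// Non-blocking submit: never waits, reports a full buffer to the caller.
fn try_submit(tx: &SyncSender<String>, job_name: &str) -> SubmitOutcome {
    match tx.try_send(job_name.to_string()) {
        Ok(()) => SubmitOutcome::Accepted,
        Err(TrySendError::Full(_)) => SubmitOutcome::BufferFull,
        Err(TrySendError::Disconnected(_)) => SubmitOutcome::Closed,
    }
}

fn main() {
    // Capacity 2 plays the role of a small staging buffer.
    let (tx, rx) = sync_channel::<String>(2);
    println!("{:?}", try_submit(&tx, "a"));
    println!("{:?}", try_submit(&tx, "b"));
    println!("{:?}", try_submit(&tx, "c")); // buffer is full at this point

    // Draining one item frees a slot, so a retry can succeed.
    let drained = rx.recv().unwrap();
    println!("drained {}, retry: {:?}", drained, try_submit(&tx, "c"));
}
```

A larger buffer absorbs bursts of submissions at the cost of memory; a smaller one surfaces overload to callers sooner.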
§Job Lifecycle & State
- Jobs are defined by TKJobRequest, specifying the schedule type via Schedule.
- Use constructors like from_week_day, from_interval, from_once, never.
- Using from_cron requires the cron_schedule feature.
- The scheduler manages job state internally, including retry counts and the next scheduled run time (next_run_time in JobDetails).
- Workers execute job functions (BoxedExecFn).
- Job outcomes (success, failure, panic) trigger rescheduling or permanent failure logic.
- Cancellation marks a job lineage; its handling depends on the chosen PriorityQueueType.
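The Features list describes the BinaryHeap backend as using a lazy cancellation check. One common way to realize that idea is to record cancelled IDs in a set and skip matching entries when they are popped. The sketch below shows that technique with the standard library only; `LazyQueue` and its methods are hypothetical, plain `u64` values stand in for run times and job IDs, and nothing here is taken from TurnKeeper's actual implementation.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

// Hypothetical sketch type; not TurnKeeper's internal structure.
struct LazyQueue {
    // Min-heap on (run_at, job_id) via `Reverse`.
    heap: BinaryHeap<Reverse<(u64, u64)>>,
    cancelled: HashSet<u64>,
}

impl LazyQueue {
    fn new() -> Self {
        LazyQueue { heap: BinaryHeap::new(), cancelled: HashSet::new() }
    }

    fn schedule(&mut self, run_at: u64, job_id: u64) {
        self.heap.push(Reverse((run_at, job_id)));
    }

    // Cancellation only records the ID; the heap entry stays in place.
    fn cancel(&mut self, job_id: u64) {
        self.cancelled.insert(job_id);
    }

    // Pops the earliest entry that has not been cancelled, discarding
    // cancelled entries as they surface at the top of the heap.
    fn pop_next(&mut self) -> Option<(u64, u64)> {
        while let Some(Reverse((run_at, job_id))) = self.heap.pop() {
            if !self.cancelled.contains(&job_id) {
                return Some((run_at, job_id));
            }
        }
        None
    }
}

fn main() {
    let mut q = LazyQueue::new();
    q.schedule(30, 1);
    q.schedule(10, 2);
    q.schedule(20, 3);
    q.cancel(2);
    println!("entries still in heap after cancel: {}", q.heap.len());
    while let Some((run_at, job_id)) = q.pop_next() {
        println!("run job {} at t={}", job_id, run_at);
    }
}
```

A handle-based queue would instead remove the entry at cancellation time, trading extra bookkeeping for not carrying dead entries until they reach the top.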
§Observability
- Retrieve metrics snapshots using TurnKeeper::get_metrics_snapshot. See MetricsSnapshot.
- Query job details using TurnKeeper::get_job_details or list summaries with TurnKeeper::list_all_jobs. See JobDetails and JobSummary.
- Integrate with the tracing crate for detailed logs.
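Metrics are exposed as a snapshot rather than as live counters. A plausible pattern behind such an API is to keep atomic counters that workers update and copy them into a plain struct on request. The sketch below illustrates that pattern only; `Counters`, `Snapshot`, and their field names are hypothetical, and the actual fields of MetricsSnapshot are not listed in this document.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Live counters, shared between workers. Field names are hypothetical.
#[derive(Default)]
struct Counters {
    jobs_succeeded: AtomicUsize,
    jobs_failed: AtomicUsize,
}

// Plain-data copy taken at one moment; safe to print or compare.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    jobs_succeeded: usize,
    jobs_failed: usize,
}

impl Counters {
    // Records one job outcome (true = success).
    fn record(&self, success: bool) {
        let counter = if success { &self.jobs_succeeded } else { &self.jobs_failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    // Copies the current counter values into an owned snapshot.
    fn snapshot(&self) -> Snapshot {
        Snapshot {
            jobs_succeeded: self.jobs_succeeded.load(Ordering::Relaxed),
            jobs_failed: self.jobs_failed.load(Ordering::Relaxed),
        }
    }
}

fn main() {
    let counters = Arc::new(Counters::default());
    // Four worker threads each report three successes and one failure.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counters);
            thread::spawn(move || {
                for i in 0..4 {
                    c.record(i != 3);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("{:#?}", counters.snapshot());
}
```

Returning an owned snapshot means callers never hold a reference into scheduler state, so reading metrics cannot block job execution.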
Re-exports§
pub use scheduler::PriorityQueueType;
pub use scheduler::SchedulerBuilder;
pub use scheduler::TurnKeeper;
pub use error::BuildError;
pub use error::QueryError;
pub use error::ShutdownError;
pub use error::SubmitError;
pub use job::BoxedExecFn;
pub use job::InstanceId;
pub use job::JobDetails;
pub use job::JobSummary;
pub use job::TKJobId;
pub use job::TKJobRequest;
pub use job::Schedule;
pub use job::context::JobContext;
pub use job::context::try_get_current_job_context;
pub use metrics::MetricsSnapshot;
pub use metrics::SchedulerMetrics;
Modules§
- command
- coordinator
- error
- job
- job_context_docs: Accessing Job Context (job_context feature)
- metrics
- scheduler
- worker
Macros§
- job_context: Retrieves the current JobContext, panicking if called outside a TurnKeeper-managed job task where the context has not been set.
- job_fn: Macro to simplify creating a BoxedExecFn compatible closure.