Crate turnkeeper

Crate turnkeeper 

Source
Expand description

TurnKeeper: A Flexible Recurring Job Scheduler

Provides a flexible scheduler for running recurring tasks based on weekday/time schedules, CRON expressions, or fixed intervals, with support for retries, configurable scheduling mechanisms, metrics, querying, and cancellation.

§Features

  • Schedule jobs using:
    • Multiple (Weekday, NaiveTime) pairs (UTC).
    • Standard CRON expressions (UTC interpretation, requires the cron_schedule feature).
    • Fixed intervals (e.g., every 5 minutes).
    • One-time execution at a specific DateTime<Utc>.
  • Configurable maximum retry attempts with exponential backoff.
  • Choice of scheduling backend via the builder:
    • BinaryHeap: Standard library, lazy cancellation check.
    • HandleBased: Supports proactive cancellation removal and future job updates (requires priority_queue_handle_based feature).
  • Non-blocking job submission (try_add_job) with backpressure signaling.
  • Asynchronous job submission (add_job_async).
  • Blocking job submission (add_job).
  • Query job details (JobDetails) and list summaries (JobSummary).
  • Built-in metrics collection (queryable snapshot using MetricsSnapshot).
  • Graceful and forced shutdown procedures (with optional timeout).
  • Cancellation of job lineages.
  • Optional task-local job context (JobContext) for execution functions (requires job_context feature).
  • Optional Serde support for public types (requires serde_support feature).

§Usage

use turnkeeper::{
    TurnKeeper,
    job::{TKJobRequest, Schedule}, // Import Schedule if using directly
    scheduler::PriorityQueueType
};
use chrono::{NaiveTime, Weekday, Duration as ChronoDuration, Utc};
use std::time::Duration as StdDuration;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use uuid::Uuid; // Import Uuid if storing IDs

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Basic tracing setup (optional)
    // tracing_subscriber::fmt().with_env_filter("warn,turnkeeper=info").init();

    println!("Building scheduler...");
    let scheduler = TurnKeeper::builder()
        .max_workers(2) // Example: 2 concurrent jobs
        .priority_queue(PriorityQueueType::HandleBased) // Assumes 'priority_queue_handle_based' feature
        .build()?;
    println!("Scheduler built.");

    let job_counter = Arc::new(AtomicUsize::new(0));
    let job_id_store = Arc::new(parking_lot::Mutex::new(None::<Uuid>));

    // --- Add a job (Example using WeekdayTimes via helper) ---
    let mut job_req = TKJobRequest::from_week_day(
        "Weekday Job",
        vec![(Weekday::Mon, NaiveTime::from_hms_opt(9, 0, 0).unwrap())], // Monday 9 AM UTC
        3 // Max retries
    );
    // Schedule the *first* run immediately for demonstration
    job_req.with_initial_run_time(Utc::now() + ChronoDuration::seconds(1));

    // --- Add an interval job ---
    let interval_req = TKJobRequest::from_interval(
        "Interval Job",
        StdDuration::from_secs(30), // Run every 30 seconds
        1 // Max retries
    );
    // First run will be calculated as Now + Interval by the scheduler.

    // --- Add a CRON job (runs every minute) ---
    // Requires the `cron_schedule` feature to be enabled
    #[cfg(feature = "cron_schedule")]
    let cron_req = TKJobRequest::from_cron(
        "Cron Job",
        "0 * * * * * *", // Every minute at second 0 (adjust as needed)
        0 // No retries
    );

    let counter_clone = job_counter.clone();
    let id_store_clone = job_id_store.clone();

    // Job function must be async and match the BoxedExecFn signature
    let exec_fn = move || {
        let current_count = counter_clone.fetch_add(1, Ordering::SeqCst);
        println!("Weekday Job running! Count: {}", current_count + 1);

        // --- Optional: Access Job Context (requires `job_context` feature) ---
        #[cfg(feature = "job_context")]
        {
            use turnkeeper::try_get_current_job_context;
            if let Some(ctx) = try_get_current_job_context() {
                println!("  Context: Job {}, Instance {}", ctx.tk_job_id, ctx.instance_id);
            }
        }
        // --- End Optional Context Access ---

        // Must return a Pinned Future resolving to bool (true=success)
        Box::pin(async move {
            tokio::time::sleep(StdDuration::from_millis(50)).await; // Simulate work
            true // Indicate success
        }) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
    };

    println!("Submitting Weekday job...");
    match scheduler.try_add_job(job_req.clone(), exec_fn) { // Clone exec_fn if reused
         Ok(job_id) => {
             println!("Weekday Job submitted successfully with ID: {}", job_id);
             let mut locked_id = id_store_clone.lock();
             *locked_id = Some(job_id); // Store the ID for later use (cancellation)
         },
         Err(e) => {
             eprintln!("Failed to submit weekday job initially: {:?}", e);
             // Handle staging buffer full error (e.g., retry later)
         }
    }

    // Submit the interval job
    match scheduler.add_job_async(interval_req, || Box::pin(async {
        println!("Interval Job Executing!"); true
        })).await {
        Ok(id) => println!("Interval Job submitted with ID: {}", id),
        Err(e) => eprintln!("Failed to submit interval job: {:?}", e),
    }

    // Submit the CRON job (only if feature is enabled)
    #[cfg(feature = "cron_schedule")]
    match scheduler.add_job_async(cron_req, || Box::pin(async {
        println!("Cron Job Executing!"); true
        })).await {
        Ok(id) => println!("Cron Job submitted with ID: {}", id),
        Err(e) => eprintln!("Failed to submit cron job: {:?}", e),
    }

    // --- Wait for jobs to potentially run ---
    tokio::time::sleep(StdDuration::from_secs(5)).await;

    // --- Query Metrics ---
    println!("Querying metrics...");
    match scheduler.get_metrics_snapshot().await {
        Ok(metrics) => println!("Current Metrics: {:#?}", metrics), // Pretty print metrics
        Err(e) => eprintln!("Failed to get metrics: {:?}", e),
    }

    // --- Cancel Job (by Lineage ID) ---
    let maybe_job_id = *job_id_store.lock();
    if let Some(job_id_to_cancel) = maybe_job_id {
        println!("Requesting cancellation for job {}...", job_id_to_cancel);
        match scheduler.cancel_job(job_id_to_cancel).await {
            Ok(()) => println!("Cancellation requested successfully."),
            Err(e) => eprintln!("Failed to cancel job {}: {:?}", job_id_to_cancel, e),
        }
    } else {
        println!("Job ID not stored, skipping cancellation.")
    }

    // --- Query Details of a specific job ---
    // (Replace with a known ID from submitting interval/cron job if needed)
    // if let Ok(details) = scheduler.get_job_details(known_interval_job_id).await {
    //     println!("Interval Job Details: {:#?}", details);
    // }

    // --- Shutdown ---
    println!("Requesting graceful shutdown...");
    // Pass an optional timeout for shutdown completion
    match scheduler.shutdown_graceful(Some(StdDuration::from_secs(10))).await {
        Ok(()) => println!("Scheduler shut down complete."),
        Err(e) => eprintln!("Shutdown failed: {:?}", e),
    }

    Ok(())
}

§Configuration

Use the SchedulerBuilder to configure the scheduler:

  • max_workers: Set the concurrency limit.
  • priority_queue: Choose between PriorityQueueType::BinaryHeap and PriorityQueueType::HandleBased. See PriorityQueueType docs for details. HandleBased requires the priority_queue_handle_based feature.
  • staging_buffer_size, command_buffer_size, job_dispatch_buffer_size: Configure internal channel capacities.

§Job Lifecycle & State

  • Jobs are defined by TKJobRequest, specifying the schedule type via Schedule.
  • Use constructors like from_week_day, from_interval, from_once, never.
  • Use from_cron requires the cron_schedule feature.
  • The scheduler manages job state internally, including retry counts and the next scheduled run time (next_run_time in JobDetails).
  • Workers execute job functions (BoxedExecFn).
  • Job outcomes (success, failure, panic) trigger rescheduling or permanent failure logic.
  • Cancellation marks a job lineage; its handling depends on the chosen PriorityQueueType.

§Observability

Re-exports§

pub use scheduler::PriorityQueueType;
pub use scheduler::SchedulerBuilder;
pub use scheduler::TurnKeeper;
pub use error::BuildError;
pub use error::QueryError;
pub use error::ShutdownError;
pub use error::SubmitError;
pub use job::BoxedExecFn;
pub use job::InstanceId;
pub use job::JobDetails;
pub use job::JobSummary;
pub use job::TKJobId;
pub use job::TKJobRequest;
pub use job::Schedule;
pub use job::context::JobContext;
pub use job::context::try_get_current_job_context;
pub use metrics::MetricsSnapshot;
pub use metrics::SchedulerMetrics;

Modules§

command
coordinator
error
job
job_context_docs
Accessing Job Context (job_context feature)
metrics
scheduler
worker

Macros§

job_context
Retrieves the current JobContext, panicking if called outside a TurnKeeper-managed job task where the context has not been set.
job_fn
Macro to simplify creating a BoxedExecFn compatible closure.