turnkeeper 1.2.7

An asynchronous, recurring job scheduler for Tokio with support for CRON, interval, and weekday/time schedules, plus retries, cancellation, and observability.
# TurnKeeper API Reference

This document provides a quick reference to the main public API of the `turnkeeper` crate.

## Core Struct

### `TurnKeeper`

The main scheduler struct. Manages job lifecycle, worker pool, and state.

**Creation:**

```rust
use turnkeeper::{TurnKeeper, scheduler::PriorityQueueType, job::TKJobId, error::BuildError};
use std::time::Duration;

# async fn example() -> Result<(), BuildError> {
// Use the builder pattern
let builder = TurnKeeper::builder();

// Configure (examples)
let scheduler = builder
    .max_workers(4) // Required: Max concurrent jobs
    .priority_queue(PriorityQueueType::HandleBased) // Optional: Choose PQ type (default)
    .staging_buffer_size(16) // Optional: Set buffer sizes (default 128)
    .build()?; // Returns Result<TurnKeeper, BuildError>
# Ok(())
# }
```

See [`SchedulerBuilder`](#schedulerbuilder) for all configuration options.

**Methods:**

*   `TurnKeeper::builder() -> SchedulerBuilder`
    *   Creates a new builder instance to configure the scheduler.

*   `async fn add_job_async<F>(&self, request: TKJobRequest, exec_fn: F) -> Result<TKJobId, SubmitError>`
    *   Submits a job for scheduling. Waits asynchronously if the internal staging buffer is full.
    *   `request`: A [`TKJobRequest`] defining the job's schedule and parameters. Use the relevant constructors (`from_week_day`, `from_cron`, etc.).
    *   `exec_fn`: An async function or closure matching [`BoxedExecFn`].
    *   Returns the unique `TKJobId` (lineage ID) assigned to the job once it has been accepted into the staging queue. The scheduler handle generates this ID *before* sending the job to the internal coordinator, and the same ID is used throughout the scheduler.
    *   **Errors:** [`SubmitError::ChannelClosed`] - Contains the original `(request, exec_fn)` tuple.

*   `fn try_add_job<F>(&self, request: TKJobRequest, exec_fn: F) -> Result<TKJobId, SubmitError>`
    *   Attempts to submit a job without blocking.
    *   Returns `Ok(TKJobId)` if the job is accepted into the staging queue immediately. The scheduler handle generates this ID *before* sending the job to the internal coordinator, and the same ID is used throughout the scheduler.
    *   **Errors:** [`SubmitError::StagingFull`], [`SubmitError::ChannelClosed`] - Both contain the original `(request, exec_fn)` tuple.

*   `fn add_job<F>(&self, request: TKJobRequest, exec_fn: F) -> Result<TKJobId, SubmitError>`
    *   Submits a job for scheduling using a **blocking** send. If the internal staging buffer is full, blocks until space is available. *Panics if called from within an asynchronous execution context.*
    *   Returns the unique `TKJobId` (lineage ID) assigned to the job once it has been accepted into the staging queue. The scheduler handle generates this ID *before* sending the job to the internal coordinator, and the same ID is used throughout the scheduler.
    *   **Errors:** [`SubmitError::ChannelClosed`] - Contains the original `(request, exec_fn)` tuple.

*   `async fn cancel_job(&self, job_id: TKJobId) -> Result<(), QueryError>`
    *   Requests cancellation of a job lineage. Operation is idempotent.
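    *   The effect depends on the [`PriorityQueueType`] chosen at build time: `HandleBased` attempts proactive removal from the queue, while `BinaryHeap` discards the cancelled instance lazily when it is next to run.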
    *   **Errors:** [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`], [`QueryError::JobNotFound`]

*   `async fn update_job(&self, job_id: TKJobId, schedule: Option<Schedule>, max_retries: Option<MaxRetries>) -> Result<(), QueryError>`
    *   Updates the configuration (schedule, max retries) of an existing job lineage.
    *   **Requires `PriorityQueueType::HandleBased`**.
    *   If the schedule is updated, the currently scheduled instance (if any) is removed and a new instance is scheduled based on the new schedule, provided the job is not cancelled.
    *   **Errors:** [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`], [`QueryError::JobNotFound`], [`QueryError::UpdateRequiresHandleBasedPQ`], [`QueryError::UpdateFailed`]

*   `async fn trigger_job_now(&self, job_id: TKJobId) -> Result<(), QueryError>`
    *   Manually triggers a job lineage to run as soon as possible.
    *   **Behavior:**
        *   **Preemption:** If the job is currently scheduled for the *future*, that run is cancelled/preempted and replaced by this immediate trigger.
        *   **Idempotency:** If the job is currently **executing** or queued for **immediate** execution (pending), the request is rejected to prevent duplicate/stacked runs.
    *   **Errors:**
        *   [`QueryError::TriggerFailedJobScheduled`]: Returned if the job is already running or queued for immediate execution.
        *   [`QueryError::TriggerFailedJobCancelled`]: Returned if the job is marked as cancelled.
        *   [`QueryError::JobNotFound`], [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`].

*   `async fn get_job_details(&self, job_id: TKJobId) -> Result<JobDetails, QueryError>`
    *   Retrieves detailed information about a specific job lineage.
    *   **History:** Includes jobs that have **completed** or **failed permanently**. These are retained in an internal history cache (default TTL: 1 hour) after execution finishes.
    *   **Errors:** [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`], [`QueryError::JobNotFound`] if the ID is unknown or has expired from the history cache.

*   `async fn list_all_jobs(&self) -> Result<Vec<JobSummary>, QueryError>`
    *   Retrieves summary information for all active job lineages.
    *   **History:** Also includes **completed** or **permanently failed** jobs currently stored in the internal history cache.
    *   **Errors:** [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`]

*   `async fn get_metrics_snapshot(&self) -> Result<MetricsSnapshot, QueryError>`
    *   Retrieves a snapshot of the scheduler's current internal metrics.
    *   **Errors:** [`QueryError::SchedulerShutdown`], [`QueryError::ResponseFailed`]

*   `async fn shutdown_graceful(&self, timeout: Option<Duration>) -> Result<(), ShutdownError>`
    *   Initiates graceful shutdown, stopping new job acceptance and waiting for active jobs to complete.
    *   Waits for tasks to exit or until the optional `timeout` (from `std::time::Duration`) elapses.
    *   **Errors:** [`ShutdownError::SignalFailed`], [`ShutdownError::Timeout`], [`ShutdownError::TaskPanic`]

*   `async fn shutdown_force(&self, timeout: Option<Duration>) -> Result<(), ShutdownError>`
    *   Initiates immediate shutdown, potentially interrupting active jobs.
    *   Waits for tasks to attempt exit or until the optional `timeout` (from `std::time::Duration`) elapses.
    *   **Errors:** [`ShutdownError::SignalFailed`], [`ShutdownError::Timeout`], [`ShutdownError::TaskPanic`]
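
The preemption and idempotency rules described for `trigger_job_now` can be read as a small decision table. The sketch below models that table with standard-library types only. `LineageState`, `TriggerOutcome`, `TriggerError`, and `decide_trigger` are hypothetical names introduced for illustration; only the two error variant names are taken from the `QueryError` list above, and the crate's internal state handling may well differ.

```rust
// Hypothetical model of a job lineage's state (not a type from the crate).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineageState {
    ScheduledFuture, // next run is queued for a later time
    PendingNow,      // already queued for immediate execution
    Executing,       // a worker is currently running it
    Cancelled,       // the lineage has been cancelled
    Idle,            // nothing queued, e.g. a `Never` schedule
}

// Variant names mirror the `QueryError` variants documented above.
#[derive(Debug, PartialEq, Eq)]
enum TriggerError {
    TriggerFailedJobScheduled,
    TriggerFailedJobCancelled,
}

// Hypothetical description of what an accepted trigger does.
#[derive(Debug, PartialEq, Eq)]
enum TriggerOutcome {
    PreemptedFutureRun, // the future run is replaced by an immediate one
    QueuedImmediately,  // nothing was queued, so a run is simply added
}

// Applies the documented rules: reject cancelled lineages, reject
// duplicates of a running or pending run, preempt a future run.
fn decide_trigger(state: LineageState) -> Result<TriggerOutcome, TriggerError> {
    match state {
        LineageState::Cancelled => Err(TriggerError::TriggerFailedJobCancelled),
        LineageState::Executing | LineageState::PendingNow => {
            Err(TriggerError::TriggerFailedJobScheduled)
        }
        LineageState::ScheduledFuture => Ok(TriggerOutcome::PreemptedFutureRun),
        LineageState::Idle => Ok(TriggerOutcome::QueuedImmediately),
    }
}
```

Under this model, a caller that receives `TriggerFailedJobScheduled` can treat the request as already satisfied, since a run is in progress or imminent.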

## Configuration

### `SchedulerBuilder`

Used to configure and build a `TurnKeeper` instance. Get via `TurnKeeper::builder()`.

**Methods:**

*   `max_workers(usize) -> Self`: (Required) Sets the maximum number of concurrently running jobs. Use a value greater than 0; with 0 workers, no jobs will run.
*   `priority_queue(PriorityQueueType) -> Self`: (Optional) Sets the internal queue type (`BinaryHeap` or `HandleBased`). Default: `HandleBased`.
*   `staging_buffer_size(usize) -> Self`: (Optional) Sets the capacity of the incoming job submission buffer. Default: 128.
*   `command_buffer_size(usize) -> Self`: (Optional) Sets the capacity of the internal command buffer (queries, etc.). Default: 128.
*   `job_dispatch_buffer_size(usize) -> Self`: (Optional) Sets the capacity of the coordinator-to-worker dispatch channel. Must be >= 1. Default: 1.
*   `build() -> Result<TurnKeeper, BuildError>`: Consumes the builder and attempts to create and start the `TurnKeeper`.

### `PriorityQueueType` (Enum)

Determines the internal scheduling mechanism.

*   `BinaryHeap`: Uses the standard library's binary heap. Cancellation is lazy (a cancelled job is discarded when it is next to run), and scheduled jobs cannot be updated efficiently before execution. Minimal dependencies.
*   `HandleBased`: Uses the `priority-queue` crate. Supports proactive cancellation (attempts to remove the job directly from the queue) and is required for job updates (`update_job`). Adds a dependency and requires the `priority_queue_handle_based` feature.
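
To make the difference between lazy and proactive cancellation concrete, here is a minimal sketch built on `std::collections::BinaryHeap`. It is a model of the two strategies, not the crate's implementation: `RunAt`, `JobId`, `pop_next_lazy`, and `remove_proactive` are hypothetical names, and run times are plain integers rather than timestamps.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

// Hypothetical stand-ins: a run time in seconds and an integer job id.
type RunAt = u64;
type JobId = u32;

// Lazy cancellation: cancelled entries stay in the heap and are only
// discarded when they reach the front.
fn pop_next_lazy(
    queue: &mut BinaryHeap<Reverse<(RunAt, JobId)>>,
    cancelled: &HashSet<JobId>,
) -> Option<(RunAt, JobId)> {
    while let Some(Reverse((run_at, id))) = queue.pop() {
        if cancelled.contains(&id) {
            continue; // discard the cancelled instance and keep looking
        }
        return Some((run_at, id));
    }
    None
}

// Proactive removal on a plain binary heap needs a full rebuild, which is
// linear in the queue size. A handle-based queue avoids this scan.
fn remove_proactive(queue: &mut BinaryHeap<Reverse<(RunAt, JobId)>>, id: JobId) {
    let kept: BinaryHeap<_> = std::mem::take(queue)
        .into_iter()
        .filter(|Reverse((_, queued))| *queued != id)
        .collect();
    *queue = kept;
}
```

With the lazy strategy, a cancelled job keeps occupying a queue slot until its scheduled time comes up, which is why gauges such as the scheduled-queue size can lag behind cancellations.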

## Job Definition

### `TKJobRequest`

Defines a job's configuration and schedule. Passed to `add_job` methods.

**Creation (Examples):**

```rust
use turnkeeper::job::{TKJobRequest, Schedule, MaxRetries}; // Import types
use chrono::{Weekday, NaiveTime, Utc, DateTime, Duration as ChronoDuration};
use std::time::Duration as StdDuration;

// Weekday/Time schedule
let req_weekday = TKJobRequest::from_week_day(
    "Weekday Job",
    vec![(Weekday::Mon, NaiveTime::from_hms_opt(9,0,0).unwrap())],
    3 // max_retries
);

// CRON schedule (every 5 minutes) - Requires `cron_schedule` feature
let req_cron = TKJobRequest::from_cron(
    "Cron Job",
    "0 */5 * * * * *", // Cron expression with a leading seconds field: second 0 of every 5th minute
    1
);

// Interval schedule (every 30 seconds)
let mut req_interval = TKJobRequest::from_interval(
    "Interval Job",
    StdDuration::from_secs(30),
    2
);

// One-time schedule
let run_at = Utc::now() + ChronoDuration::minutes(10);
let req_once = TKJobRequest::from_once(
    "One Time Task",
    run_at,
    0
);

// No automatic schedule (needs manual trigger or initial run time)
let mut req_never = TKJobRequest::never(
    "Manual Trigger Job",
    0
);

// With fixed retry delay instead of exponential backoff
let req_fixed_retry = TKJobRequest::with_fixed_retry_delay(
    "Fixed Retry",
    Schedule::Once(Utc::now()), // Example schedule
    3, // max_retries
    StdDuration::from_secs(10) // Fixed delay
);

// Set an initial run time (useful for Interval, Never, or delaying first run)
let first_run_time = Utc::now() + ChronoDuration::seconds(5);
req_interval.with_initial_run_time(first_run_time);
req_never.with_initial_run_time(first_run_time);
```

**Constructors & Methods:**

*   `from_week_day(name: &str, weekday_times: Vec<(Weekday, NaiveTime)>, max_retries: u32) -> Self`: Creates a job using a `Schedule::WeekdayTimes`.
*   `from_cron(name: &str, cron_expression: &str, max_retries: u32) -> Self`: Creates a job using `Schedule::Cron`. Requires the `cron_schedule` feature.
*   `from_interval(name: &str, interval: StdDuration, max_retries: u32) -> Self`: Creates a job using `Schedule::FixedInterval`. Unless `with_initial_run_time` is set, the first run occurs at roughly `Now + Interval`, measured from when the coordinator processes the request.
*   `from_once(name: &str, run_at: DateTime<Utc>, max_retries: u32) -> Self`: Creates a job using `Schedule::Once`. The first run time is set automatically.
*   `never(name: &str, max_retries: u32) -> Self`: Creates a job using `Schedule::Never`. It will only run if `with_initial_run_time` is called or it is manually triggered.
*   `with_fixed_retry_delay(name: &str, schedule: Schedule, max_retries: MaxRetries, retry_delay: StdDuration) -> Self`: Creates a job request that uses a fixed delay between retries instead of exponential backoff.
*   `with_initial_run_time(&mut self, run_time: DateTime<Utc>)`: Sets a specific first run time, overriding any schedule calculation for the *first* run only. Modifies the request in place.
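
The difference between a fixed retry delay and exponential backoff can be illustrated with a small calculation. This reference does not state the base delay, growth factor, or cap the crate uses for backoff, so the doubling rule and the `base` and `cap` parameters below are assumptions, and `RetryPolicy` and `retry_delay` are hypothetical names.

```rust
use std::time::Duration;

// Hypothetical retry policy; not a type exported by the crate.
enum RetryPolicy {
    // Same delay before every retry.
    Fixed(Duration),
    // Delay doubles with each attempt, starting at `base`, never above `cap`.
    Exponential { base: Duration, cap: Duration },
}

// Returns the delay before retry number `attempt` (1-based).
fn retry_delay(policy: &RetryPolicy, attempt: u32) -> Duration {
    match policy {
        RetryPolicy::Fixed(delay) => *delay,
        RetryPolicy::Exponential { base, cap } => {
            // 2^(attempt - 1), saturating instead of overflowing.
            let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
            base.checked_mul(factor).unwrap_or(*cap).min(*cap)
        }
    }
}
```

A fixed delay gives predictable retry timing, while backoff spaces retries further apart the longer a job keeps failing.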

### `Schedule` (Enum)

Represents the different ways a job can be scheduled. Held within `TKJobRequest`.

*   `WeekdayTimes(Vec<(Weekday, NaiveTime)>)`: Run at specific UTC times on given weekdays.
*   `Cron(String)`: Run based on a CRON expression (interpreted as UTC). The constructor example above uses a format with a leading seconds field. Requires the `cron_schedule` feature.
*   `FixedInterval(StdDuration)`: Run repeatedly at a fixed interval *after* the last scheduled/run time.
*   `Once(DateTime<Utc>)`: Run only once at the specified time.
*   `Never`: No automatic schedule. Requires `with_initial_run_time` or manual trigger to run even once.
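
As a rough illustration of how three of these variants determine the next run, the sketch below uses a simplified mirror of the enum. `SimpleSchedule` and `next_run` are hypothetical, times are `Duration` offsets from an arbitrary epoch instead of `DateTime<Utc>`, and the weekday and CRON variants are left out because they need calendar arithmetic.

```rust
use std::time::Duration;

// Simplified, hypothetical mirror of three `Schedule` variants.
enum SimpleSchedule {
    FixedInterval(Duration),
    Once(Duration),
    Never,
}

// Computes the next run time, given the previous run (if any) and the
// current time. `None` means nothing is scheduled automatically.
fn next_run(
    schedule: &SimpleSchedule,
    last_run: Option<Duration>,
    now: Duration,
) -> Option<Duration> {
    match schedule {
        // One interval after the last run, or after `now` for the first run.
        SimpleSchedule::FixedInterval(every) => Some(last_run.unwrap_or(now) + *every),
        // Only scheduled if it has not run yet.
        SimpleSchedule::Once(at) => {
            if last_run.is_none() {
                Some(*at)
            } else {
                None
            }
        }
        // Needs an initial run time or a manual trigger instead.
        SimpleSchedule::Never => None,
    }
}
```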

### `BoxedExecFn` (Type Alias)

The required signature for the job execution function:

```rust
use std::pin::Pin;
use std::future::Future;
use std::sync::Arc; // Often used with closures

// The closure must be Send + Sync + 'static and return a pinned, boxed, Send future.
type BoxedExecFn = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>
    + Send + Sync + 'static,
>;
// Return `true` for success, `false` for logical failure (triggers retry).
// Panics within the function are caught and treated as failures.
```
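
For reference, a value of this type can also be built by hand, without any macro. The following sketch restates the alias so that it compiles on its own with the standard library only. `make_counting_job`, `NoopWake`, and `poll_once` are hypothetical helpers; `poll_once` stands in for an executor just to show the job being invoked and is only suitable for futures that complete without waiting.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as the alias above, restated so the sketch is self-contained.
type BoxedExecFn = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> + Send + Sync + 'static,
>;

// Hypothetical helper: builds a job function around a shared counter.
fn make_counting_job(counter: Arc<AtomicUsize>) -> BoxedExecFn {
    Box::new(move || {
        // Clone per call: the closure is `Fn`, so it can be invoked many times.
        let counter = Arc::clone(&counter);
        Box::pin(async move {
            counter.fetch_add(1, Ordering::SeqCst);
            true // report success
        })
    })
}

// A waker that does nothing; enough for futures that never suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Calls the job and polls its future once. Returns `None` if the future
// is still pending, which a real executor would handle by waiting.
fn poll_once(job: &BoxedExecFn) -> Option<bool> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = job();
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(ok) => Some(ok),
        Poll::Pending => None,
    }
}
```

Each call to the closure produces a fresh future, which is what allows the same job function to be reused across recurring runs and retries.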

**Example Job Function (using `job_fn!` macro):**

```rust
use turnkeeper::job_fn; // Import the macro
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::time::Duration;

let counter = Arc::new(AtomicUsize::new(0));

let job_fn = job_fn! { // Macro simplifies boxing/pinning
    {
        // Optional: Setup block for cloning Arcs etc.
        let job_counter = counter.clone();
    }
    {
        // Main async logic block
        let count = job_counter.fetch_add(1, Ordering::SeqCst) + 1;
        println!("Job running! Count: {}", count);
        tokio::time::sleep(Duration::from_millis(100)).await;
        // Perform work...
        let success = true; // Determine success/failure
        success
    }
};
```

## IDs

*   `TKJobId = uuid::Uuid`: Identifies a job lineage (the definition). Returned by `add_job` methods. Used in `cancel_job`, `update_job`, `trigger_job_now`, `get_job_details`.
*   `InstanceId = uuid::Uuid`: Identifies a specific scheduled *instance* of a job in the queue or executing. Seen in internal logs, `JobContext`, and `JobDetails.next_run_instance`.

## Job Context (`job_context` feature)

If the `job_context` feature is enabled:

*   `JobContext`: Struct containing `tk_job_id: TKJobId` and `instance_id: InstanceId`.
*   `try_get_current_job_context() -> Option<JobContext>`: Safely attempts to get context within a job function.
*   `job_context!() -> JobContext`: Gets context within a job function; **panics** if called outside the job's task-local scope.

## Query Results & Metrics

*   `JobDetails`: Struct with detailed configuration and current state of a job lineage (includes `schedule: Schedule`, `max_retries`, `retry_count`, `retry_delay`, `next_run_instance`, `next_run_time`, `is_cancelled`).
*   `JobSummary`: Struct with summary information for listing jobs (`id`, `name`, `next_run`, `retry_count`, `is_cancelled`).
*   `MetricsSnapshot`: Struct holding a point-in-time snapshot of internal counters and gauges. Includes:
    *   Job Counters: `jobs_submitted`, `jobs_executed_success`, `jobs_executed_fail`, `jobs_panicked`, `jobs_retried`, `jobs_permanently_failed`
    *   Cancellation Counters:
        *   `jobs_lineage_cancelled`: Count of `cancel_job` calls that successfully marked a lineage.
        *   `jobs_instance_discarded_cancelled`: Count of instances popped from queue & discarded due to cancellation.
    *   Staging Counters: `staging_submitted_total`, `staging_rejected_full`
    *   Gauges: `job_queue_scheduled_current`, `job_staging_buffer_current`, `workers_active_current`
    *   Histogram Data: `job_execution_duration_count`, `job_execution_duration_sum_micros`, `job_queue_wait_duration_count`, `job_queue_wait_duration_sum_micros`. Helper methods like `mean_execution_duration()` are available.
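
The histogram fields come in count and sum pairs, so a mean can be derived from a snapshot with one division. The sketch below shows that calculation on its own. `mean_duration` is a hypothetical free function, the `u64` field types are an assumption, and the crate's own `mean_execution_duration()` helper may behave differently, for example in how it treats an empty histogram.

```rust
use std::time::Duration;

// Derives a mean from a (count, sum in microseconds) pair such as
// `job_execution_duration_count` and `job_execution_duration_sum_micros`.
// Returns `None` when nothing has been recorded, to avoid dividing by zero.
fn mean_duration(count: u64, sum_micros: u64) -> Option<Duration> {
    if count == 0 {
        return None;
    }
    Some(Duration::from_micros(sum_micros / count))
}
```

The same function applies to the queue-wait pair (`job_queue_wait_duration_count` and `job_queue_wait_duration_sum_micros`).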

## Errors

*   `BuildError`: Errors during scheduler construction (`SchedulerBuilder::build`).
*   `SubmitError<(TKJobRequest, Arc<BoxedExecFn>)>`: Errors during job submission (`try_add_job`, `add_job_async`, `add_job`). Carries the original job request and function back to the caller on failure.
*   `QueryError`: Errors during queries, cancellation, update, or trigger requests (`get_...`, `list_...`, `cancel_job`, `update_job`, `trigger_job_now`). Variants include `SchedulerShutdown`, `ResponseFailed`, `JobNotFound`, `UpdateRequiresHandleBasedPQ`, `UpdateFailed`, `TriggerFailedJobScheduled`, `TriggerFailedJobCancelled`, `TriggerFailed`.
*   `ShutdownError`: Errors during shutdown (`shutdown_graceful`, `shutdown_force`). Variants include `SignalFailed`, `Timeout`, `TaskPanic`.
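
Returning the rejected payload inside the error, as `SubmitError` does, lets the caller retry or reroute a job without rebuilding it. The pattern can be sketched with a bounded standard-library channel. The generic `SubmitError<T>` below only mirrors the two variant names from this reference, and `try_submit` is a hypothetical helper; the crate's own types and channel implementation are not shown here.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Minimal mirror of the two documented variants, each carrying the payload.
#[derive(Debug, PartialEq)]
enum SubmitError<T> {
    StagingFull(T),
    ChannelClosed(T),
}

// Tries to enqueue without blocking and hands the job back on failure.
fn try_submit<T>(tx: &SyncSender<T>, job: T) -> Result<(), SubmitError<T>> {
    tx.try_send(job).map_err(|err| match err {
        TrySendError::Full(job) => SubmitError::StagingFull(job),
        TrySendError::Disconnected(job) => SubmitError::ChannelClosed(job),
    })
}
```

A caller can match on `StagingFull(job)` and resubmit the same `job` value later, whereas `ChannelClosed(job)` indicates that the receiving side is gone and a retry will not help.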