forge_jobs/lib.rs
1#![forbid(unsafe_code)]
2#![allow(clippy::missing_errors_doc)]
3
4//! Sidekiq-style job queue with embedded `SQLite` and pluggable Postgres.
5//!
6//! Domain crate — host-agnostic. Consumers register handlers, enqueue jobs,
7//! and the runtime claims/runs/finalizes them across N worker tasks. The
8//! same code path runs single-process on `SQLite` (local desktop) or
9//! multi-replica on Postgres (deployed service).
10//!
11//! ## What it gives you
12//!
//! - Backend-agnostic [`Storage`] traits (`JobQueue`, `ProcessRegistry`,
//!   `QueueConfig`, `CronStorage`, `RateLimitStorage`) — a single layer of
//!   indirection between the runtime and the database
16//! - Per-queue worker pools, cooperative shutdown, stale-heartbeat reaper
//! - Cron schedules with a lease-elected leader (only one replica fires
//!   each tick); the same lease gates the retention sweep and metrics roller
19//! - Per-queue exponential backoff with a configurable on/off toggle; the
20//! `Failed` and `Throttled` arms both respect it
21//! - Cancellation that survives across replicas: [`QueueHandle::request_cancel`]
22//! short-circuits in-process; cross-pod cancels flow through a DB flag
23//! the heartbeat tick observes
//! - Cluster-wide rate-limit budget — handlers `acquire("slack")` /
//!   `acquire("gh")` against a server-side token bucket; one upstream
//!   429 drains the bucket so sibling pods don't each fire off their own
//!   request and hit the same 429
27//!
28//! ## Minimal consumer
29//!
//! ```ignore
//! use std::sync::Arc;
//! use forge_jobs::storage::{DatabaseConfig, PathsError, QueuePaths};
//! use forge_jobs::{
//!     DefaultRouter, EnqueueRequest, HandlerRegistry, NoopEcho, QueueRuntime,
//! };
//!
//! #[derive(Debug)]
//! struct EnvPaths;
//! impl QueuePaths for EnvPaths {
//!     fn config_dir(&self) -> Result<std::path::PathBuf, PathsError> {
//!         Ok("./jobs/config".into())
//!     }
//!     fn data_dir(&self) -> Result<std::path::PathBuf, PathsError> {
//!         Ok("./jobs/data".into())
//!     }
//! }
//!
//! # async fn run() -> forge_jobs::storage::Result<()> {
//! let paths = EnvPaths;
//! let storage = DatabaseConfig::load(&paths)?.open_storage(&paths).await?;
//! let mut handlers = HandlerRegistry::new();
//! handlers.register(NoopEcho);
//! let runtime = QueueRuntime::new(storage, handlers, Arc::new(DefaultRouter))
//!     .with_queues(["default".to_owned()]); // required: which queues this worker runs
//! runtime.ensure_queue("default", 2).await?;
//! runtime
//!     .enqueue(
//!         EnqueueRequest::new(forge_jobs::NOOP_ECHO_KIND, serde_json::json!({}))
//!             .on_queue("default"),
//!     )
//!     .await?;
//! let handle = runtime.start().await?;
//! // ... handle.shutdown_graceful(...) at exit
//! # let _ = handle;
//! # Ok(())
//! # }
//! ```
68//!
69//! See `examples/minimal.rs` in the crate root for the runnable version.
70//!
71//! ## Handler cancellation contract
72//!
//! Handlers that take longer than ~1s should periodically check
//! `ctx.cancel.is_cancelled()` between `.await` points. A user click on
//! the Mission Control "delete" button fires [`QueueHandle::request_cancel`];
//! the worker's heartbeat picks up the cross-pod variant via the DB
//! `cancel_requested_at` flag within `HEARTBEAT_INTERVAL` (10s).
//! User-cancelled jobs route straight to `Dead` without burning the retry
//! budget.
79//!
80//! ## Optional features
81//!
82//! - **`postgres`** — enables the Postgres storage adapter. Off by
83//! default; service / k8s deploys flip this on.
84//! - **`legacy-scheduler`** — re-exports a smaller cooperative
85//! recurring-job `Scheduler` (with `Job`, `JobStore`, `Clock`,
86//! `parse_cron`, etc.) that predates the queue subsystem. Used by
87//! the originating project's LLM idempotency cache pruning and a
88//! few similar internal cron tasks. New code should use
89//! [`QueueRuntime`] with a cron schedule instead.
90
91pub mod cron_expr;
92pub mod runtime;
93pub mod storage;
94
95pub use cron_expr::parse_cron;
96
97// Legacy cron `Scheduler` modules. Only compiled when the
98// `legacy-scheduler` feature is on. The shared `parse_cron` helper
99// the queue's `runtime::cron` needs lives in `cron_expr` (always
100// compiled).
101#[cfg(feature = "legacy-scheduler")]
102mod clock;
103#[cfg(feature = "legacy-scheduler")]
104mod error;
105#[cfg(feature = "legacy-scheduler")]
106mod job;
107#[cfg(feature = "legacy-scheduler")]
108mod scheduler;
109#[cfg(feature = "legacy-scheduler")]
110mod store;
111
112#[cfg(feature = "legacy-scheduler")]
113pub use clock::{Clock, SystemClock};
114#[cfg(feature = "legacy-scheduler")]
115pub use error::{JobError, Result};
116// The legacy `JobCtx` is re-exported as `SchedulerJobCtx` so the
117// queue's `JobCtx` (the one handler authors actually use) can keep
118// the short name.
119#[cfg(feature = "legacy-scheduler")]
120pub use job::{Job, JobCtx as SchedulerJobCtx, Schedule};
121#[cfg(feature = "legacy-scheduler")]
122pub use scheduler::Scheduler;
123#[cfg(feature = "legacy-scheduler")]
124pub use store::{JobStateRecord, JobStore};
125
126// Queue subsystem — runtime + storage trait surface.
127//
128// Consumers (handlers, host setup, IPC) get everything they need
129// from these two re-export groups. Swapping the storage backend
130// later (e.g. Redis) only changes which `Storage`-implementing type
131// the host constructs in `setup()`; nothing in `runtime::*` or in
132// downstream handler crates moves.
133pub use runtime::{
134 AcquireOutcome, CLEANUP_TICK, CMD_EXEC_KIND, CRON_TICK, CleanupReport, CmdExecHandler,
135 CmdExecPayload, CronTickReport, DEFAULT_QUEUE_WORKERS, DEFAULT_RATE_LIMIT_SCOPES,
136 DEFAULT_SHUTDOWN_TIMEOUT, DefaultRouter, HandlerRegistry, JobCtx, JobHandler, JobOutcome,
137 KindPrefixRouter, METRICS_BUCKET_SECS, METRICS_TICK, NOOP_ECHO_KIND, NoopEcho, QUEUES_ENV,
138 QueueHandle, QueueRuntime, REAPER_TICK, REBALANCE_TICK, RateLimiter, Router, WORKER_ID_ENV,
139 WORKER_PREFIX_ENV, WORKER_VERSION_ENV, WorkerPoolConfig, WorkerPoolHandler, cleanup_once,
140 cron_tick_once, ensure_default_rate_limits, ensure_schedules, metrics_roll_once,
141 queues_from_env, reap_stale_jobs, rebalance_once, worker_id_from_env, worker_prefix_from_env,
142 worker_version_from_env,
143};
144#[cfg(feature = "postgres")]
145pub use storage::PostgresStorage;
146pub use storage::{
147 CronScheduleRecord, CronStorage, DeleteOutcome, DrainedSamples, EnqueueOutcome, EnqueueRequest,
148 FinalizeOutcome, HeartbeatStatus, JobId, JobLatency, JobQueue, JobRecord, JobStatus,
149 MetricBucket, NewCronSchedule, NewJob, PROCESS_WIDE_QUEUE, PodRecord, ProcessRecord,
150 ProcessRegistry, QueueConfig, QueueConfigRow, QueueCounts, RateLimitOutcome, RateLimitStorage,
151 SlotAssignment, SqliteStorage, Storage, StorageError, StorageInfo, TimelineEvent,
152 TimelineEventType, metric,
153};
154
/// Format an error with its full `Error::source()` chain as
/// `"top: middle: root"`.
///
/// `Display` only shows the outermost variant — for thiserror enums with
/// `#[from]` that drops the cause entirely (e.g. a `GhError::Octocrab`
/// reads as `"github: <generic>"` instead of `"github: <generic>: 422
/// Validation Failed: field X required"`). Use this when building the
/// `JobOutcome::Failed` / `Dead` message and when logging handler
/// failures so the cause survives into `last_error` and into the CLI.
///
/// Each link in the chain is rendered with its `Display` impl and the
/// pieces are joined with `": "`. No de-duplication is done: if a type's
/// `Display` already embeds its source's message, that text appears twice.
///
/// The outermost error does not need to be `'static`, so errors that
/// borrow local data (e.g. `std::sync::PoisonError<MutexGuard<'_, T>>`)
/// are accepted. Only the links returned by `source()` are `'static`, and
/// the `Error` trait already guarantees that.
#[must_use]
pub fn format_error_chain(e: &dyn std::error::Error) -> String {
    let mut out = e.to_string();
    // `source()` yields `&(dyn Error + 'static)`, so the rest of the walk
    // does not depend on the outer error's lifetime.
    let mut src = e.source();
    while let Some(s) = src {
        out.push_str(": ");
        out.push_str(&s.to_string());
        src = s.source();
    }
    out
}
175
#[cfg(test)]
mod format_error_chain_tests {
    //! Unit tests for `format_error_chain`: a three-level chain wired with
    //! thiserror's `#[from]`, plus a leaf error that has no source.

    use super::format_error_chain;

    /// Innermost error; it reports no `source()`.
    #[derive(Debug, thiserror::Error)]
    #[error("root")]
    struct Root;

    /// Wraps [`Root`]; `#[from]` also marks the field as its `source()`.
    #[derive(Debug, thiserror::Error)]
    #[error("middle")]
    struct Middle(#[from] Root);

    /// Outermost error; `#[from]` also marks the field as its `source()`.
    #[derive(Debug, thiserror::Error)]
    #[error("top")]
    struct Top(#[from] Middle);

    #[test]
    fn chain_walks_every_source() {
        // Build the chain through the `From` impls `#[from]` generates.
        let nested: Top = Middle::from(Root).into();
        let rendered = format_error_chain(&nested);
        assert_eq!(rendered, "top: middle: root");
    }

    #[test]
    fn chain_returns_just_top_when_no_source() {
        let rendered = format_error_chain(&Root);
        assert_eq!(rendered, "root");
    }
}