Sidekiq-style job queue with embedded SQLite and pluggable Postgres.
Domain crate — host-agnostic. Consumers register handlers, enqueue jobs,
and the runtime claims/runs/finalizes them across N worker tasks. The
same code path runs single-process on SQLite (local desktop) or
multi-replica on Postgres (deployed service).
§What it gives you
- Backend-agnostic Storage traits (JobQueue, ProcessRegistry, QueueConfig, CronStorage, RateLimitStorage): one layer of indirection between the runtime and the database
- Per-queue worker pools, cooperative shutdown, stale-heartbeat reaper
- Cron schedules with lease-elected leader (only one replica fires each tick); same lease gates the retention sweep and metrics roller
- Per-queue exponential backoff with a configurable on/off toggle; the Failed and Throttled arms both respect it
- Cancellation that survives across replicas: QueueHandle::request_cancel short-circuits in-process; cross-pod cancels flow through a DB flag the heartbeat tick observes
- Cluster-wide rate-limit budget: handlers acquire("slack") / acquire("gh") against a server-side token bucket; one upstream 429 drains the bucket so sibling pods don’t each fire their own
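The rate-limit bullet above describes a shared token bucket that a single upstream 429 drains. A minimal in-memory sketch of that idea follows. TokenBucket, Acquire, and drain are hypothetical names chosen for illustration, not the crate's RateLimiter or RateLimitStorage API, and the real bucket is described as living server-side in storage rather than in process memory.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory token bucket (illustration only; not the crate's type).
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

/// Hypothetical result of trying to take one token.
#[derive(Debug, PartialEq)]
pub enum Acquire {
    Granted,
    RetryAfter(Duration),
}

impl TokenBucket {
    /// Starts full. Assumes `refill_per_sec` is strictly positive.
    pub fn new(capacity: u32, refill_per_sec: f64, now: Instant) -> Self {
        assert!(refill_per_sec > 0.0, "refill rate must be positive");
        Self {
            capacity: f64::from(capacity),
            tokens: f64::from(capacity),
            refill_per_sec,
            last_refill: now,
        }
    }

    /// Adds tokens for the time elapsed since the last refill, capped at capacity.
    fn refill(&mut self, now: Instant) {
        let elapsed = now.saturating_duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
    }

    /// Takes one token if available, otherwise reports how long to wait for one.
    pub fn acquire(&mut self, now: Instant) -> Acquire {
        self.refill(now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Acquire::Granted
        } else {
            let missing = 1.0 - self.tokens;
            Acquire::RetryAfter(Duration::from_secs_f64(missing / self.refill_per_sec))
        }
    }

    /// Empties the bucket, e.g. after an upstream 429, so every caller backs off.
    pub fn drain(&mut self, now: Instant) {
        self.refill(now);
        self.tokens = 0.0;
    }
}
```

Because every caller reads the same bucket, the first handler that sees a 429 and calls drain makes the next acquire from any other caller return RetryAfter instead of sending another request upstream.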
§Minimal consumer
use std::sync::Arc;

use forge_jobs::storage::{DatabaseConfig, PathsError, QueuePaths};
use forge_jobs::{
    DefaultRouter, EnqueueRequest, HandlerRegistry, NoopEcho, QueueRuntime,
};

#[derive(Debug)]
struct EnvPaths;

impl QueuePaths for EnvPaths {
    fn config_dir(&self) -> Result<std::path::PathBuf, PathsError> {
        Ok("./jobs/config".into())
    }
    fn data_dir(&self) -> Result<std::path::PathBuf, PathsError> {
        Ok("./jobs/data".into())
    }
}

let paths = EnvPaths;
let storage = DatabaseConfig::load(&paths)?.open_storage(&paths).await?;

let mut handlers = HandlerRegistry::new();
handlers.register(NoopEcho);

let runtime = QueueRuntime::new(storage, handlers, Arc::new(DefaultRouter))
    .with_queues(["default".to_owned()]); // required: which queues this worker runs
runtime.ensure_queue("default", 2).await?;

runtime
    .enqueue(
        EnqueueRequest::new(forge_jobs::NOOP_ECHO_KIND, serde_json::json!({}))
            .on_queue("default"),
    )
    .await?;

let handle = runtime.start().await?;
// ... handle.shutdown_graceful(...) at exit

See examples/minimal.rs in the crate root for the runnable version.
§Handler cancellation contract
Handlers that take longer than ~1s should periodically check
ctx.cancel.is_cancelled() between .await points. A user click on
the Mission Control “delete” button fires QueueHandle::request_cancel;
the worker’s heartbeat picks up the cross-pod variant via the DB
cancel_requested_at flag within HEARTBEAT_INTERVAL (10s). User-
cancelled jobs route straight to Dead without burning the retry budget.
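The contract above amounts to checking a shared flag at regular points and returning early once it is set. The sketch below shows that pattern with the standard library only. CancelFlag, Outcome, and run_batches are hypothetical stand-ins, not the crate's JobCtx or JobOutcome, and the loop is synchronous for brevity; in a real async handler the check would sit between .await points.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the token exposed as `ctx.cancel`.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    /// Requests cancellation; visible to every clone of this flag.
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Returns true once cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Completed { processed: usize },
    Cancelled { processed: usize },
}

/// Processes `items` in chunks and checks the flag before each chunk,
/// so a cancel request is honoured within one chunk's worth of work.
pub fn run_batches(
    cancel: &CancelFlag,
    items: &[u32],
    chunk: usize,
    mut work: impl FnMut(&[u32]),
) -> Outcome {
    let mut processed = 0;
    // `chunks` panics on a size of 0, so clamp to at least 1.
    for batch in items.chunks(chunk.max(1)) {
        if cancel.is_cancelled() {
            return Outcome::Cancelled { processed };
        }
        work(batch);
        processed += batch.len();
    }
    Outcome::Completed { processed }
}
```

The smaller the unit of work between checks, the sooner a cancel request takes effect, which is why long-running handlers are asked to check periodically rather than once at the start.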
§Optional features
- postgres: enables the Postgres storage adapter. Off by default; service / k8s deploys flip this on.
- legacy-scheduler: re-exports a smaller cooperative recurring-job Scheduler (with Job, JobStore, Clock, parse_cron, etc.) that predates the queue subsystem. Used by the originating project’s LLM idempotency cache pruning and a few similar internal cron tasks. New code should use QueueRuntime with a cron schedule instead.
Re-exports§
pub use cron_expr::parse_cron;
pub use runtime::AcquireOutcome;
pub use runtime::CLEANUP_TICK;
pub use runtime::CMD_EXEC_KIND;
pub use runtime::CRON_TICK;
pub use runtime::CleanupReport;
pub use runtime::CmdExecHandler;
pub use runtime::CmdExecPayload;
pub use runtime::CronTickReport;
pub use runtime::DEFAULT_QUEUE_WORKERS;
pub use runtime::DEFAULT_RATE_LIMIT_SCOPES;
pub use runtime::DEFAULT_SHUTDOWN_TIMEOUT;
pub use runtime::DefaultRouter;
pub use runtime::HandlerRegistry;
pub use runtime::JobCtx;
pub use runtime::JobHandler;
pub use runtime::JobOutcome;
pub use runtime::KindPrefixRouter;
pub use runtime::METRICS_BUCKET_SECS;
pub use runtime::METRICS_TICK;
pub use runtime::NOOP_ECHO_KIND;
pub use runtime::NoopEcho;
pub use runtime::QUEUES_ENV;
pub use runtime::QueueHandle;
pub use runtime::QueueRuntime;
pub use runtime::REAPER_TICK;
pub use runtime::REBALANCE_TICK;
pub use runtime::RateLimiter;
pub use runtime::Router;
pub use runtime::WORKER_ID_ENV;
pub use runtime::WORKER_PREFIX_ENV;
pub use runtime::WORKER_VERSION_ENV;
pub use runtime::WorkerPoolConfig;
pub use runtime::WorkerPoolHandler;
pub use runtime::cleanup_once;
pub use runtime::cron_tick_once;
pub use runtime::ensure_default_rate_limits;
pub use runtime::ensure_schedules;
pub use runtime::metrics_roll_once;
pub use runtime::queues_from_env;
pub use runtime::reap_stale_jobs;
pub use runtime::rebalance_once;
pub use runtime::worker_id_from_env;
pub use runtime::worker_prefix_from_env;
pub use runtime::worker_version_from_env;
pub use storage::CronScheduleRecord;
pub use storage::CronStorage;
pub use storage::DeleteOutcome;
pub use storage::DrainedSamples;
pub use storage::EnqueueOutcome;
pub use storage::EnqueueRequest;
pub use storage::FinalizeOutcome;
pub use storage::HeartbeatStatus;
pub use storage::JobId;
pub use storage::JobLatency;
pub use storage::JobQueue;
pub use storage::JobRecord;
pub use storage::JobStatus;
pub use storage::MetricBucket;
pub use storage::NewCronSchedule;
pub use storage::NewJob;
pub use storage::PROCESS_WIDE_QUEUE;
pub use storage::PodRecord;
pub use storage::ProcessRecord;
pub use storage::ProcessRegistry;
pub use storage::QueueConfig;
pub use storage::QueueConfigRow;
pub use storage::QueueCounts;
pub use storage::RateLimitOutcome;
pub use storage::RateLimitStorage;
pub use storage::SlotAssignment;
pub use storage::SqliteStorage;
pub use storage::Storage;
pub use storage::StorageError;
pub use storage::StorageInfo;
pub use storage::TimelineEvent;
pub use storage::TimelineEventType;
pub use storage::metric;
Modules§
- cron_expr
  6-field cron-expression parser, shared by the queue’s runtime::cron and (when the legacy-scheduler feature is on) the cooperative Scheduler.
- runtime
  QueueRuntime: supervisors + workers + reaper + cleanup + cron.
- storage
  Backend-agnostic storage layer for the queue subsystem.
Functions§
- format_error_chain
  Format an error with its full Error::source() chain as "top: middle: root".
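The "top: middle: root" format can be produced by walking Error::source() until it returns None. The sketch below is one way to do that with the standard library. The function name format_chain, its signature, and the Layer error type are assumptions made for this example; the actual signature of format_error_chain is not shown on this page.

```rust
use std::error::Error;
use std::fmt;

/// Sketch: joins an error and each of its sources with ": ".
/// The name and signature are assumptions, not the crate's function.
pub fn format_chain(err: &(dyn Error + 'static)) -> String {
    let mut out = err.to_string();
    let mut current = err.source();
    while let Some(cause) = current {
        out.push_str(": ");
        out.push_str(&cause.to_string());
        current = cause.source();
    }
    out
}

/// Hypothetical error type used only to build a chain for the demo.
#[derive(Debug)]
pub struct Layer {
    msg: &'static str,
    source: Option<Box<Layer>>,
}

impl Layer {
    /// Builds one link of the chain, optionally wrapping a lower-level cause.
    pub fn new(msg: &'static str, source: Option<Layer>) -> Self {
        Self { msg, source: source.map(Box::new) }
    }
}

impl fmt::Display for Layer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.msg)
    }
}

impl Error for Layer {
    /// Exposes the wrapped cause so callers can walk the chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.source.as_deref().map(|e| e as &(dyn Error + 'static))
    }
}
```

An error with no source yields just its own message, so the same helper works for both wrapped and leaf errors.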