Crate forge_jobs

Sidekiq-style job queue with embedded SQLite and pluggable Postgres.

Domain crate — host-agnostic. Consumers register handlers, enqueue jobs, and the runtime claims/runs/finalizes them across N worker tasks. The same code path runs single-process on SQLite (local desktop) or multi-replica on Postgres (deployed service).

§What it gives you

  • Backend-agnostic Storage traits (JobQueue, ProcessRegistry, QueueConfig, CronStorage, RateLimitStorage): one layer of indirection between the runtime and the database
  • Per-queue worker pools, cooperative shutdown, stale-heartbeat reaper
  • Cron schedules with lease-elected leader (only one replica fires each tick); same lease gates the retention sweep and metrics roller
  • Per-queue exponential backoff with a configurable on/off toggle; the Failed and Throttled arms both respect it
  • Cancellation that survives across replicas: QueueHandle::request_cancel short-circuits in-process; cross-pod cancels flow through a DB flag the heartbeat tick observes
  • Cluster-wide rate-limit budget — handlers acquire("slack") / acquire("gh") against a server-side token bucket; one upstream 429 drains the bucket so sibling pods don’t each fire their own
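
The rate-limit bullet above can be pictured with a small in-memory token bucket. This is only a sketch of the general technique, not forge_jobs' implementation: the real bucket lives server-side behind RateLimitStorage, and the TokenBucket type, its fields, and the try_acquire / drain names below are hypothetical stand-ins chosen for illustration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory stand-in for one rate-limit scope (e.g. "slack").
/// forge_jobs keeps the real bucket in the database; this only shows the idea.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: u32, refill_per_sec: f64, now: Instant) -> Self {
        Self {
            capacity: f64::from(capacity),
            tokens: f64::from(capacity),
            refill_per_sec,
            last_refill: now,
        }
    }

    /// Add the tokens earned since the last refill, capped at capacity.
    fn refill(&mut self, now: Instant) {
        let elapsed = now.saturating_duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
    }

    /// Take one token if one is available.
    fn try_acquire(&mut self, now: Instant) -> bool {
        self.refill(now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Empty the bucket, e.g. after an upstream 429, so that other callers
    /// sharing the bucket back off instead of each hitting the limit.
    fn drain(&mut self, now: Instant) {
        self.refill(now);
        self.tokens = 0.0;
    }
}

fn main() {
    let start = Instant::now();
    let mut bucket = TokenBucket::new(2, 1.0, start);

    assert!(bucket.try_acquire(start)); // budget available
    bucket.drain(start); // simulated upstream 429
    assert!(!bucket.try_acquire(start)); // everyone sharing the bucket waits
    assert!(bucket.try_acquire(start + Duration::from_secs(1))); // refilled
}
```

Passing the clock in as an argument keeps the sketch deterministic; a shared, database-backed bucket would use the server's clock instead so that all pods agree on the refill time.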

§Minimal consumer

use std::sync::Arc;
use forge_jobs::storage::{DatabaseConfig, PathsError, QueuePaths};
use forge_jobs::{
    DefaultRouter, EnqueueRequest, HandlerRegistry, NoopEcho, QueueRuntime,
};

#[derive(Debug)]
struct EnvPaths;
impl QueuePaths for EnvPaths {
    fn config_dir(&self) -> Result<std::path::PathBuf, PathsError> {
        Ok("./jobs/config".into())
    }
    fn data_dir(&self) -> Result<std::path::PathBuf, PathsError> {
        Ok("./jobs/data".into())
    }
}

let paths = EnvPaths;
let storage = DatabaseConfig::load(&paths)?.open_storage(&paths).await?;
let mut handlers = HandlerRegistry::new();
handlers.register(NoopEcho);
let runtime = QueueRuntime::new(storage, handlers, Arc::new(DefaultRouter))
    .with_queues(["default".to_owned()]); // required: which queues this worker runs
runtime.ensure_queue("default", 2).await?;
runtime
    .enqueue(
        EnqueueRequest::new(forge_jobs::NOOP_ECHO_KIND, serde_json::json!({}))
            .on_queue("default"),
    )
    .await?;
let handle = runtime.start().await?;
// ... handle.shutdown_graceful(...) at exit

See examples/minimal.rs in the crate root for the runnable version.

§Handler cancellation contract

Handlers that take longer than ~1s should periodically check ctx.cancel.is_cancelled() between .await points. A user click on the Mission Control “delete” button fires QueueHandle::request_cancel; the worker’s heartbeat picks up the cross-pod variant via the DB cancel_requested_at flag within HEARTBEAT_INTERVAL (10s). User-cancelled jobs route straight to Dead without burning the retry budget.
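
The cooperative-check pattern can be sketched with the standard library alone. The CancelFlag type, the Progress enum, and process_chunks below are hypothetical stand-ins, not forge_jobs types; in a real handler the check would be ctx.cancel.is_cancelled() and would sit between .await points rather than between synchronous chunks.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `ctx.cancel`; not the crate's real type.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

#[derive(Debug, PartialEq)]
enum Progress {
    Finished(usize),
    Cancelled(usize),
}

/// Split long work into short chunks and check the flag before each one,
/// so a cancel request is honoured within one chunk's worth of work.
fn process_chunks(chunks: &[&str], cancel: &CancelFlag, mut work: impl FnMut(&str)) -> Progress {
    for (done, &chunk) in chunks.iter().enumerate() {
        if cancel.is_cancelled() {
            return Progress::Cancelled(done);
        }
        work(chunk);
    }
    Progress::Finished(chunks.len())
}

fn main() {
    let cancel = CancelFlag::default();
    let trigger = cancel.clone();
    let mut seen = 0;

    // Simulate a cancel request arriving while the second chunk is running.
    let result = process_chunks(&["a", "b", "c"], &cancel, |_| {
        seen += 1;
        if seen == 2 {
            trigger.cancel();
        }
    });

    assert_eq!(result, Progress::Cancelled(2));
}
```

The smaller the chunks, the sooner a handler notices the flag; a handler that never checks it keeps running until it finishes on its own.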

§Optional features

  • postgres — enables the Postgres storage adapter. Off by default; service / k8s deploys flip this on.
  • legacy-scheduler — re-exports a smaller cooperative recurring-job Scheduler (with Job, JobStore, Clock, parse_cron, etc.) that predates the queue subsystem. Used by the originating project’s LLM idempotency cache pruning and a few similar internal cron tasks. New code should use QueueRuntime with a cron schedule instead.

Re-exports§

pub use cron_expr::parse_cron;
pub use runtime::AcquireOutcome;
pub use runtime::CLEANUP_TICK;
pub use runtime::CMD_EXEC_KIND;
pub use runtime::CRON_TICK;
pub use runtime::CleanupReport;
pub use runtime::CmdExecHandler;
pub use runtime::CmdExecPayload;
pub use runtime::CronTickReport;
pub use runtime::DEFAULT_QUEUE_WORKERS;
pub use runtime::DEFAULT_RATE_LIMIT_SCOPES;
pub use runtime::DEFAULT_SHUTDOWN_TIMEOUT;
pub use runtime::DefaultRouter;
pub use runtime::HandlerRegistry;
pub use runtime::JobCtx;
pub use runtime::JobHandler;
pub use runtime::JobOutcome;
pub use runtime::KindPrefixRouter;
pub use runtime::METRICS_BUCKET_SECS;
pub use runtime::METRICS_TICK;
pub use runtime::NOOP_ECHO_KIND;
pub use runtime::NoopEcho;
pub use runtime::QUEUES_ENV;
pub use runtime::QueueHandle;
pub use runtime::QueueRuntime;
pub use runtime::REAPER_TICK;
pub use runtime::REBALANCE_TICK;
pub use runtime::RateLimiter;
pub use runtime::Router;
pub use runtime::WORKER_ID_ENV;
pub use runtime::WORKER_PREFIX_ENV;
pub use runtime::WORKER_VERSION_ENV;
pub use runtime::WorkerPoolConfig;
pub use runtime::WorkerPoolHandler;
pub use runtime::cleanup_once;
pub use runtime::cron_tick_once;
pub use runtime::ensure_default_rate_limits;
pub use runtime::ensure_schedules;
pub use runtime::metrics_roll_once;
pub use runtime::queues_from_env;
pub use runtime::reap_stale_jobs;
pub use runtime::rebalance_once;
pub use runtime::worker_id_from_env;
pub use runtime::worker_prefix_from_env;
pub use runtime::worker_version_from_env;
pub use storage::CronScheduleRecord;
pub use storage::CronStorage;
pub use storage::DeleteOutcome;
pub use storage::DrainedSamples;
pub use storage::EnqueueOutcome;
pub use storage::EnqueueRequest;
pub use storage::FinalizeOutcome;
pub use storage::HeartbeatStatus;
pub use storage::JobId;
pub use storage::JobLatency;
pub use storage::JobQueue;
pub use storage::JobRecord;
pub use storage::JobStatus;
pub use storage::MetricBucket;
pub use storage::NewCronSchedule;
pub use storage::NewJob;
pub use storage::PROCESS_WIDE_QUEUE;
pub use storage::PodRecord;
pub use storage::ProcessRecord;
pub use storage::ProcessRegistry;
pub use storage::QueueConfig;
pub use storage::QueueConfigRow;
pub use storage::QueueCounts;
pub use storage::RateLimitOutcome;
pub use storage::RateLimitStorage;
pub use storage::SlotAssignment;
pub use storage::SqliteStorage;
pub use storage::Storage;
pub use storage::StorageError;
pub use storage::StorageInfo;
pub use storage::TimelineEvent;
pub use storage::TimelineEventType;
pub use storage::metric;

Modules§

cron_expr
6-field cron-expression parser, shared by the queue’s runtime::cron and (when the legacy-scheduler feature is on) the cooperative Scheduler.
runtime
QueueRuntime — supervisors + workers + reaper + cleanup + cron.
storage
Backend-agnostic storage layer for the queue subsystem.

Functions§

format_error_chain
Format an error with its full Error::source() chain as "top: middle: root".
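
The "top: middle: root" shape comes from walking std's Error::source() links. The sketch below is an illustrative re-implementation under assumed names: format_chain and the Layer error type are hypothetical, and the crate's actual signature for format_error_chain may differ.

```rust
use std::error::Error;
use std::fmt;

/// Illustrative re-implementation: join an error and each of its sources
/// with ": ". The crate's real function may differ in signature and details.
fn format_chain(err: &dyn Error) -> String {
    let mut out = err.to_string();
    let mut source = err.source();
    while let Some(cause) = source {
        out.push_str(": ");
        out.push_str(&cause.to_string());
        source = cause.source();
    }
    out
}

/// Hypothetical error type with an optional underlying cause.
#[derive(Debug)]
struct Layer {
    msg: &'static str,
    cause: Option<Box<Layer>>,
}

impl fmt::Display for Layer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.msg)
    }
}

impl Error for Layer {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.cause.as_deref().map(|c| c as &(dyn Error + 'static))
    }
}

fn main() {
    let root = Layer { msg: "root", cause: None };
    let middle = Layer { msg: "middle", cause: Some(Box::new(root)) };
    let top = Layer { msg: "top", cause: Some(Box::new(middle)) };

    println!("{}", format_chain(&top)); // prints "top: middle: root"
}
```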