job is an async, Postgres-backed job scheduler and runner for Rust
applications. It coordinates distributed workers, tracks job history, and
handles retries with predictable backoff. Inspired by earlier systems like
Sidekiq, it focuses on running your application code asynchronously,
outside of request/response paths, while keeping business
logic in familiar Rust async functions. The crate uses sqlx for
database access and forbids unsafe.
§Documentation
§Highlights
- Durable Postgres-backed storage so jobs survive restarts and crashes.
- Automatic exponential backoff with jitter, plus opt-in infinite retries.
- Concurrency controls that let many worker instances share the workload, configurable through JobPollerConfig.
- Optional at-most-one-per-type queueing via JobSpawner::spawn_unique.
- Built-in migrations that you can run automatically or embed into your own migration workflow.
§Core Concepts
- Jobs service – Jobs owns registration, polling, and shutdown.
- Initializer – JobInitializer registers a job type and builds a JobRunner for each execution. Defines the associated Config type.
- Spawner – JobSpawner is returned from registration and provides type-safe job creation methods. Parameterized by the config type.
- Runner – JobRunner performs the work using the provided CurrentJob context.
- Current job – CurrentJob exposes attempt counts, execution state, and access to the Postgres pool during a run.
- Completion – JobCompletion returns the outcome: finish, retry, or reschedule at a later time.
§Lifecycle
- Initialize the service with Jobs::init
- Register initializers with Jobs::add_initializer – returns a JobSpawner
- Start polling with Jobs::start_poll
- Use spawners to create jobs throughout your application
- Shut down gracefully with Jobs::shutdown
§Example
use async_trait::async_trait;
use job::{
    CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner,
    JobSpawner, JobSvcConfig, JobType, Jobs,
};
use serde::{Deserialize, Serialize};

// 1. Define your config (serialized to the database)
#[derive(Debug, Serialize, Deserialize)]
struct MyConfig {
    value: i32,
}

// 2. Define your initializer
struct MyInitializer;

impl JobInitializer for MyInitializer {
    type Config = MyConfig;

    fn job_type(&self) -> JobType {
        JobType::new("my-job")
    }

    fn init(&self, job: &Job) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
        let config: MyConfig = job.config()?;
        Ok(Box::new(MyRunner { value: config.value }))
    }
}

// 3. Define your runner
struct MyRunner {
    value: i32,
}

#[async_trait]
impl JobRunner for MyRunner {
    async fn run(
        &self,
        _current_job: CurrentJob,
    ) -> Result<JobCompletion, Box<dyn std::error::Error>> {
        println!("Processing value: {}", self.value);
        Ok(JobCompletion::Complete)
    }
}

// 4. Wire it up
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = JobSvcConfig::builder()
        .pg_con("postgres://user:pass@localhost/db")
        .build()?;
    let mut jobs = Jobs::init(config).await?;

    // Registration returns a type-safe spawner
    let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);
    jobs.start_poll().await?;

    // Use the spawner to create jobs
    spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
    Ok(())
}
§Scheduling
Jobs run immediately once a poller claims them. If you need a future start
time, schedule it up front with JobSpawner::spawn_at_in_op. After a
run completes, return JobCompletion::Complete for one-off work or use the
JobCompletion::Reschedule* variants to book the next execution.
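To make the choice between one-off and recurring work concrete, here is a minimal, std-only sketch of how a completion value could determine the next run time. `Completion` and `next_run` are hypothetical stand-ins written for this illustration; they are not the crate's `JobCompletion` type, whose exact `Reschedule*` variants should be checked in its own documentation.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for a runner's outcome; not the crate's `JobCompletion`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Completion {
    /// One-off work: nothing further is scheduled.
    Complete,
    /// Run again after a delay measured from the end of this run.
    RescheduleIn(Duration),
    /// Run again at an absolute point in time.
    RescheduleAt(SystemTime),
}

/// Returns when the job should next run, or `None` if it is finished.
fn next_run(finished_at: SystemTime, outcome: Completion) -> Option<SystemTime> {
    match outcome {
        Completion::Complete => None,
        Completion::RescheduleIn(delay) => Some(finished_at + delay),
        Completion::RescheduleAt(at) => Some(at),
    }
}
```

In this model a periodic job simply returns a reschedule outcome at the end of every run, so each execution books the one that follows it.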
§Retries
Retry behaviour comes from JobInitializer::retry_on_error_settings. Once
attempts are exhausted the job is marked as errored and removed from the
queue.
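As a rough illustration of what bounded exponential backoff with jitter means, the sketch below doubles the delay on each attempt starting from a minimum, caps it at a maximum, and then spreads the result by a percentage. The function names, the doubling factor, and the jitter formula are assumptions made for this example; the crate's actual calculation may differ.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): `min * 2^(attempt - 1)`,
/// capped at `max`. The doubling factor is an assumption of this sketch.
fn backoff(attempt: u32, min: Duration, max: Duration) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exp = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exp;
    // If the multiplication overflows, fall back to the cap.
    min.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Spreads `base` by up to ±`pct` percent. `unit` is a random value in
/// [0.0, 1.0] supplied by the caller, which keeps the sketch deterministic
/// and free of external dependencies.
fn with_jitter(base: Duration, pct: u32, unit: f64) -> Duration {
    let spread = f64::from(pct) / 100.0;
    // Map `unit` from [0, 1] to a multiplier in [1 - spread, 1 + spread].
    let multiplier = 1.0 + spread * (2.0 * unit.clamp(0.0, 1.0) - 1.0);
    base.mul_f64(multiplier.max(0.0))
}
```

Under these assumptions, a minimum of 10 seconds and a maximum of 300 seconds yield delays of 10, 20, 40, 80, and 160 seconds before jitter, after which every further attempt waits the 300-second cap.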
use std::time::Duration;

use job::{JobInitializer, RetrySettings};

impl JobInitializer for MyInitializer {
    // ...
    fn retry_on_error_settings(&self) -> RetrySettings {
        RetrySettings {
            n_attempts: Some(5),
            min_backoff: Duration::from_secs(10),
            max_backoff: Duration::from_secs(300),
            ..Default::default()
        }
    }
}
§Uniqueness
For at-most-one semantics, use JobSpawner::spawn_unique. This method
consumes the spawner, enforcing at the type level that only one job of
this type can exist:
let cleanup_spawner = jobs.add_initializer(CleanupInitializer);
// Consumes spawner - can't accidentally spawn twice
cleanup_spawner.spawn_unique(JobId::new(), CleanupConfig::default()).await?;
§Parameterized Job Types
For cases where the job type is configured at runtime (e.g., multi-tenant inboxes), store the job type in your initializer and return it from the instance method:
struct TenantJobInitializer {
    job_type: JobType,
    tenant_id: String,
}

impl JobInitializer for TenantJobInitializer {
    type Config = TenantJobConfig;

    fn job_type(&self) -> JobType {
        self.job_type.clone() // From instance, not hardcoded
    }
    // ...
}
§Database migrations
See the setup guide for migration options and examples.
§Feature flags
es-entity enables advanced integration with the es_entity crate, allowing runners to finish with DbOp handles and enriching tracing/event metadata.
§Testing with simulated time
For deterministic testing of time-dependent behavior (e.g., backoff strategies),
inject an artificial clock via JobSvcConfig::clock:
use std::time::Duration;

use job::{JobSvcConfig, ClockHandle, ArtificialClockConfig};

let (clock, controller) = ClockHandle::artificial(ArtificialClockConfig::manual());
let config = JobSvcConfig::builder()
    .pool(pool)
    .clock(clock)
    .build()?;

// Advance time deterministically
controller.advance(Duration::from_secs(60)).await;
Modules§
- error: Error type returned by the job service and helpers.
Structs§
- ArtificialClockConfig: Configuration for artificial time.
- Clock: Global clock access - like Utc::now() but testable.
- ClockController: Controller for artificial time operations.
- ClockHandle: A handle to a clock for getting time and performing time-based operations.
- CurrentJob: Context provided to a JobRunner while a job is executing.
- Job: Entity capturing immutable job metadata and lifecycle events.
- JobId
- JobPollerConfig: Controls how the background poller balances work across processes.
- JobRegistry: Keeps track of registered job types and their retry behaviour.
- JobSpawner: A handle for spawning jobs of a specific type.
- JobSvcConfig: Configuration consumed by Jobs::init. Build with JobSvcConfig::builder.
- JobSvcConfigBuilder: Builder for JobSvcConfig.
- JobType: Identifier describing a job type or class of work.
- Jobs: Primary entry point for interacting with the Job crate. Provides APIs to register job handlers, manage configuration, and control scheduling and execution.
- RetrySettings: Controls retry attempt limits, telemetry escalation thresholds, and exponential backoff behaviour. Use RetrySettings::n_warn_attempts to decide how many failures remain WARN events before escalation. Set it to None to keep every retry at WARN.
Enums§
- ArtificialMode: How artificial time advances.
- JobCompletion: Result returned by JobRunner::run describing how to progress the job.
- JobSvcConfigBuilderError: Error type for JobSvcConfigBuilder
Traits§
- IncludeMigrations: Extend an sqlx::migrate!() call with Job’s migrations.
- JobInitializer: Describes how to construct a JobRunner for a given job type.
- JobRunner: Implemented by job executors that perform the actual work.