Crate job

Crate job 

Source
Expand description

job is an async, Postgres-backed job scheduler and runner for Rust applications. It coordinates distributed workers, tracks job history, and handles retries with predictable backoff. Inspired by earlier systems like Sidekiq, it focuses on running your application code asynchronously, outside of request/response paths while keeping business logic in familiar Rust async functions. The crate uses sqlx for database access and forbids unsafe.

§Documentation

§Highlights

  • Durable Postgres-backed storage so jobs survive restarts and crashes.
  • Automatic exponential backoff with jitter, plus opt-in infinite retries.
  • Concurrency controls that let many worker instances share the workload, configurable through JobPollerConfig.
  • Optional at-most-one-per-type queueing via JobSpawner::spawn_unique.
  • Built-in migrations that you can run automatically or embed into your own migration workflow.

§Core Concepts

  • Jobs serviceJobs owns registration, polling, and shutdown.
  • InitializerJobInitializer registers a job type and builds a JobRunner for each execution. Defines the associated Config type.
  • SpawnerJobSpawner is returned from registration and provides type-safe job creation methods. Parameterized by the config type.
  • RunnerJobRunner performs the work using the provided CurrentJob context.
  • Current jobCurrentJob exposes attempt counts, execution state, and access to the Postgres pool during a run.
  • CompletionJobCompletion returns the outcome: finish, retry, or reschedule at a later time.

§Lifecycle

  1. Initialize the service with Jobs::init
  2. Register initializers with Jobs::add_initializer – returns a JobSpawner
  3. Start polling with Jobs::start_poll
  4. Use spawners to create jobs throughout your application
  5. Shut down gracefully with Jobs::shutdown

§Example

use async_trait::async_trait;
use job::{
    CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner,
    JobSpawner, JobSvcConfig, JobType, Jobs,
};
use serde::{Deserialize, Serialize};

// 1. Define your config (serialized to the database)
#[derive(Debug, Serialize, Deserialize)]
struct MyConfig {
    value: i32,
}

// 2. Define your initializer
struct MyInitializer;

impl JobInitializer for MyInitializer {
    type Config = MyConfig;

    fn job_type(&self) -> JobType {
        JobType::new("my-job")
    }

    fn init(&self, job: &Job) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
        let config: MyConfig = job.config()?;
        Ok(Box::new(MyRunner { value: config.value }))
    }
}

// 3. Define your runner
struct MyRunner {
    value: i32,
}

#[async_trait]
impl JobRunner for MyRunner {
    async fn run(
        &self,
        _current_job: CurrentJob,
    ) -> Result<JobCompletion, Box<dyn std::error::Error>> {
        println!("Processing value: {}", self.value);
        Ok(JobCompletion::Complete)
    }
}

// 4. Wire it up
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = JobSvcConfig::builder()
        .pg_con("postgres://user:pass@localhost/db")
        .build()?;

    let mut jobs = Jobs::init(config).await?;

    // Registration returns a type-safe spawner
    let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);

    jobs.start_poll().await?;

    // Use the spawner to create jobs
    spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;

    Ok(())
}

§Scheduling

Jobs run immediately once a poller claims them. If you need a future start time, schedule it up front with JobSpawner::spawn_at_in_op. After a run completes, return JobCompletion::Complete for one-off work or use the JobCompletion::Reschedule* variants to book the next execution.

§Retries

Retry behaviour comes from JobInitializer::retry_on_error_settings. Once attempts are exhausted the job is marked as errored and removed from the queue.

impl JobInitializer for MyInitializer {
    // ...

    fn retry_on_error_settings(&self) -> RetrySettings {
        RetrySettings {
            n_attempts: Some(5),
            min_backoff: Duration::from_secs(10),
            max_backoff: Duration::from_secs(300),
            ..Default::default()
        }
    }
}

§Uniqueness

For at-most-one semantics, use JobSpawner::spawn_unique. This method consumes the spawner, enforcing at the type level that only one job of this type can exist:

let cleanup_spawner = jobs.add_initializer(CleanupInitializer);

// Consumes spawner - can't accidentally spawn twice
cleanup_spawner.spawn_unique(JobId::new(), CleanupConfig::default()).await?;

§Parameterized Job Types

For cases where the job type is configured at runtime (e.g., multi-tenant inboxes), store the job type in your initializer and return it from the instance method:

struct TenantJobInitializer {
    job_type: JobType,
    tenant_id: String,
}

impl JobInitializer for TenantJobInitializer {
    type Config = TenantJobConfig;

    fn job_type(&self) -> JobType {
        self.job_type.clone()  // From instance, not hardcoded
    }

    // ...
}

§Database migrations

See the setup guide for migration options and examples.

§Feature flags

  • es-entity enables advanced integration with the es_entity crate, allowing runners to finish with DbOp handles and enriching tracing/event metadata.

§Testing with simulated time

For deterministic testing of time-dependent behavior (e.g., backoff strategies), inject an artificial clock via JobSvcConfig::clock:

use job::{JobSvcConfig, ClockHandle, ArtificialClockConfig};

let (clock, controller) = ClockHandle::artificial(ArtificialClockConfig::manual());
let config = JobSvcConfig::builder()
    .pool(pool)
    .clock(clock)
    .build()?;

// Advance time deterministically
controller.advance(Duration::from_secs(60)).await;

Modules§

error
Error type returned by the job service and helpers.

Structs§

ArtificialClockConfig
Configuration for artificial time.
Clock
Global clock access - like Utc::now() but testable.
ClockController
Controller for artificial time operations.
ClockHandle
A handle to a clock for getting time and performing time-based operations.
CurrentJob
Context provided to a JobRunner while a job is executing.
Job
Entity capturing immutable job metadata and lifecycle events.
JobId
JobPollerConfig
Controls how the background poller balances work across processes.
JobRegistry
Keeps track of registered job types and their retry behaviour.
JobSpawner
A handle for spawning jobs of a specific type.
JobSvcConfig
Configuration consumed by Jobs::init. Build with JobSvcConfig::builder.
JobSvcConfigBuilder
Builder for JobSvcConfig.
JobType
Identifier describing a job type or class of work.
Jobs
Primary entry point for interacting with the Job crate. Provides APIs to register job handlers, manage configuration, and control scheduling and execution.
RetrySettings
Controls retry attempt limits, telemetry escalation thresholds, and exponential backoff behaviour. Use RetrySettings::n_warn_attempts to decide how many failures remain WARN events before escalation. Set it to None to keep every retry at WARN.

Enums§

ArtificialMode
How artificial time advances.
JobCompletion
Result returned by JobRunner::run describing how to progress the job.
JobSvcConfigBuilderError
Error type for JobSvcConfigBuilder

Traits§

IncludeMigrations
Extend an sqlx::migrate!() call with Job’s migrations.
JobInitializer
Describes how to construct a JobRunner for a given job type.
JobRunner
Implemented by job executors that perform the actual work.