This crate provides a persisted job queue backed by PostgreSQL with a simple-to-use interface. It is written as an easy-to-use async job queue library without macros or generics shenanigans.
Usage boils down to 3 points:
- Implement the Handler trait
- Initialize the queue with a PgPool and start it
- Insert jobs
Default configurations for jobs and queues are just that: defaults. Make sure to read through the relevant Job and SimpleQueue documentation.
§Features
- Simple handler interface
- Job scheduling and rescheduling
- Job retry support
- Job crash handling
- Job cancellation
- Job fingerprinting (soft identifier)
- Existing jobs deduplication (unique key with noop on job reinsert - only for live jobs)
- Configurable global and per-queue backoff strategy (linear and exponential provided, custom supported)
- 3 tiered job permits (global permit, per queue permit, handler owned limit)
- Stalled job recovery (heartbeat)
- Archive and DLQ (requires the janitor feature)
- Poison job detection (reaper)
- Wait for job completion (requires the wait-for-job feature; a oneshot channel is notified on the first processing attempt, regardless of success or failure)
§Usage
§Handler Implementation
// handlers.rs
use simple_queue::prelude::*;

struct MyHandler {
    counter: std::sync::atomic::AtomicUsize,
}

impl Handler for MyHandler {
    const QUEUE: &'static str = "my-queue";

    async fn process(&self, queue: &SimpleQueue, job: &Job) -> Result<JobResult, BoxDynError> {
        self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        Ok(JobResult::Success)
    }
}

§Queue Initialization
// main.rs
use simple_queue::prelude::*;

#[tokio::main]
async fn main() {
    let pool: PgPool = PgPool::connect(PG_URL).await.unwrap();
    let queue = Arc::new(SimpleQueue::new(pool));
    let handler = MyHandler { counter: std::sync::atomic::AtomicUsize::new(0) };

    // Doesn't have to happen before `simple_queue::start`
    queue.register_handler(handler);

    // Keep a clone for insertion
    simple_queue::start(queue.clone()).await;

    let job = Job::new("my-queue", json!({}));
    queue.insert_job(job).await;
}

§Thread Structure
┌─────────────┐
│ Entry Point │
└──────┬──────┘
│ spawns
┌─────────────────────┼──────────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│Queue Processor │ │ Janitor │ │ Reaper │
│ / Poller │ └────────────────┘ └────────────────┘
└───────┬────────┘
│
│ wait global permit
▼
┌─────────┐
│ run() │
└────┬────┘
│ job obtained
│
├──────────────────────┐
│ │ spawn first
│ ▼
│ ┌───────────────┐
│ │ Heartbeat │
│ │ Thread │
│ └───────┬───────┘
│ │ ownership
▼ │
wait queue permit │
│ │
▼ │
┌──────────────────────┐ │
│ Job Thread │◄───┘ heartbeat passed in
│ │ (drop job == drop heartbeat)
│ wait handler permit │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Process │ │
│ │ Job │ │
│ └─────────────┘ │
└──────────────────────┘

§Features
- wait-for-job — Enables the insert_job_and_wait / insert_jobs_and_wait methods, which return a oneshot receiver notified when the job is first processed (success or failure).
- janitor — Enables a background janitor task that periodically archives completed jobs and moves failed jobs to a dead-letter queue. Enabled by default.
§Future Work
- Distributed Semaphores using PostgreSQL
- Temporary store on connection loss
- Job templates
- Queue interactor interface
- PG Listener (to decrease job-to-queue latency)
- Job Queue partitioning (dead tuples accumulation prevention)
§Decisions
- JSON for job data for inspectability of DLQ/Archive
- Poll mechanism / non-distributed semaphores as good enough (for now)
Re-exports§
pub use prelude::*;
Modules§
- prelude
Functions§
- setup - Sets up the queue schema in the database.
- setup_from_url - Sets up the queue schema in the database using a PostgreSQL URL.
- start - Queue starting function.
- start_with_janitor - Starts the queue with a janitor task.
- start_without_janitor - Starts the queue without a janitor task.
Type Aliases§
- BoxDynError - Any error that can be returned by a job handler.