// simple_queue — src/lib.rs
1/*!
2This crate provides a persisted job queue backed by PostgreSQL with a simple to use interface.
It's written as an easy-to-use async job queue library without macros or generics shenanigans.
4
5Usage boils down to 3 points:
61. Implement `Handler` trait
72. Initialize the queue with a `PgPool` and start it
83. Insert jobs
9
Default configurations for jobs and queues are just that — defaults — so make sure to read through the appropriate Job and SimpleQueue documentation.
11
12## Features
13
14- Simple handler interface
15- Job scheduling and rescheduling
16- Job retry support
17- Job crash handling
18- Job cancellation
19- Job fingerprinting (soft identifier)
20- Existing jobs deduplication (unique key with noop on job reinsert - only for live jobs)
21- Configurable global and per-queue backoff strategy (linear and exponential provided, custom supported)
22- 3 tiered job permits (global permit, per queue permit, handler owned limit)
23- Stalled job recovery (heartbeat)
24- Archive and DLQ (requires `janitor` feature)
25- Poison job detection (`reaper`)
26- Wait for job completion (requires `wait-for-job` feature; oneshot channel notified on first processing attempt, regardless of success or failure)
27
28## Usage
29
30### Handler Implementation
31```no_run
32// handlers.rs
33use simple_queue::prelude::*;
34
35struct MyHandler {
36    counter: std::sync::atomic::AtomicUsize,
}
38
39impl Handler for MyHandler {
40    const QUEUE: &'static str = "my-queue";
41
42    async fn process(&self, queue: &SimpleQueue, job: &Job) -> Result<JobResult, BoxDynError> {
43        self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
44        Ok(JobResult::Success)
45    }
46}
47```
48
49### Queue Initialization
50```no_run
51// main.rs
52# use sqlx::PgPool;
53# use std::sync::Arc;
54# use serde_json::json;
55use simple_queue::prelude::*;
56# struct MyHandler { counter: std::sync::atomic::AtomicUsize }
57# impl Handler for MyHandler {
58#     const QUEUE: &'static str = "my-queue";
59#
60#     async fn process(&self, queue: &SimpleQueue, job: &Job) -> Result<JobResult, BoxDynError> {
61#         self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
62#         Ok(JobResult::Success)
63#     }
64# }
65
66#[tokio::main]
67async fn main() {
68    # let PG_URL = "postgres://user:password@localhost:5432/dbname";
69    let pool: PgPool = PgPool::connect(PG_URL).await.unwrap();
70    let queue = Arc::new(SimpleQueue::new(pool));
71    let handler = MyHandler { counter: std::sync::atomic::AtomicUsize::new(0) };
    // Doesn't have to happen before `simple_queue::start`
73    queue.register_handler(handler);
74
75    //  Keep clone for insertion
76    simple_queue::start(queue.clone()).await;
77
78    let job = Job::new("my-queue", json!({}));
79    queue.insert_job(job).await;
80}
81```
82
83## Thread Structure
84```text
85                        ┌─────────────┐
86                        │ Entry Point │
87                        └──────┬──────┘
88                               │ spawns
89         ┌─────────────────────┼──────────────────────┐
90         ▼                     ▼                      ▼
91┌────────────────┐    ┌────────────────┐    ┌────────────────┐
92│Queue Processor │    │    Janitor     │    │     Reaper     │
93│   / Poller     │    └────────────────┘    └────────────────┘
94└───────┬────────┘
        │ wait global permit
        ▼
   ┌─────────┐
99   │  run()  │
100   └────┬────┘
101        │ job obtained
        ├──────────────────────┐
104        │                      │ spawn first
105        │                      ▼
106        │              ┌───────────────┐
107        │              │   Heartbeat   │
108        │              │    Thread     │
109        │              └───────┬───────┘
110        │                      │ ownership
111        ▼                      │
112   wait queue permit           │
113        │                      │
114        ▼                      │
115   ┌──────────────────────┐    │
116   │      Job Thread      │◄───┘ heartbeat passed in
117   │                      │      (drop job == drop heartbeat)
118   │  wait handler permit │
119   │         │            │
120   │         ▼            │
121   │  ┌─────────────┐     │
122   │  │   Process   │     │
123   │  │    Job      │     │
124   │  └─────────────┘     │
125   └──────────────────────┘
126```
127
## Cargo Features
129
130- `wait-for-job` — Enables `insert_job_and_wait` / `insert_jobs_and_wait` methods that return a oneshot receiver notified when the job is first processed (success or failure).
131- `janitor` — Enables a background janitor task that periodically archives completed jobs and moves failed jobs to a dead-letter queue. Enabled by default.
132
133## Future Work
134
135- Distributed Semaphores using PostgreSQL
136- Temporary store on connection loss
137- Job templates
138- Queue interactor interface
139- PG Listener (to decrease job-to-queue latency)
140- Job Queue partitioning (dead tuples accumulation prevention)
141
142## Decisions
143- JSON for job data for inspectability of DLQ/Archive
144- Poll mechanism / non-distributed semaphores as a good enough (for now)
145
146*/
147#![cfg_attr(docsrs, feature(doc_cfg))]
148
149use std::sync::Arc;
150
151use sqlx::PgPool;
152use tokio::sync::Semaphore;
153
154mod handler;
155pub(crate) mod heartbeat;
156#[cfg_attr(docsrs, doc(cfg(feature = "janitor")))]
157#[cfg(feature = "janitor")]
158mod janitor;
159mod job;
160pub mod queue;
161pub mod reaper;
162mod result;
163pub mod sync;
164
/// Any error that can be returned by a job handler.
pub type BoxDynError = Box<dyn std::error::Error + 'static + Send + Sync>;
/// Convenience re-exports of the crate's core types.
///
/// Intended usage: `use simple_queue::prelude::*;` brings everything needed
/// to implement a [`Handler`] and work with [`Job`]s on a [`SimpleQueue`].
pub mod prelude {
    pub use crate::BoxDynError;
    pub use crate::handler::Handler;
    pub use crate::job::Job;
    pub use crate::job::JobExt;
    pub use crate::queue::SimpleQueue;
    pub use crate::result::JobResult;
}
175
176pub use prelude::*;
177/// Sets up the queue schema in the database.
178///
179/// It's recommended to run this once during application startup.
180///
181/// Note: Separation from the main database is recommended but not required.
182pub async fn setup(pool: &PgPool) -> Result<(), BoxDynError> {
183    sqlx::raw_sql(include_str!("../migrations/0001_queue_init.sql"))
184        .execute(pool)
185        .await?;
186    Ok(())
187}
188/// Sets up the queue schema in the database using a PostgreSQL URL.
189pub async fn setup_from_url(url: &str) -> Result<(), BoxDynError> {
190    let pool = sqlx::PgPool::connect(url).await?;
191    sqlx::raw_sql(include_str!("../migrations/0001_queue_init.sql"))
192        .execute(&pool)
193        .await?;
194    Ok(())
195}
196
197/// Queue starting function.
198///
199/// It starts:
200/// - A reaper task that reclaims stalled jobs
201/// - A janitor task that periodically checks the queue, archives old jobs and moves errored jobs to a dead queue
202///
203/// It returns a [`tokio::task::JoinSet`] that can be used to wait for the tasks to complete,
204/// and it does so only AFTER the queue has started polling.
205pub async fn start(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
206    #[cfg(feature = "janitor")]
207    {
208        start_with_janitor(jq).await
209    }
210    #[cfg(not(feature = "janitor"))]
211    {
212        start_without_janitor(jq).await
213    }
214}
/// Starts the queue together with the reaper and janitor background tasks.
///
/// It returns a [`tokio::task::JoinSet`] that can be used to wait for the tasks to complete,
/// and it does so only AFTER the queue has started polling.
#[cfg(feature = "janitor")]
pub async fn start_with_janitor(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
    tracing::info!("Starting queue with reaper and janitor");
    let mut joinset = tokio::task::JoinSet::new();
    let mut reaper: reaper::Reaper = jq.reaper().await;
    let mut janitor: janitor::Janitor = jq.janitor().await;
    let _reaper_h = joinset.spawn(async move {
        reaper.run().await;
    });
    let _janitor_h = joinset.spawn(async move {
        janitor.run().await;
    });
    // Single-permit semaphore: the queue task holds the owned permit until it
    // has started polling, so re-acquiring below blocks until the queue is live.
    let start_semaphore = Arc::new(Semaphore::new(1));
    let queue_start_permit = start_semaphore
        .clone()
        .acquire_owned()
        .await
        .expect("failed to acquire queue start permit from a just created semaphore");
    let _queue_h = joinset.spawn(async move {
        let _ = jq.run(Some(queue_start_permit)).await;
    });
    // BUGFIX: wait for the queue task to release its start permit, mirroring
    // `start_without_janitor`. Without this the function returned immediately,
    // breaking the documented "returns only AFTER polling started" contract.
    let _ = start_semaphore
        .acquire()
        .await
        .expect("queue start semaphore closed before queue started polling");
    joinset
}
238
239/// Starts the queue without a janitor task.
240///
241/// For cases when you don't need a janitor task, i.e. completed and error queues should stay in the same table.
242/// Useful for development and testing.
243///
244/// It returns a [`tokio::task::JoinSet`] that can be used to wait for the tasks to complete,
245/// and it does so only AFTER the queue has started polling.
246pub async fn start_without_janitor(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
247    let mut joinset = tokio::task::JoinSet::new();
248    let mut reaper: reaper::Reaper = jq.reaper().await;
249    let _reaper_h = joinset.spawn(async move {
250        reaper.run().await;
251    });
252    let start_semaphore = Arc::new(Semaphore::new(1));
253    let queue_start_permit = start_semaphore
254        .clone()
255        .acquire_owned()
256        .await
257        .expect("failed to acquire queue start permit from a just created semaphore");
258    let _queue_h = joinset.spawn(async move {
259        let _ = jq.run(Some(queue_start_permit)).await;
260    });
261    let _ = start_semaphore.acquire().await.unwrap();
262    joinset
263}