simple_queue/lib.rs
1/*!
This crate provides a persisted job queue backed by PostgreSQL with a simple-to-use interface.
It's written as an easy-to-use async job queue library without macros or generics shenanigans.
4
5Usage boils down to 3 points:
61. Implement `Handler` trait
72. Initialize the queue with a `PgPool` and start it
83. Insert jobs
9
Default configurations for jobs and queues are just that — defaults — so make sure to read through the relevant `Job` and `SimpleQueue` documentation.
11
12## Features
13
14- Simple handler interface
15- Job scheduling and rescheduling
16- Job retry support
17- Job crash handling
18- Job cancellation
19- Job fingerprinting (soft identifier)
20- Existing jobs deduplication (unique key with noop on job reinsert - only for live jobs)
21- Configurable global and per-queue backoff strategy (linear and exponential provided, custom supported)
22- 3 tiered job permits (global permit, per queue permit, handler owned limit)
23- Stalled job recovery (heartbeat)
24- Archive and DLQ (requires `janitor` feature)
25- Poison job detection (`reaper`)
26- Wait for job completion (requires `wait-for-job` feature; oneshot channel notified on first processing attempt, regardless of success or failure)
27
28## Usage
29
30### Handler Implementation
31```no_run
32// handlers.rs
33use simple_queue::prelude::*;
34
35struct MyHandler {
36 counter: std::sync::atomic::AtomicUsize,
}
38
39impl Handler for MyHandler {
40 const QUEUE: &'static str = "my-queue";
41
42 async fn process(&self, queue: &SimpleQueue, job: &Job) -> Result<JobResult, BoxDynError> {
43 self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
44 Ok(JobResult::Success)
45 }
46}
47```
48
49### Queue Initialization
50```no_run
51// main.rs
52# use sqlx::PgPool;
53# use std::sync::Arc;
54# use serde_json::json;
55use simple_queue::prelude::*;
56# struct MyHandler { counter: std::sync::atomic::AtomicUsize }
57# impl Handler for MyHandler {
58# const QUEUE: &'static str = "my-queue";
59#
60# async fn process(&self, queue: &SimpleQueue, job: &Job) -> Result<JobResult, BoxDynError> {
61# self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
62# Ok(JobResult::Success)
63# }
64# }
65
66#[tokio::main]
67async fn main() {
68 # let PG_URL = "postgres://user:password@localhost:5432/dbname";
69 let pool: PgPool = PgPool::connect(PG_URL).await.unwrap();
70 let queue = Arc::new(SimpleQueue::new(pool));
71 let handler = MyHandler { counter: std::sync::atomic::AtomicUsize::new(0) };
    // Doesn't have to happen before `simple_queue::start`
73 queue.register_handler(handler);
74
75 // Keep clone for insertion
76 simple_queue::start(queue.clone()).await;
77
78 let job = Job::new("my-queue", json!({}));
79 queue.insert_job(job).await;
80}
81```
82
83## Thread Structure
84```text
85 ┌─────────────┐
86 │ Entry Point │
87 └──────┬──────┘
88 │ spawns
89 ┌─────────────────────┼──────────────────────┐
90 ▼ ▼ ▼
91┌────────────────┐ ┌────────────────┐ ┌────────────────┐
92│Queue Processor │ │ Janitor │ │ Reaper │
93│ / Poller │ └────────────────┘ └────────────────┘
94└───────┬────────┘
95 │
96 │ wait global permit
97 ▼
98 ┌─────────┐
99 │ run() │
100 └────┬────┘
101 │ job obtained
102 │
103 ├──────────────────────┐
104 │ │ spawn first
105 │ ▼
106 │ ┌───────────────┐
107 │ │ Heartbeat │
108 │ │ Thread │
109 │ └───────┬───────┘
110 │ │ ownership
111 ▼ │
112 wait queue permit │
113 │ │
114 ▼ │
115 ┌──────────────────────┐ │
116 │ Job Thread │◄───┘ heartbeat passed in
117 │ │ (drop job == drop heartbeat)
118 │ wait handler permit │
119 │ │ │
120 │ ▼ │
121 │ ┌─────────────┐ │
122 │ │ Process │ │
123 │ │ Job │ │
124 │ └─────────────┘ │
125 └──────────────────────┘
126```
127
## Cargo Features
129
130- `wait-for-job` — Enables `insert_job_and_wait` / `insert_jobs_and_wait` methods that return a oneshot receiver notified when the job is first processed (success or failure).
131- `janitor` — Enables a background janitor task that periodically archives completed jobs and moves failed jobs to a dead-letter queue. Enabled by default.
132
133## Future Work
134
135- Distributed Semaphores using PostgreSQL
136- Temporary store on connection loss
137- Job templates
138- Queue interactor interface
139- PG Listener (to decrease job-to-queue latency)
140- Job Queue partitioning (dead tuples accumulation prevention)
141
142## Decisions
143- JSON for job data for inspectability of DLQ/Archive
- Poll mechanism / non-distributed semaphores as a good-enough solution (for now)
145
146*/
147#![cfg_attr(docsrs, feature(doc_cfg))]
148
149use std::sync::Arc;
150
151use sqlx::PgPool;
152use tokio::sync::Semaphore;
153
154mod handler;
155pub(crate) mod heartbeat;
156#[cfg_attr(docsrs, doc(cfg(feature = "janitor")))]
157#[cfg(feature = "janitor")]
158mod janitor;
159mod job;
160pub mod queue;
161pub mod reaper;
162mod result;
163pub mod sync;
164
/// Boxed, thread-safe (`Send + Sync`) error type that a job handler may return for any failure.
pub type BoxDynError = Box<dyn std::error::Error + Send + Sync + 'static>;
167pub mod prelude {
168 pub use crate::BoxDynError;
169 pub use crate::handler::Handler;
170 pub use crate::job::Job;
171 pub use crate::job::JobExt;
172 pub use crate::queue::SimpleQueue;
173 pub use crate::result::JobResult;
174}
175
176pub use prelude::*;
177/// Sets up the queue schema in the database.
178///
179/// It's recommended to run this once during application startup.
180///
181/// Note: Separation from the main database is recommended but not required.
182pub async fn setup(pool: &PgPool) -> Result<(), BoxDynError> {
183 sqlx::raw_sql(include_str!("../migrations/0001_queue_init.sql"))
184 .execute(pool)
185 .await?;
186 Ok(())
187}
188/// Sets up the queue schema in the database using a PostgreSQL URL.
189pub async fn setup_from_url(url: &str) -> Result<(), BoxDynError> {
190 let pool = sqlx::PgPool::connect(url).await?;
191 sqlx::raw_sql(include_str!("../migrations/0001_queue_init.sql"))
192 .execute(&pool)
193 .await?;
194 Ok(())
195}
196
197/// Queue starting function.
198///
199/// It starts:
200/// - A reaper task that reclaims stalled jobs
201/// - A janitor task that periodically checks the queue, archives old jobs and moves errored jobs to a dead queue
202///
203/// It returns a [`tokio::task::JoinSet`] that can be used to wait for the tasks to complete,
204/// and it does so only AFTER the queue has started polling.
205pub async fn start(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
206 #[cfg(feature = "janitor")]
207 {
208 start_with_janitor(jq).await
209 }
210 #[cfg(not(feature = "janitor"))]
211 {
212 start_without_janitor(jq).await
213 }
214}
/// Starts the queue together with the reaper and janitor tasks.
///
/// Spawns three tasks onto a fresh [`tokio::task::JoinSet`]: the reaper, the
/// janitor and the queue processor (`SimpleQueue::run`).
///
/// The queue task is handed the only permit of a one-permit semaphore. This
/// function then waits until it can acquire that semaphore again, which is
/// only possible once the queue task has dropped the permit, so it returns
/// the `JoinSet` only after that hand-off (the same start-up handshake used
/// by `start_without_janitor`).
#[cfg(feature = "janitor")]
pub async fn start_with_janitor(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
    tracing::info!("Starting queue with reaper and janitor");
    let mut joinset = tokio::task::JoinSet::new();
    let mut reaper: reaper::Reaper = jq.reaper().await;
    let mut janitor: janitor::Janitor = jq.janitor().await;
    joinset.spawn(async move {
        reaper.run().await;
    });
    joinset.spawn(async move {
        janitor.run().await;
    });

    let start_semaphore = Arc::new(Semaphore::new(1));
    let queue_start_permit = Arc::clone(&start_semaphore)
        .acquire_owned()
        .await
        .expect("failed to acquire queue start permit from a just created semaphore");
    joinset.spawn(async move {
        let _ = jq.run(Some(queue_start_permit)).await;
    });

    // Block until the queue task gives the start permit back. Without this
    // wait the function would return before the queue had signalled start-up.
    // The semaphore is never closed, so acquiring can only fail on a bug.
    let _ = start_semaphore
        .acquire()
        .await
        .expect("queue start semaphore is never closed");
    joinset
}
238
239/// Starts the queue without a janitor task.
240///
241/// For cases when you don't need a janitor task, i.e. completed and error queues should stay in the same table.
242/// Useful for development and testing.
243///
244/// It returns a [`tokio::task::JoinSet`] that can be used to wait for the tasks to complete,
245/// and it does so only AFTER the queue has started polling.
246pub async fn start_without_janitor(jq: Arc<SimpleQueue>) -> tokio::task::JoinSet<()> {
247 let mut joinset = tokio::task::JoinSet::new();
248 let mut reaper: reaper::Reaper = jq.reaper().await;
249 let _reaper_h = joinset.spawn(async move {
250 reaper.run().await;
251 });
252 let start_semaphore = Arc::new(Semaphore::new(1));
253 let queue_start_permit = start_semaphore
254 .clone()
255 .acquire_owned()
256 .await
257 .expect("failed to acquire queue start permit from a just created semaphore");
258 let _queue_h = joinset.spawn(async move {
259 let _ = jq.run(Some(queue_start_permit)).await;
260 });
261 let _ = start_semaphore.acquire().await.unwrap();
262 joinset
263}