sqlxmq/lib.rs
#![deny(missing_docs, unsafe_code)]
//! # sqlxmq
//!
//! A job queue built on `sqlx` and `PostgreSQL`.
//!
//! This library allows a CRUD application to run background jobs without complicating its
//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
//! already using a `PostgreSQL` database.
//!
//! Although using a SQL database as a job queue means compromising on the latency of
//! delivered jobs, several show-stopping issues present in ordinary job queues are
//! avoided altogether.
//!
//! With most other job queues, in-flight jobs are state that is not covered by normal
//! database backups. Even if jobs _are_ backed up, there is no way to restore both
//! the database and the job queue to a consistent point in time without manually
//! resolving conflicts.
//!
//! By storing jobs in the database, existing backup procedures will capture a perfectly
//! consistent state of both in-flight jobs and persistent data. Additionally, jobs can
//! be spawned and completed as part of other transactions, making it easy to write correct
//! application code.
//!
//! Leveraging the power of `PostgreSQL`, this job queue offers several features not
//! present in other job queues.
//!
//! # Features
//!
//! - **Send/receive multiple jobs at once.**
//!
//!   This reduces the number of queries to the database.
//!
//! - **Send jobs to be executed at a future date and time.**
//!
//!   Avoids the need for a separate scheduling system.
//!
//! - **Reliable delivery of jobs.**
//!
//! - **Automatic retries with exponential backoff.**
//!
//!   Number of retries and initial backoff parameters are configurable.
//!
//! - **Transactional sending of jobs.**
//!
//!   Avoids sending spurious jobs if a transaction is rolled back.
//!
//! - **Transactional completion of jobs.**
//!
//!   If all side-effects of a job are updates to the database, this provides
//!   true exactly-once execution of jobs.
//!
//! - **Transactional check-pointing of jobs.**
//!
//!   Long-running jobs can check-point their state to avoid having to restart
//!   from the beginning if there is a failure: the next retry can continue
//!   from the last check-point (see the sketch after this list).
//!
//! - **Opt-in strictly ordered job delivery.**
//!
//!   Jobs within the same channel will be processed strictly in-order
//!   if this option is enabled for the job.
//!
//! - **Fair job delivery.**
//!
//!   A channel with a lot of jobs ready to run will not starve a channel with fewer
//!   jobs.
//!
//! - **Opt-in two-phase commit.**
//!
//!   This is particularly useful on an ordered channel where a position can be "reserved"
//!   in the job order, but not committed until later.
//!
//! - **JSON and/or binary payloads.**
//!
//!   Jobs can use whichever is most convenient.
//!
//! - **Automatic keep-alive of jobs.**
//!
//!   Long-running jobs will automatically be "kept alive" to prevent them being
//!   retried whilst they're still ongoing.
//!
//! - **Concurrency limits.**
//!
//!   Specify the minimum and maximum number of concurrent jobs each runner should
//!   handle.
//!
//! - **Built-in job registry via an attribute macro.**
//!
//!   Jobs can be easily registered with a runner, and default configuration specified
//!   on a per-job basis.
//!
//! - **Implicit channels.**
//!
//!   Channels are implicitly created and destroyed when jobs are sent and processed,
//!   so no setup is required.
//!
//! - **Channel groups.**
//!
//!   Easily subscribe to multiple channels at once, thanks to the separation of
//!   channel name and channel arguments.
//!
//! - **NOTIFY-based polling.**
//!
//!   This saves resources when few jobs are being processed.
//!
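//! As a brief sketch of how check-pointing can be used (the `do_step_one`/`do_step_two`
//! helpers are hypothetical, and the numeric payload is just one possible convention):
//!
//! ```rust
//! # use std::error::Error;
//! # use sqlxmq::{job, Checkpoint, CurrentJob};
//! # async fn do_step_one() {}
//! # async fn do_step_two() {}
//! #[job]
//! async fn multi_step_job(
//!     mut current_job: CurrentJob,
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//!     // Which step did we last complete? Defaults to `0` on the first attempt.
//!     let step: u32 = current_job.json()?.unwrap_or(0);
//!     if step < 1 {
//!         do_step_one().await;
//!         // Record progress so that any retry resumes after step one.
//!         current_job
//!             .checkpoint(Checkpoint::new().set_json(&1u32)?)
//!             .await?;
//!     }
//!     do_step_two().await;
//!     current_job.complete().await?;
//!     Ok(())
//! }
//! ```
//!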
//! # Getting started
//!
//! ## Database schema
//!
//! This crate expects certain database tables and stored procedures to exist.
//! You can copy the migration files from this crate into your own migrations
//! folder.
//!
//! All database items created by this crate are prefixed with `mq`, so as not
//! to conflict with your own schema.
//!
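//! For example, if you use `sqlx` migrations, the copied files can be applied at
//! start-up (a sketch assuming the files live in a `migrations` directory and the
//! `migrate` feature of `sqlx` is enabled):
//!
//! ```rust,no_run
//! # async fn example(pool: sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
//! sqlx::migrate!("./migrations").run(&pool).await?;
//! # Ok(())
//! # }
//! ```
//!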
//! ## Defining jobs
//!
//! The first step is to define a function to be run on the job queue.
//!
//! ```rust
//! use std::error::Error;
//!
//! use sqlxmq::{job, CurrentJob};
//!
//! // Arguments to the `#[job]` attribute allow setting default job options.
//! #[job(channel_name = "foo")]
//! async fn example_job(
//!     // The first argument should always be the current job.
//!     mut current_job: CurrentJob,
//!     // Additional arguments are optional, but can be used to access context
//!     // provided via [`JobRegistry::set_context`].
//!     message: &'static str,
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//!     // Decode a JSON payload
//!     let who: Option<String> = current_job.json()?;
//!
//!     // Do some work
//!     println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
//!
//!     // Mark the job as complete
//!     current_job.complete().await?;
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Listening for jobs
//!
//! Next we need to create a job runner: this is what listens for new jobs
//! and executes them.
//!
//! ```rust,no_run
//! use std::error::Error;
//!
//! use sqlxmq::JobRegistry;
//!
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn connect_to_db() -> sqlx::Result<sqlx::Pool<sqlx::Postgres>> {
//! #     unimplemented!()
//! # }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//!     // You'll need to provide a Postgres connection pool.
//!     let pool = connect_to_db().await?;
//!
//!     // Construct a job registry from our single job.
//!     let mut registry = JobRegistry::new(&[example_job]);
//!     // Here is where you can configure the registry
//!     // registry.set_error_handler(...)
//!
//!     // And add context
//!     registry.set_context("Hello");
//!
//!     let runner = registry
//!         // Create a job runner using the connection pool.
//!         .runner(&pool)
//!         // Here is where you can configure the job runner
//!         // Aim to keep 10-20 jobs running at a time.
//!         .set_concurrency(10, 20)
//!         // Start the job runner in the background.
//!         .run()
//!         .await?;
//!
//!     // The job runner will continue listening and running
//!     // jobs until `runner` is dropped.
//!     Ok(())
//! }
//! ```
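//!
//! Note that the example above returns (and so drops the runner) immediately. A real
//! application would keep the runner alive for as long as jobs should be processed,
//! for example by parking the task indefinitely (one possible approach, not specific
//! to this crate):
//!
//! ```rust,no_run
//! # async fn example() {
//! // Hold the task open forever; the runner keeps working in the background.
//! std::future::pending::<()>().await;
//! # }
//! ```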
//!
//! ## Spawning a job
//!
//! The final step is to actually run a job.
//!
//! ```rust
//! # use std::error::Error;
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn example(
//! #     pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! example_job.builder()
//!     // This is where we can override job configuration
//!     .set_channel_name("bar")
//!     .set_json("John")?
//!     .spawn(&pool)
//!     .await?;
//! # Ok(())
//! # }
//! ```
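//!
//! Because jobs live in the database, they can also be spawned from inside a larger
//! transaction, so the job is only sent if the transaction commits. A sketch of this
//! (assuming `spawn` is handed the transaction's connection in place of the pool,
//! since it accepts any `sqlx` executor):
//!
//! ```rust,no_run
//! # use std::error::Error;
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn example(
//! #     pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! let mut tx = pool.begin().await?;
//! // ... other database changes made through `tx` ...
//! example_job.builder().spawn(&mut *tx).await?;
//! // The job only becomes visible to runners once the transaction commits.
//! tx.commit().await?;
//! # Ok(())
//! # }
//! ```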

#[doc(hidden)]
pub mod hidden;
mod registry;
mod runner;
mod spawn;
mod utils;

pub use registry::*;
pub use runner::*;
pub use spawn::*;
pub use sqlxmq_macros::job;
pub use utils::OwnedHandle;

/// Helper function to determine if a particular error condition is retryable.
///
/// For best results, database operations should be automatically retried if one
/// of these errors is returned.
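///
/// A minimal retry-loop sketch (the `send_job` stub and the bound of three retries
/// are illustrative only):
///
/// ```rust,no_run
/// # async fn send_job(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), sqlx::Error> { Ok(()) }
/// # async fn example(pool: sqlx::Pool<sqlx::Postgres>) -> Result<(), sqlx::Error> {
/// let mut attempts = 0;
/// let result = loop {
///     match send_job(&pool).await {
///         // Retry only the transient error conditions recognized below.
///         Err(e) if sqlxmq::should_retry(&e) && attempts < 3 => attempts += 1,
///         other => break other,
///     }
/// };
/// result?;
/// # Ok(())
/// # }
/// ```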
pub fn should_retry(error: &sqlx::Error) -> bool {
    if let Some(db_error) = error.as_database_error() {
        // It's more readable as a match
        #[allow(clippy::match_like_matches_macro)]
        match (db_error.code().as_deref(), db_error.constraint()) {
            // Foreign key constraint violation on ordered channel
            (Some("23503"), Some("mq_msgs_after_message_id_fkey")) => true,
            // Unique constraint violation on ordered channel
            (Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true,
            // Serialization failure
            (Some("40001"), _) => true,
            // Deadlock detected
            (Some("40P01"), _) => true,
            // Other
            _ => false,
        }
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate as sqlxmq;

    use std::env;
    use std::error::Error;
    use std::future::Future;
    use std::ops::Deref;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Once};
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures::StreamExt;
    use sqlx::{Pool, Postgres};
    use tokio::sync::{Mutex, MutexGuard};
    use tokio::task;

    // field 0 is never read, but its drop is important
    #[allow(dead_code)]
    struct TestGuard<T>(MutexGuard<'static, ()>, T);

    impl<T> Deref for TestGuard<T> {
        type Target = T;

        fn deref(&self) -> &T {
            &self.1
        }
    }

    async fn test_pool() -> TestGuard<Pool<Postgres>> {
        static INIT_LOGGER: Once = Once::new();
        static TEST_MUTEX: Mutex<()> = Mutex::const_new(());

        let guard = TEST_MUTEX.lock().await;

        let _ = dotenvy::dotenv();

        INIT_LOGGER.call_once(pretty_env_logger::init);

        let pool = Pool::connect(&env::var("DATABASE_URL").unwrap())
            .await
            .unwrap();

        sqlx::query("TRUNCATE TABLE mq_payloads")
            .execute(&pool)
            .await
            .unwrap();
        sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()")
            .execute(&pool)
            .await
            .unwrap();

        TestGuard(guard, pool)
    }

    async fn test_job_runner<F: Future + Send + 'static>(
        pool: &Pool<Postgres>,
        f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
    ) -> (JobRunnerHandle, Arc<AtomicUsize>)
    where
        F::Output: Send + 'static,
    {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter2 = counter.clone();
        let runner = JobRunnerOptions::new(pool, move |job| {
            counter2.fetch_add(1, Ordering::SeqCst);
            task::spawn(f(job));
        })
        .run()
        .await
        .unwrap();
        (runner, counter)
    }

    fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
        builder.set_channel_name("bar")
    }

    #[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
    async fn example_job1(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job(proto(job_proto))]
    async fn example_job2(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job]
    async fn example_job_with_ctx(
        mut current_job: CurrentJob,
        ctx1: i32,
        ctx2: &'static str,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        assert_eq!(ctx1, 42);
        assert_eq!(ctx2, "Hello, world!");
        current_job.complete().await?;
        Ok(())
    }

    async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
        let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
        registry.set_context(42).set_context("Hello, world!");
        registry.runner(pool).run().await.unwrap()
    }

    fn is_ci() -> bool {
        std::env::var("CI").is_ok()
    }

    fn default_pause() -> u64 {
        if is_ci() {
            1000
        } else {
            200
        }
    }

    async fn pause() {
        pause_ms(default_pause()).await;
    }

    async fn pause_ms(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn it_can_spawn_job() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_clear_jobs() {
        {
            let pool = &*test_pool().await;
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();

            sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();

            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_order() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("bar")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            let mut job = rx.next().await.unwrap();
            job.complete().await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_parallel() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            JobBuilder::new("bar").spawn(pool).await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            for _ in 0..2 {
                let mut job = rx.next().await.unwrap();
                job.complete().await.unwrap();
            }
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_retries_failed_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;

            let backoff = default_pause() + 300;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(2)
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // Third attempt
            pause_ms(backoff * 2).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // No more attempts
            pause_ms(backoff * 5).await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_checkpoint_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
                let state: bool = current_job.json().unwrap().unwrap();
                if state {
                    current_job.complete().await.unwrap();
                } else {
                    current_job
                        .checkpoint(Checkpoint::new().set_json(&true).unwrap())
                        .await
                        .unwrap();
                }
            })
            .await;

            let backoff = default_pause();

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(5)
                .set_json(&false)
                .unwrap()
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // No more attempts
            pause_ms(backoff * 3).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_use_registry() {
        {
            let pool = &*test_pool().await;
            let _runner = named_job_runner(pool).await;

            example_job1.builder().spawn(pool).await.unwrap();
            example_job2.builder().spawn(pool).await.unwrap();
            example_job_with_ctx.builder().spawn(pool).await.unwrap();
            pause().await;
        }
        pause().await;
    }
}