#![deny(missing_docs, unsafe_code)]
//! # sqlxmq
//!
//! A job queue built on `sqlx` and `PostgreSQL`.
//!
//! This library allows a CRUD application to run background jobs without complicating its
//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
//! already using a `PostgreSQL` database.
//!
//! Although using a SQL database as a job queue means compromising on latency of
//! delivered jobs, there are several show-stopping issues present in ordinary job
//! queues which are avoided altogether.
//!
//! With most other job queues, in-flight jobs are state that is not covered by normal
//! database backups. Even if jobs _are_ backed up, there is no way to restore both
//! a database and a job queue to a consistent point-in-time without manually
//! resolving conflicts.
//!
//! By storing jobs in the database, existing backup procedures will store a perfectly
//! consistent state of both in-flight jobs and persistent data. Additionally, jobs can
//! be spawned and completed as part of other transactions, making it easy to write correct
//! application code.
//!
//! Leveraging the power of `PostgreSQL`, this job queue offers several features not
//! present in other job queues.
//!
//! # Features
//!
//! - **Send/receive multiple jobs at once.**
//!
//!   This reduces the number of queries to the database.
//!
//! - **Send jobs to be executed at a future date and time.**
//!
//!   Avoids the need for a separate scheduling system.
//!
//! - **Reliable delivery of jobs.**
//!
//! - **Automatic retries with exponential backoff.**
//!
//!   Number of retries and initial backoff parameters are configurable.
//!
//! - **Transactional sending of jobs.**
//!
//!   Avoids sending spurious jobs if a transaction is rolled back.
//!
//! - **Transactional completion of jobs.**
//!
//!   If all side-effects of a job are updates to the database, this provides
//!   true exactly-once execution of jobs.
//!
//! - **Transactional check-pointing of jobs.**
//!
//!   Long-running jobs can check-point their state to avoid having to restart
//!   from the beginning if there is a failure: the next retry can continue
//!   from the last check-point.
//!
//! - **Opt-in strictly ordered job delivery.**
//!
//!   Jobs within the same channel will be processed strictly in-order
//!   if this option is enabled for the job.
//!
//! - **Fair job delivery.**
//!
//!   A channel with a lot of jobs ready to run will not starve a channel with fewer
//!   jobs.
//!
//! - **Opt-in two-phase commit.**
//!
//!   This is particularly useful on an ordered channel where a position can be "reserved"
//!   in the job order, but not committed until later.
//!
//! - **JSON and/or binary payloads.**
//!
//!   Jobs can use whichever is most convenient.
//!
//! - **Automatic keep-alive of jobs.**
//!
//!   Long-running jobs will automatically be "kept alive" to prevent them being
//!   retried whilst they're still ongoing.
//!
//! - **Concurrency limits.**
//!
//!   Specify the minimum and maximum number of concurrent jobs each runner should
//!   handle.
//!
//! - **Built-in job registry via an attribute macro.**
//!
//!   Jobs can be easily registered with a runner, and default configuration specified
//!   on a per-job basis.
//!
//! - **Implicit channels.**
//!
//!   Channels are implicitly created and destroyed when jobs are sent and processed,
//!   so no setup is required.
//!
//! - **Channel groups.**
//!
//!   Easily subscribe to multiple channels at once, thanks to the separation of
//!   channel name and channel arguments.
//!
//! - **NOTIFY-based polling.**
//!
//!   This saves resources when few jobs are being processed.
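//!
//! The retry schedule described above can be pictured with a small, self-contained
//! sketch. It assumes that the delay doubles after each failed attempt, starting from
//! the initial backoff (as configured with `JobBuilder::set_retry_backoff`). The helper
//! `retry_delays` is hypothetical and not part of this crate's API.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical helper: the delay before each retry, assuming the
//! /// backoff doubles after every failed attempt.
//! fn retry_delays(initial: Duration, retries: u32) -> Vec<Duration> {
//!     (0..retries).map(|n| initial * 2u32.pow(n)).collect()
//! }
//!
//! fn main() {
//!     // Three retries with a 2s initial backoff wait 2s, 4s, then 8s.
//!     let delays = retry_delays(Duration::from_secs(2), 3);
//!     assert_eq!(
//!         delays,
//!         vec![Duration::from_secs(2), Duration::from_secs(4), Duration::from_secs(8)]
//!     );
//! }
//! ```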
//!
//! # Getting started
//!
//! ## Database schema
//!
//! This crate expects certain database tables and stored procedures to exist.
//! You can copy the migration files from this crate into your own migrations
//! folder.
//!
//! All database items created by this crate are prefixed with `mq`, so as not
//! to conflict with your own schema.
//!
//! ## Defining jobs
//!
//! The first step is to define a function to be run on the job queue.
//!
//! ```rust
//! use std::error::Error;
//!
//! use sqlxmq::{job, CurrentJob};
//!
//! // Arguments to the `#[job]` attribute allow setting default job options.
//! #[job(channel_name = "foo")]
//! async fn example_job(
//!     // The first argument should always be the current job.
//!     mut current_job: CurrentJob,
//!     // Additional arguments are optional, but can be used to access context
//!     // provided via `JobRegistry::set_context`.
//!     message: &'static str,
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//!     // Decode a JSON payload
//!     let who: Option<String> = current_job.json()?;
//!
//!     // Do some work
//!     println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
//!
//!     // Mark the job as complete
//!     current_job.complete().await?;
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Listening for jobs
//!
//! Next we need to create a job runner: this is what listens for new jobs
//! and executes them.
//!
//! ```rust,no_run
//! use std::error::Error;
//!
//! use sqlxmq::JobRegistry;
//!
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn connect_to_db() -> sqlx::Result<sqlx::Pool<sqlx::Postgres>> {
//! #     unimplemented!()
//! # }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//!     // You'll need to provide a Postgres connection pool.
//!     let pool = connect_to_db().await?;
//!
//!     // Construct a job registry from our single job.
//!     let mut registry = JobRegistry::new(&[example_job]);
//!     // Here is where you can configure the registry
//!     // registry.set_error_handler(...)
//!
//!     // And add context
//!     registry.set_context("Hello");
//!
//!     let runner = registry
//!         // Create a job runner using the connection pool.
//!         .runner(&pool)
//!         // Here is where you can configure the job runner
//!         // Aim to keep 10-20 jobs running at a time.
//!         .set_concurrency(10, 20)
//!         // Start the job runner in the background.
//!         .run()
//!         .await?;
//!
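//!     // The job runner keeps listening for and running jobs until
//!     // `runner` is dropped, so a real application would keep it alive
//!     // (for example, until a shutdown signal arrives) rather than return here.
//!     Ok(())
//! }
//! ```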
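//!
//! The `set_concurrency(10, 20)` call above gives the runner a minimum and a maximum
//! number of concurrently running jobs. One plausible reading, stated here as an
//! assumption rather than a description of the runner's internals, is that the runner
//! polls for more work only once the running count drops below the minimum. It then
//! requests enough jobs to reach the maximum. The helper `jobs_to_fetch` is hypothetical.
//!
//! ```rust
//! /// Hypothetical helper: how many new jobs to request, given the number
//! /// currently running and the configured (min, max) concurrency.
//! fn jobs_to_fetch(running: usize, min: usize, max: usize) -> usize {
//!     if running < min {
//!         // Below the minimum: top up to the maximum in one batch.
//!         max.saturating_sub(running)
//!     } else {
//!         // Enough work in flight: do not poll yet.
//!         0
//!     }
//! }
//!
//! fn main() {
//!     assert_eq!(jobs_to_fetch(0, 10, 20), 20);
//!     assert_eq!(jobs_to_fetch(9, 10, 20), 11);
//!     assert_eq!(jobs_to_fetch(10, 10, 20), 0);
//! }
//! ```
//!
//! Under this scheme, the gap between the two limits lets jobs be fetched in batches,
//! instead of issuing a query every time a single job finishes.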
//!
//! ## Spawning a job
//!
//! The final step is to spawn a job, which a running job runner will then pick up
//! and execute.
//!
//! ```rust
//! # use std::error::Error;
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn example(
//! #     pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! example_job.builder()
//!     // This is where we can override job configuration
//!     .set_channel_name("bar")
//!     .set_json("John")?
//!     .spawn(&pool)
//!     .await?;
//! #     Ok(())
//! # }
//! ```

#[doc(hidden)]
pub mod hidden;
mod registry;
mod runner;
mod spawn;
mod utils;

pub use registry::*;
pub use runner::*;
pub use spawn::*;
pub use sqlxmq_macros::job;
pub use utils::OwnedHandle;

/// Helper function to determine if a particular error condition is retryable.
///
/// For best results, database operations should be automatically retried if one
/// of these errors is returned.
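///
/// A caller might wrap a database operation in a retry loop driven by this
/// predicate. The sketch below is self-contained, so it substitutes a hypothetical
/// `FakeError` type and `is_transient` predicate for `sqlx::Error` and
/// `should_retry`. The `retry` helper and its attempt limit are also illustrative,
/// and a real loop would usually pause between attempts.
///
/// ```rust
/// /// Hypothetical stand-in for `sqlx::Error`, carrying only a SQLSTATE code.
/// #[derive(Debug, PartialEq)]
/// struct FakeError(&'static str);
///
/// /// Stand-in for `should_retry`: treat serialization failures and
/// /// deadlocks as transient.
/// fn is_transient(e: &FakeError) -> bool {
///     matches!(e.0, "40001" | "40P01")
/// }
///
/// /// Run `op` up to `max_attempts` times, retrying only errors that
/// /// `is_retryable` accepts.
/// fn retry<T, E>(
///     max_attempts: u32,
///     is_retryable: impl Fn(&E) -> bool,
///     mut op: impl FnMut() -> Result<T, E>,
/// ) -> Result<T, E> {
///     let mut attempt = 1;
///     loop {
///         match op() {
///             Err(e) if attempt < max_attempts && is_retryable(&e) => attempt += 1,
///             other => return other,
///         }
///     }
/// }
///
/// fn main() {
///     // Fails twice with a deadlock, then succeeds on the third attempt.
///     let mut calls = 0;
///     let result = retry(5, is_transient, || {
///         calls += 1;
///         if calls < 3 { Err(FakeError("40P01")) } else { Ok(calls) }
///     });
///     assert_eq!(result, Ok(3));
///
///     // An error the predicate rejects (here a unique violation) is returned immediately.
///     let mut calls = 0;
///     let result: Result<(), _> = retry(5, is_transient, || {
///         calls += 1;
///         Err(FakeError("23505"))
///     });
///     assert_eq!(result, Err(FakeError("23505")));
///     assert_eq!(calls, 1);
/// }
/// ```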
pub fn should_retry(error: &sqlx::Error) -> bool {
    if let Some(db_error) = error.as_database_error() {
        // It's more readable as a match
        #[allow(clippy::match_like_matches_macro)]
        match (db_error.code().as_deref(), db_error.constraint()) {
            // Foreign key constraint violation on ordered channel
            (Some("23503"), Some("mq_msgs_after_message_id_fkey")) => true,
            // Unique constraint violation on ordered channel
            (Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true,
            // Serialization failure
            (Some("40001"), _) => true,
            // Deadlock detected
            (Some("40P01"), _) => true,
            // Other
            _ => false,
        }
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate as sqlxmq;

    use std::env;
    use std::error::Error;
    use std::future::Future;
    use std::ops::Deref;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Once};
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures::StreamExt;
    use sqlx::{Pool, Postgres};
    use tokio::sync::{Mutex, MutexGuard};
    use tokio::task;

    // Field 0 (the mutex guard) is never read, but holding it until drop serializes the tests.
    #[allow(dead_code)]
    struct TestGuard<T>(MutexGuard<'static, ()>, T);

    impl<T> Deref for TestGuard<T> {
        type Target = T;

        fn deref(&self) -> &T {
            &self.1
        }
    }

    async fn test_pool() -> TestGuard<Pool<Postgres>> {
        static INIT_LOGGER: Once = Once::new();
        static TEST_MUTEX: Mutex<()> = Mutex::const_new(());

        let guard = TEST_MUTEX.lock().await;

        let _ = dotenvy::dotenv();

        INIT_LOGGER.call_once(pretty_env_logger::init);

        let pool = Pool::connect(&env::var("DATABASE_URL").unwrap())
            .await
            .unwrap();

        sqlx::query("TRUNCATE TABLE mq_payloads")
            .execute(&pool)
            .await
            .unwrap();
        sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()")
            .execute(&pool)
            .await
            .unwrap();

        TestGuard(guard, pool)
    }

    async fn test_job_runner<F: Future + Send + 'static>(
        pool: &Pool<Postgres>,
        f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
    ) -> (JobRunnerHandle, Arc<AtomicUsize>)
    where
        F::Output: Send + 'static,
    {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter2 = counter.clone();
        let runner = JobRunnerOptions::new(pool, move |job| {
            counter2.fetch_add(1, Ordering::SeqCst);
            task::spawn(f(job));
        })
        .run()
        .await
        .unwrap();
        (runner, counter)
    }

    fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
        builder.set_channel_name("bar")
    }

    #[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
    async fn example_job1(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job(proto(job_proto))]
    async fn example_job2(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job]
    async fn example_job_with_ctx(
        mut current_job: CurrentJob,
        ctx1: i32,
        ctx2: &'static str,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        assert_eq!(ctx1, 42);
        assert_eq!(ctx2, "Hello, world!");
        current_job.complete().await?;
        Ok(())
    }

    async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
        let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
        registry.set_context(42).set_context("Hello, world!");
        registry.runner(pool).run().await.unwrap()
    }

    fn is_ci() -> bool {
        std::env::var("CI").ok().is_some()
    }

    fn default_pause() -> u64 {
        if is_ci() {
            1000
        } else {
            200
        }
    }

    async fn pause() {
        pause_ms(default_pause()).await;
    }

    async fn pause_ms(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn it_can_spawn_job() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_clear_jobs() {
        {
            let pool = &*test_pool().await;
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();

            sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();

            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_order() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("bar")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            let mut job = rx.next().await.unwrap();
            job.complete().await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_parallel() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            JobBuilder::new("bar").spawn(pool).await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            for _ in 0..2 {
                let mut job = rx.next().await.unwrap();
                job.complete().await.unwrap();
            }
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_retries_failed_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;

            let backoff = default_pause() + 300;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(2)
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // Third attempt
            pause_ms(backoff * 2).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // No more attempts
            pause_ms(backoff * 5).await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_checkpoint_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
                let state: bool = current_job.json().unwrap().unwrap();
                if state {
                    current_job.complete().await.unwrap();
                } else {
                    current_job
                        .checkpoint(Checkpoint::new().set_json(&true).unwrap())
                        .await
                        .unwrap();
                }
            })
            .await;

            let backoff = default_pause();

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(5)
                .set_json(&false)
                .unwrap()
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // No more attempts
            pause_ms(backoff * 3).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_use_registry() {
        {
            let pool = &*test_pool().await;
            let _runner = named_job_runner(pool).await;

            example_job1.builder().spawn(pool).await.unwrap();
            example_job2.builder().spawn(pool).await.unwrap();
            example_job_with_ctx.builder().spawn(pool).await.unwrap();
            pause().await;
        }
        pause().await;
    }
}