#![deny(missing_docs, unsafe_code)]
#[doc(hidden)]
pub mod hidden;
mod registry;
mod runner;
mod spawn;
mod utils;
pub use registry::*;
pub use runner::*;
pub use spawn::*;
pub use sqlxmq_macros::job;
pub use utils::OwnedHandle;
/// Returns `true` if the given error is a transient database failure that
/// the caller should retry.
///
/// Required because the crate denies `missing_docs`. Retryable conditions:
/// - `23503` (foreign_key_violation) on `mq_msgs_after_message_id_fkey` —
///   presumably a race on the ordered-message chain; retrying re-resolves it.
/// - `23505` (unique_violation) on
///   `mq_msgs_channel_name_channel_args_after_message_id_idx` — two ordered
///   messages raced for the same slot in a channel.
/// - `40001` (serialization_failure) — standard retryable SQLSTATE.
/// - `40P01` (deadlock_detected) — standard retryable SQLSTATE.
///
/// Errors that are not database errors (I/O, decoding, etc.) are never
/// considered retryable.
pub fn should_retry(error: &sqlx::Error) -> bool {
    if let Some(db_error) = error.as_database_error() {
        matches!(
            (db_error.code().as_deref(), db_error.constraint()),
            (Some("23503"), Some("mq_msgs_after_message_id_fkey"))
                | (
                    Some("23505"),
                    Some("mq_msgs_channel_name_channel_args_after_message_id_idx")
                )
                | (Some("40001"), _)
                | (Some("40P01"), _)
        )
    } else {
        false
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate as sqlxmq;
use std::env;
use std::error::Error;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::time::Duration;
use futures::channel::mpsc;
use futures::StreamExt;
use sqlx::{Pool, Postgres};
use tokio::sync::{Mutex, MutexGuard};
use tokio::task;
/// Pairs a test resource with the global test-mutex guard.
///
/// Field 0 holds the `'static` mutex guard for the lifetime of the value,
/// so tests that own a `TestGuard` run serially with respect to each other.
struct TestGuard<T>(MutexGuard<'static, ()>, T);

impl<T> Deref for TestGuard<T> {
    type Target = T;

    // Deref to the wrapped resource (field 1), e.g. the connection pool.
    fn deref(&self) -> &T {
        &self.1
    }
}
/// Acquires the global test lock and returns a pool against a freshly
/// cleaned database.
///
/// - Loads `.env` if present (failure ignored on purpose — best effort).
/// - Initializes the logger exactly once across all tests.
/// - Connects using `DATABASE_URL` (panics if unset or unreachable; fine
///   for tests).
/// - Truncates `mq_payloads` and deletes every `mq_msgs` row except the
///   nil-UUID row — presumably a sentinel the queue schema requires; kept
///   intentionally.
async fn test_pool() -> TestGuard<Pool<Postgres>> {
    static INIT_LOGGER: Once = Once::new();
    // Serializes all tests: the guard is carried inside the returned
    // TestGuard and released only when the test drops it.
    static TEST_MUTEX: Mutex<()> = Mutex::const_new(());

    let guard = TEST_MUTEX.lock().await;

    let _ = dotenv::dotenv();

    INIT_LOGGER.call_once(pretty_env_logger::init);

    let pool = Pool::connect(&env::var("DATABASE_URL").unwrap())
        .await
        .unwrap();

    sqlx::query("TRUNCATE TABLE mq_payloads")
        .execute(&pool)
        .await
        .unwrap();

    sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()")
        .execute(&pool)
        .await
        .unwrap();

    TestGuard(guard, pool)
}
/// Starts a job runner that invokes `f` for every dispatched job, returning
/// the runner handle and a dispatch counter.
///
/// The counter increments synchronously when a job is handed to the runner
/// callback; `f` itself runs on a spawned task, so the counter reflects
/// dispatches, not completions. Dropping the `OwnedHandle` stops the runner.
async fn test_job_runner<F: Future + Send + 'static>(
    pool: &Pool<Postgres>,
    f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
) -> (OwnedHandle, Arc<AtomicUsize>)
where
    F::Output: Send + 'static,
{
    let counter = Arc::new(AtomicUsize::new(0));
    let counter2 = counter.clone();
    let runner = JobRunnerOptions::new(pool, move |job| {
        counter2.fetch_add(1, Ordering::SeqCst);
        // Run the handler concurrently; its result is intentionally dropped.
        task::spawn(f(job));
    })
    .run()
    .await
    .unwrap();
    (runner, counter)
}
/// Shared builder prototype referenced by `example_job2`'s
/// `#[job(proto(job_proto))]` attribute: routes the job to channel "bar".
fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
    builder.set_channel_name("bar")
}
/// Example job on channel "foo" with ordered delivery, 3 retries and a
/// 2-second initial backoff; it simply completes itself.
#[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
async fn example_job1(
    mut current_job: CurrentJob,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    // Mark complete so the job is not retried.
    current_job.complete().await?;
    Ok(())
}
/// Example job configured through the `job_proto` prototype (channel "bar");
/// it simply completes itself.
#[job(proto(job_proto))]
async fn example_job2(
    mut current_job: CurrentJob,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    current_job.complete().await?;
    Ok(())
}
/// Example job taking two extra context arguments (an `i32` and a
/// `&'static str`) supplied by the registry; asserts the values that
/// `named_job_runner` installs, then completes.
#[job]
async fn example_job_with_ctx(
    mut current_job: CurrentJob,
    ctx1: i32,
    ctx2: &'static str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    assert_eq!(ctx1, 42);
    assert_eq!(ctx2, "Hello, world!");
    current_job.complete().await?;
    Ok(())
}
/// Starts a runner over a registry of the three example jobs, with the
/// context values (`42`, `"Hello, world!"`) that `example_job_with_ctx`
/// expects. Dropping the returned handle stops the runner.
async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
    let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
    registry.set_context(42).set_context("Hello, world!");
    registry.runner(pool).run().await.unwrap()
}
async fn pause() {
pause_ms(200).await;
}
/// Suspends the current task for `ms` milliseconds.
async fn pause_ms(ms: u64) {
    let delay = Duration::from_millis(ms);
    tokio::time::sleep(delay).await;
}
/// Spawning a single job results in exactly one dispatch.
#[tokio::test]
async fn it_can_spawn_job() {
    {
        let pool = &*test_pool().await;
        let (_runner, counter) =
            test_job_runner(pool, |mut job| async move { job.complete().await }).await;
        // Nothing dispatched before the job is spawned.
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        JobBuilder::new("foo").spawn(pool).await.unwrap();
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
    // NOTE(review): trailing pause presumably lets the dropped runner's
    // background tasks wind down before the next test resets the DB.
    pause().await;
}
/// Clearing the "foo" and "baz" channels leaves only the two jobs queued
/// on "bar" to be dispatched.
#[tokio::test]
async fn it_can_clear_jobs() {
    {
        let pool = &*test_pool().await;
        // Queue two jobs on each of three channels, in the same order as
        // listed here.
        for channel in ["foo", "foo", "bar", "bar", "baz", "baz"] {
            JobBuilder::new("foo")
                .set_channel_name(channel)
                .spawn(pool)
                .await
                .unwrap();
        }
        sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();
        let (_runner, counter) =
            test_job_runner(pool, |mut job| async move { job.complete().await }).await;
        pause().await;
        // Only the two "bar" jobs survive the clear.
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
    pause().await;
}
/// With ordering enabled, the second job is not dispatched until the first
/// has been completed.
#[tokio::test]
async fn it_runs_jobs_in_order() {
    {
        let pool = &*test_pool().await;
        let (tx, mut rx) = mpsc::unbounded();
        // Forward each dispatched job into the channel instead of completing
        // it, so the test controls exactly when completion happens.
        let (_runner, counter) = test_job_runner(pool, move |job| {
            let tx = tx.clone();
            async move {
                tx.unbounded_send(job).unwrap();
            }
        })
        .await;
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        JobBuilder::new("foo")
            .set_ordered(true)
            .spawn(pool)
            .await
            .unwrap();
        JobBuilder::new("bar")
            .set_ordered(true)
            .spawn(pool)
            .await
            .unwrap();
        pause().await;
        // Only the first ordered job has been dispatched so far.
        assert_eq!(counter.load(Ordering::SeqCst), 1);
        let mut job = rx.next().await.unwrap();
        job.complete().await.unwrap();
        pause().await;
        // Completing the first releases the second.
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
    pause().await;
}
/// Without ordering, two jobs are dispatched concurrently — neither waits
/// for the other to complete.
#[tokio::test]
async fn it_runs_jobs_in_parallel() {
    {
        let pool = &*test_pool().await;
        let (tx, mut rx) = mpsc::unbounded();
        // Park dispatched jobs in a channel so completion is under test
        // control.
        let (_runner, counter) = test_job_runner(pool, move |job| {
            let tx = tx.clone();
            async move {
                tx.unbounded_send(job).unwrap();
            }
        })
        .await;
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        JobBuilder::new("foo").spawn(pool).await.unwrap();
        JobBuilder::new("bar").spawn(pool).await.unwrap();
        pause().await;
        // Both dispatched even though neither has completed yet.
        assert_eq!(counter.load(Ordering::SeqCst), 2);
        // Complete both so they are not left pending for retry.
        for _ in 0..2 {
            let mut job = rx.next().await.unwrap();
            job.complete().await.unwrap();
        }
    }
    pause().await;
}
/// A job that never completes is retried with increasing backoff until its
/// retry budget (2 retries → 3 attempts total) is exhausted.
#[tokio::test]
async fn it_retries_failed_jobs() {
    {
        let pool = &*test_pool().await;
        // Handler does nothing, so the job is never completed and retries.
        let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;
        let backoff = 500;
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        JobBuilder::new("foo")
            .set_retry_backoff(Duration::from_millis(backoff))
            .set_retries(2)
            .spawn(pool)
            .await
            .unwrap();
        // Initial attempt.
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
        // First retry after roughly one backoff interval.
        pause_ms(backoff).await;
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
        // Second retry after roughly double the interval (backoff grows).
        pause_ms(backoff * 2).await;
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 3);
        // Budget exhausted: no further attempts even after a long wait.
        pause_ms(backoff * 5).await;
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }
    pause().await;
}
/// A checkpoint persists updated job state, so the retry observes the new
/// payload and can complete, preventing further retries.
#[tokio::test]
async fn it_can_checkpoint_jobs() {
    {
        let pool = &*test_pool().await;
        let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
            // First attempt reads the spawned payload (`false`) and
            // checkpoints `true`; the retry reads `true` and completes.
            let state: bool = current_job.json().unwrap().unwrap();
            if state {
                current_job.complete().await.unwrap();
            } else {
                current_job
                    .checkpoint(Checkpoint::new().set_json(&true).unwrap())
                    .await
                    .unwrap();
            }
        })
        .await;
        let backoff = 200;
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        JobBuilder::new("foo")
            .set_retry_backoff(Duration::from_millis(backoff))
            .set_retries(5)
            .set_json(&false)
            .unwrap()
            .spawn(pool)
            .await
            .unwrap();
        // First attempt: checkpoints but does not complete.
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
        // Retry sees the checkpointed state and completes.
        pause_ms(backoff).await;
        pause().await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
        // Completed: no further attempts despite remaining retry budget.
        pause_ms(backoff * 3).await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
    pause().await;
}
/// Jobs spawned via their macro-generated builders are routed through the
/// registry runner; `example_job_with_ctx` also asserts the context values
/// installed by `named_job_runner`.
#[tokio::test]
async fn it_can_use_registry() {
    {
        let pool = &*test_pool().await;
        let _runner = named_job_runner(pool).await;
        example_job1.builder().spawn(pool).await.unwrap();
        example_job2.builder().spawn(pool).await.unwrap();
        example_job_with_ctx.builder().spawn(pool).await.unwrap();
        // Let the runner process all three before dropping the handle.
        pause().await;
    }
    pause().await;
}
}