#![allow(clippy::expect_used, clippy::unused_async)]
mod common;
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;
/// Number of times the `counter` job handler has been entered.
static COUNT: AtomicUsize = AtomicUsize::new(0);

/// Job handler that increments `COUNT`, logs the execution number and then
/// panics.
///
/// Every invocation ends in a panic, so this handler never returns a value.
async fn counter(_job: CurrentJob) -> Result<()> {
    // `fetch_add` yields the previous value; add one for a 1-based number.
    let execution_nr = COUNT.fetch_add(1, Ordering::SeqCst) + 1;
    tracing::info!("Counting execution nr. {}", execution_nr);
    panic!("I am panicking even!");
}
/// Number of times the `keep_alive` job handler has been entered.
static KEPT_ALIVE: AtomicUsize = AtomicUsize::new(0);

/// Job handler that records its invocation in `KEPT_ALIVE` and then sleeps
/// for 9999 seconds before reporting success.
async fn keep_alive(_job: CurrentJob) -> Result<()> {
    /// How long the handler stays asleep after registering its invocation.
    const NAP: Duration = Duration::from_secs(9999);

    KEPT_ALIVE.fetch_add(1, Ordering::Relaxed);
    tokio::time::sleep(NAP).await;
    Ok(())
}
// Job registry used by the tests in this file, declared through bonsaimq's
// `job_registry!` macro. Each entry pairs an identifier and a name string with
// one of the handler functions above; the tests refer to the entries as
// `JobRegistry::Counter` and `JobRegistry::KeepAlive`.
job_registry!(JobRegistry, {
Counter: "counter" => counter,
KeepAlive: "keep-alive" => keep_alive,
});
// Test that spawning a second job with an ID that was already handed out for
// an earlier job is rejected.
#[tokio::test]
#[ntest::timeout(10000)]
async fn same_id() -> Result<()> {
    common::init();

    let db_path = "same-id-test.bonsaidb";
    // Best-effort removal of leftovers from an earlier run; errors are ignored.
    tokio::fs::remove_dir_all(db_path).await.ok();
    let config = StorageConfiguration::new(db_path);
    let db = AsyncDatabase::open::<MessageQueueSchema>(config).await?;

    // The first job is delayed by 999 seconds and no job runner is started in
    // this test (presumably so the job is still pending for the second spawn).
    let taken_id = JobRegistry::Counter
        .builder()
        .delay(Duration::from_secs(999))
        .spawn(&db)
        .await?;

    // Re-using the ID returned above must produce an error.
    let duplicate = JobRegistry::Counter
        .builder()
        .id(taken_id)
        .spawn(&db)
        .await;
    assert!(duplicate.is_err());

    tokio::fs::remove_dir_all(db_path).await?;
    Ok(())
}
// Test that the always-panicking `counter` job has been entered exactly
// `max_executions` times once `await_job` returns.
#[tokio::test]
#[ntest::timeout(60000)]
async fn retrying() -> Result<()> {
    common::init();

    let db_path = "retrying-test.bonsaidb";
    // Best-effort removal of leftovers from an earlier run; errors are ignored.
    tokio::fs::remove_dir_all(db_path).await.ok();
    let config = StorageConfiguration::new(db_path);
    let db = AsyncDatabase::open::<MessageQueueSchema>(config).await?;
    let runner = JobRunner::new(db.clone()).run::<JobRegistry>();

    let max_executions = 5;
    let retry_timing = bonsaimq::RetryTiming::Fixed(Duration::from_millis(10));
    let job_id = JobRegistry::Counter
        .builder()
        .id(123_456_789)
        .max_executions(max_executions)
        .retry_timing(retry_timing)
        .spawn(&db)
        .await?;
    bonsaimq::await_job(job_id, 100, &db).await?;

    // Every execution bumps `COUNT` before panicking.
    let executions = COUNT.load(Ordering::SeqCst);
    assert_eq!(executions, max_executions as usize);

    runner.abort();
    tokio::fs::remove_dir_all(db_path).await?;
    Ok(())
}
#[tokio::test]
async fn kept_alive() -> Result<()> {
common::init();
let db_path = "keep-alive-test.bonsaidb";
tokio::fs::remove_dir_all(db_path).await.ok();
let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;
let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();
JobRegistry::KeepAlive
.builder()
.retry_timing(bonsaimq::RetryTiming::Fixed(Duration::from_millis(500)))
.spawn(&db)
.await?;
tokio::time::sleep(Duration::from_secs(10)).await;
assert_eq!(KEPT_ALIVE.load(Ordering::Relaxed), 1);
job_runner.abort();
tokio::fs::remove_dir_all(db_path).await?;
Ok(())
}