#[macro_export]
macro_rules! queue_store_suite {
($mod_name:ident, $make_store:expr) => {
mod $mod_name {
use super::*;
use kiomq::{JobOptions, JobState, KioResult, Queue, QueueOpts, Store};
use uuid::Uuid;
type D = i32;
type R = i32;
type P = i32;
async fn make_store() -> KioResult<impl Store<D, R, P> + Clone + Send + 'static> {
($make_store).await
}
#[tokio::test(flavor = "multi_thread")]
async fn add_and_fetch_job_single() -> KioResult<()> {
let queue_opts = QueueOpts::default();
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let job = queue.add_job("test", 1, None).await?;
let metrics = queue.get_metrics().await?;
let waiting = metrics.waiting.load();
assert_eq!(waiting, 1);
let expected_id = metrics.last_id.load();
let fetched_job = queue.get_job(expected_id).await;
if let Some(fetched) = fetched_job {
assert_eq!(job.id, fetched.id);
}
queue.obliterate().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn add_bulk_jobs() -> KioResult<()> {
let queue_opts = QueueOpts::default();
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let job_iterator = (0..4).map(|i| (i.to_string(), None, i));
let jobs = queue.bulk_add(job_iterator).await?;
let metrics = queue.get_metrics().await?;
assert_eq!(metrics.waiting.load(), jobs.len() as u64,);
queue.obliterate().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn obliterate() -> KioResult<()> {
let queue_opts = QueueOpts::default();
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let job_iterator = (0..4).map(|i| (i.to_string(), None, i));
let jobs = queue.bulk_add(job_iterator).await?;
let metrics = queue.get_metrics().await?;
assert_eq!(metrics.waiting.load(), jobs.len() as u64,);
queue.obliterate().await?;
assert_eq!(queue.current_metrics.waiting.load(), 0);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn add_delayed_jobs() -> KioResult<()> {
let queue_opts = QueueOpts::default();
let job_opts = JobOptions {
delay: 200.into(),
..Default::default()
};
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let job = queue.add_job("delay", 1, Some(job_opts)).await?;
let metrics = queue.get_metrics().await?;
let delayed = metrics.delayed.load();
let expected_id = metrics.last_id.load();
let fetched_job = queue.get_job(expected_id).await;
assert!(metrics.has_delayed());
assert_eq!(delayed, 1);
if let Some(fetched) = fetched_job {
assert_eq!(job.id, fetched.id);
assert_eq!(fetched.delay, 200);
assert_eq!(fetched.state, JobState::Delayed);
assert_eq!(job.opts.delay, fetched.opts.delay);
}
queue.obliterate().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn add_prioritized() -> KioResult<()> {
let queue_opts = QueueOpts::default();
let job_opts = JobOptions {
priority: 2,
..Default::default()
};
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let job = queue.add_job("Priorized", 1, Some(job_opts)).await?;
let metrics = queue.get_metrics().await?;
let expected_id = metrics.last_id.load();
let fetched_job = queue.get_job(expected_id).await;
if let Some(fetched) = fetched_job {
assert_eq!(job.id, fetched.id);
assert_eq!(fetched.priority, 2);
assert_eq!(fetched.state, JobState::Prioritized);
assert_eq!(job.opts.delay, fetched.opts.delay);
}
queue.obliterate().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn pause_and_resume() -> KioResult<()> {
let queue_opts = QueueOpts {
event_mode: Some(kiomq::QueueEventMode::PubSub),
..Default::default()
};
let name = Uuid::new_v4().to_string();
let store = make_store().await?;
let queue = Queue::<D, R, P, _>::new(store, Some(queue_opts)).await?;
let _job = queue.add_job(&name, 1, None).await?;
let metrics = queue.get_metrics().await?;
assert_eq!(metrics.waiting.load(), 1);
queue.pause_or_resume().await?;
let metrics = queue.get_metrics().await?;
assert!(metrics.is_paused.load());
assert_eq!(metrics.waiting.load(), 0);
assert_eq!(metrics.paused.load(), 1, "paused is empty");
queue.pause_or_resume().await?;
assert!(!queue.is_paused());
let metrics = queue.get_metrics().await?;
assert_eq!(metrics.waiting.load(), 1, "waiting is empty");
queue.obliterate().await?;
Ok(())
}
}
};
}
pub use crate::queue_store_suite;