mod run;
mod types;
use crate::Job;
use crate::runtime::{Runtime, SelectedRuntime};
use crate::task::ClaimerBuilder;
use redis::aio::MultiplexedConnection;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::Ordering;
pub use types::{NoDlqError, NoDlqFn, NoDlqFut, Queue, QueueBuilder, QueueHandle};
impl<I, E, F, Fut> QueueBuilder<I, E, F, Fut, NoDlqError, NoDlqFn<I>, NoDlqFut>
where
I: Job + Send + Sync + 'static,
F: Fn(I) -> Fut + 'static + Send + Sync,
E: std::fmt::Display + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
{
pub fn new(
name: impl Into<String>,
consumer_group: impl Into<String>,
consumer_id: impl Into<String>,
worker: F,
conn: MultiplexedConnection,
read_conn: MultiplexedConnection,
) -> Self {
Self {
name: name.into(),
consumer_group: consumer_group.into(),
consumer_id: consumer_id.into(),
block_timeout: 5000,
max_concurrent_tasks: 1,
worker: Arc::new(worker),
claimer: None,
conn,
read_conn,
_marker: PhantomData,
}
}
}
impl<I, E, F, Fut, DE, DF, DFut> QueueBuilder<I, E, F, Fut, DE, DF, DFut>
where
I: Job + Send + Sync + 'static,
F: Fn(I) -> Fut + 'static + Send + Sync,
E: std::fmt::Display + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
DF: Fn(I, usize) -> DFut + 'static + Send + Sync,
DE: std::fmt::Display + Send + 'static,
DFut: Future<Output = Result<(), DE>> + Send,
{
pub fn block_timeout(mut self, ms: usize) -> Self {
self.block_timeout = ms;
self
}
pub fn max_concurrent_tasks(mut self, n: usize) -> Self {
self.max_concurrent_tasks = n;
self
}
pub fn claimer<DE2, DF2, DFut2>(
self,
claimer: ClaimerBuilder<I, DE2, DF2, DFut2>,
) -> QueueBuilder<I, E, F, Fut, DE2, DF2, DFut2>
where
DF2: Fn(I, usize) -> DFut2 + 'static + Send + Sync,
DE2: std::fmt::Display + Send + 'static,
DFut2: Future<Output = Result<(), DE2>> + Send,
{
QueueBuilder {
name: self.name,
consumer_group: self.consumer_group,
consumer_id: self.consumer_id,
block_timeout: self.block_timeout,
max_concurrent_tasks: self.max_concurrent_tasks,
worker: self.worker,
claimer: Some(claimer),
conn: self.conn,
read_conn: self.read_conn,
_marker: PhantomData,
}
}
}
impl QueueHandle {
pub async fn shutdown(self) {
self.shutdown.store(true, Ordering::Relaxed);
SelectedRuntime::join(self.main_join).await;
if let Some(claimer_join) = self.claimer_join {
SelectedRuntime::join(claimer_join).await;
}
}
}