mod types;
use crate::{
Job,
task::{Queue, QueueHandle},
};
use anyhow::Error;
use redis::RedisResult;
use redis::aio::MultiplexedConnection;
pub use types::{QueueGroup, QueueGroupHandle};
impl QueueGroupHandle {
    /// Shuts down every queue handle owned by this group handle.
    ///
    /// Consumes `self`, calls `shutdown` on each contained handle, and awaits
    /// all of the resulting futures together through `join_all`. Whatever the
    /// individual shutdowns produce is discarded.
    pub async fn shutdown(self) {
        // Build all shutdown futures first so they are awaited as one batch
        // rather than one after another.
        let pending = self.handles.into_iter().map(|handle| handle.shutdown());
        futures_util::future::join_all(pending).await;
    }
}
/// Private module holding the marker supertrait of `RunnableQueue`.
///
/// Because this module is not `pub`, code that cannot reach it cannot name
/// `Sealed`, and therefore cannot add its own `RunnableQueue` implementations.
mod sealed {
/// Marker trait with no methods; implementing it is the prerequisite for
/// implementing `RunnableQueue`.
pub trait Sealed {}
}
/// Interface through which a `QueueGroup` drives its queues.
///
/// `QueueGroup` stores implementors boxed and uses only these methods to
/// start them and to create their consumer groups. The `sealed::Sealed`
/// supertrait restricts who can implement this trait.
pub trait RunnableQueue: sealed::Sealed {
/// Consumes the boxed queue and starts it, returning its `QueueHandle`.
fn run(self: Box<Self>) -> QueueHandle;
/// Name of the queue; `QueueGroup::init_all` passes it as the stream key
/// when creating the consumer group.
fn name(&self) -> &str;
/// Consumer group name that `QueueGroup::init_all` creates for this queue.
fn consumer_group(&self) -> &str;
/// Optional ID the consumer group should start from. When this is `None`,
/// `QueueGroup::init_all` falls back to `"0"`.
fn starting_id(&self) -> Option<&str>;
/// Returns a Redis connection for this queue, by value.
fn conn(&self) -> MultiplexedConnection;
}
/// Marks `Queue` as `Sealed` so that it is allowed to implement
/// `RunnableQueue`. The bounds are the same as on the `RunnableQueue` impl
/// for `Queue` in this file.
impl<I, E, F, Fut, DE, DF, DFut> sealed::Sealed for Queue<I, E, F, Fut, DE, DF, DFut>
where
I: Job + Send + Sync + 'static,
F: Fn(I) -> Fut + 'static + Send + Sync,
E: std::fmt::Display + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
DF: Fn(I, usize) -> DFut + 'static + Send + Sync,
DE: std::fmt::Display + Send + 'static,
DFut: Future<Output = Result<(), DE>> + Send,
{
}
/// Implements `RunnableQueue` for `Queue` by forwarding every method to the
/// same-named function reached through the `Queue::` path.
///
/// NOTE(review): the forwarding targets are presumably `Queue`'s own inherent
/// methods, which are defined outside this file — confirm they exist with
/// matching signatures; otherwise these paths could resolve back to the trait
/// methods themselves.
impl<I, E, F, Fut, DE, DF, DFut> RunnableQueue for Queue<I, E, F, Fut, DE, DF, DFut>
where
I: Job + Send + Sync + 'static,
F: Fn(I) -> Fut + 'static + Send + Sync,
E: std::fmt::Display + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
DF: Fn(I, usize) -> DFut + 'static + Send + Sync,
DE: std::fmt::Display + Send + 'static,
DFut: Future<Output = Result<(), DE>> + Send,
{
/// Moves the queue out of its box and starts it via `Queue::run`.
fn run(self: Box<Self>) -> QueueHandle {
Queue::run(*self)
}
/// Forwards to `Queue::name`.
fn name(&self) -> &str {
Queue::name(self)
}
/// Forwards to `Queue::consumer_group`.
fn consumer_group(&self) -> &str {
Queue::consumer_group(self)
}
/// Forwards to `Queue::starting_id`.
fn starting_id(&self) -> Option<&str> {
Queue::starting_id(self)
}
/// Forwards to `Queue::conn`.
fn conn(&self) -> MultiplexedConnection {
Queue::conn(self)
}
}
impl QueueGroup {
    /// Adds `queue` to the group.
    ///
    /// The queue is boxed so that queues with different concrete types can be
    /// kept in the same collection.
    pub fn push(&mut self, queue: impl RunnableQueue + 'static) {
        self.queues.push(Box::new(queue));
    }

    /// Starts every queue in the group and returns a handle that owns all of
    /// the individual queue handles.
    ///
    /// Consumes the group; queues are started in the order they were pushed.
    pub fn run_all(self) -> QueueGroupHandle {
        let handles = self.queues.into_iter().map(|queue| queue.run()).collect();
        QueueGroupHandle { handles }
    }

    /// Creates the consumer group for every queue, using one pipelined
    /// request.
    ///
    /// Each queue contributes one `xgroup_create_mkstream` command built from
    /// its name, its consumer group and its starting ID (`"0"` when the queue
    /// does not specify one). An empty group returns `Ok(())` without touching
    /// Redis.
    ///
    /// # Errors
    ///
    /// Returns an error if the pipeline request itself fails, or if any
    /// command reports an error whose code is not `BUSYGROUP`. `BUSYGROUP`
    /// results are skipped.
    ///
    /// NOTE(review): the whole pipeline is sent over the first queue's
    /// connection — this assumes all queues live on the same Redis server;
    /// confirm against how queues are constructed.
    ///
    /// NOTE(review): the `BUSYGROUP` filtering only takes effect if the
    /// pipeline hands back per-command results instead of failing the whole
    /// query on the first server error — confirm for the redis crate version
    /// in use.
    pub async fn init_all(&self) -> Result<(), Error> {
        // Borrow a connection from the first queue; with no queues there is
        // nothing to initialise.
        let mut conn = match self.queues.first() {
            Some(queue) => queue.conn(),
            None => return Ok(()),
        };

        let mut pipe = redis::pipe();
        for queue in self.queues.iter() {
            let stream = queue.name();
            let group = queue.consumer_group();
            let start_id = queue.starting_id().unwrap_or("0");
            pipe.xgroup_create_mkstream(stream, group, start_id);
        }

        let outcomes: Vec<RedisResult<()>> = pipe.query_async(&mut conn).await?;

        // Report the first error that is not a BUSYGROUP reply.
        let fatal = outcomes
            .into_iter()
            .filter_map(Result::err)
            .find(|err| err.code() != Some("BUSYGROUP"));
        match fatal {
            Some(err) => Err(anyhow::anyhow!(err)),
            None => Ok(()),
        }
    }
}