use async_trait::async_trait;
use celers_protocol::Message;
use std::time::Duration;
use crate::{Consumer, Envelope, Producer, Result};
/// A hook that can inspect or rewrite a [`Message`] on its way through the broker layer.
///
/// Implementors must be `Send + Sync`; the trait is used as a boxed trait object
/// inside [`MiddlewareChain`].
#[async_trait]
pub trait MessageMiddleware: Send + Sync {
    /// Identifier for this middleware.
    ///
    /// Nothing in this part of the file reads it; presumably it is meant for
    /// logging or diagnostics (confirm against callers).
    fn name(&self) -> &str;

    /// Publish-side hook, invoked by [`MiddlewareChain::process_before_publish`].
    ///
    /// The message is passed mutably, so the hook may modify it in place.
    ///
    /// # Errors
    ///
    /// Returning an error makes the chain stop and propagate that error.
    async fn before_publish(&self, message: &mut Message) -> Result<()>;

    /// Consume-side hook, invoked by [`MiddlewareChain::process_after_consume`].
    ///
    /// The message is passed mutably, so the hook may modify it in place.
    ///
    /// # Errors
    ///
    /// Returning an error makes the chain stop and propagate that error.
    async fn after_consume(&self, message: &mut Message) -> Result<()>;
}
/// An ordered list of boxed [`MessageMiddleware`] implementations.
///
/// Build one with [`MiddlewareChain::new`] followed by repeated calls to
/// [`MiddlewareChain::with_middleware`].
pub struct MiddlewareChain {
    /// Registered middlewares, stored in the order they were added.
    middlewares: Vec<Box<dyn MessageMiddleware>>,
}
impl MiddlewareChain {
    /// Creates a chain with no middlewares registered.
    pub fn new() -> Self {
        Self {
            middlewares: Vec::new(),
        }
    }

    /// Appends `middleware` to the end of the chain and returns the chain,
    /// so calls can be strung together builder-style.
    #[must_use]
    pub fn with_middleware(mut self, middleware: Box<dyn MessageMiddleware>) -> Self {
        self.middlewares.push(middleware);
        self
    }

    /// Runs every middleware's `before_publish` hook on `message`, in
    /// registration order.
    ///
    /// # Errors
    ///
    /// Stops at the first hook that fails and returns its error; later hooks
    /// are not run and `message` keeps whatever changes were already applied.
    pub async fn process_before_publish(&self, message: &mut Message) -> Result<()> {
        for middleware in &self.middlewares {
            middleware.before_publish(message).await?;
        }
        Ok(())
    }

    /// Runs every middleware's `after_consume` hook on `message`, in
    /// *reverse* registration order.
    ///
    /// The consume side unwinds the publish side: if the chain applied `A`
    /// then `B` before publishing, the received message has to pass through
    /// `B` and then `A`, so that each hook sees the message in the state its
    /// own `before_publish` left it in.
    ///
    /// # Errors
    ///
    /// Stops at the first hook that fails and returns its error; the
    /// remaining hooks are not run and `message` keeps whatever changes were
    /// already applied.
    pub async fn process_after_consume(&self, message: &mut Message) -> Result<()> {
        // Last-registered middleware wrapped the message last, so it unwraps first.
        for middleware in self.middlewares.iter().rev() {
            middleware.after_consume(message).await?;
        }
        Ok(())
    }

    /// Returns the number of registered middlewares.
    pub fn len(&self) -> usize {
        self.middlewares.len()
    }

    /// Returns `true` when no middleware has been registered.
    pub fn is_empty(&self) -> bool {
        self.middlewares.is_empty()
    }
}
impl Default for MiddlewareChain {
    /// Returns an empty chain, the same value `MiddlewareChain::new()` produces.
    fn default() -> Self {
        Self {
            middlewares: Vec::new(),
        }
    }
}
/// Extension of [`Producer`] that publishes through a [`MiddlewareChain`].
#[async_trait]
pub trait MiddlewareProducer: Producer {
    /// Passes `message` through the chain's publish-side hooks, then publishes
    /// the resulting message to `queue`.
    ///
    /// # Errors
    ///
    /// Returns the error from the chain if any hook fails, in which case
    /// nothing is published; otherwise returns whatever `publish` returns.
    async fn publish_with_middleware(
        &mut self,
        queue: &str,
        message: Message,
        chain: &MiddlewareChain,
    ) -> Result<()> {
        // Take ownership locally so the hooks can edit the message in place.
        let mut outgoing = message;
        chain.process_before_publish(&mut outgoing).await?;
        self.publish(queue, outgoing).await
    }
}
/// Extension of [`Consumer`] that runs a [`MiddlewareChain`] on received messages.
#[async_trait]
pub trait MiddlewareConsumer: Consumer {
    /// Consumes from `queue` with the given `timeout` and, if an envelope was
    /// received, passes its message through the chain's consume-side hooks.
    ///
    /// Returns `Ok(None)` when `consume` yields nothing; the chain is not
    /// touched in that case.
    ///
    /// # Errors
    ///
    /// Propagates an error from `consume`, or from the chain if a hook fails.
    /// In the latter case the already-consumed envelope is dropped here.
    async fn consume_with_middleware(
        &mut self,
        queue: &str,
        timeout: Duration,
        chain: &MiddlewareChain,
    ) -> Result<Option<Envelope>> {
        match self.consume(queue, timeout).await? {
            // Nothing received: there is no message to run the hooks on.
            None => Ok(None),
            Some(mut envelope) => {
                chain.process_after_consume(&mut envelope.message).await?;
                Ok(Some(envelope))
            }
        }
    }
}