use async_trait::async_trait;
use std::sync::Arc;
use crate::error::WorkerError;
use crate::message::ReceivedMessage;
pub mod ack_nack;
pub mod batch;
pub mod circuit_breaker;
pub mod processing_timeout;
#[cfg(feature = "rate-limit")]
pub mod rate_limit;
pub mod retry_handler;
pub mod tracing;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MiddlewareResult {
Acknowledged,
Continue,
}
impl MiddlewareResult {
pub fn is_acknowledged(&self) -> bool {
matches!(self, Self::Acknowledged)
}
pub fn is_continue(&self) -> bool {
matches!(self, Self::Continue)
}
}
#[async_trait]
pub trait MessageHandler: Send + Sync {
async fn handle(
&self,
message: ReceivedMessage<serde_json::Value>,
) -> Result<MiddlewareResult, WorkerError>;
}
#[async_trait]
pub trait Middleware: Send + Sync {
fn name(&self) -> &str;
async fn handle(
&self,
message: ReceivedMessage<serde_json::Value>,
next: Box<dyn MessageHandler>,
) -> Result<MiddlewareResult, WorkerError>;
}
pub struct MiddlewareChain {
middlewares: Vec<Box<dyn Middleware>>,
final_handler: Box<dyn MessageHandler>,
}
impl MiddlewareChain {
pub fn new(
middlewares: Vec<Box<dyn Middleware>>,
final_handler: Box<dyn MessageHandler>,
) -> Self {
Self {
middlewares,
final_handler,
}
}
pub fn build(self) -> Box<dyn MessageHandler> {
let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
for middleware in self.middlewares.into_iter().rev() {
handler = Arc::new(MiddlewareWrapper {
middleware,
inner: handler,
});
}
Box::new(ArcHandler(handler))
}
}
struct MiddlewareWrapper {
middleware: Box<dyn Middleware>,
inner: Arc<dyn MessageHandler>,
}
#[async_trait]
impl MessageHandler for MiddlewareWrapper {
async fn handle(
&self,
message: ReceivedMessage<serde_json::Value>,
) -> Result<MiddlewareResult, WorkerError> {
let next = self.inner.clone();
self.middleware
.handle(message, Box::new(ArcHandler(next)))
.await
}
}
struct ArcHandler(Arc<dyn MessageHandler>);
#[async_trait]
impl MessageHandler for ArcHandler {
async fn handle(
&self,
message: ReceivedMessage<serde_json::Value>,
) -> Result<MiddlewareResult, WorkerError> {
self.0.handle(message).await
}
}
struct ArcHandlerWrapper(Box<dyn MessageHandler>);
#[async_trait]
impl MessageHandler for ArcHandlerWrapper {
async fn handle(
&self,
message: ReceivedMessage<serde_json::Value>,
) -> Result<MiddlewareResult, WorkerError> {
self.0.handle(message).await
}
}