Skip to main content

foxtive_worker/middleware/
mod.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8#[cfg(feature = "rate-limit")]
9pub mod rate_limit;
10pub mod circuit_breaker;
11pub mod tracing;
12pub mod retry_handler;
13pub mod batch;
14pub mod processing_timeout;
15
16/// A message handler that processes messages.
17///
18/// This trait represents a single step in the middleware chain.
19/// Handlers can be middleware components or final workers.
20#[async_trait]
21pub trait MessageHandler: Send + Sync {
22    /// Process a message.
23    ///
24    /// # Arguments
25    /// * `message` - The message to process
26    ///
27    /// # Returns
28    /// * `Ok(())` if processing succeeded
29    /// * `Err(WorkerError)` if processing failed
30    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()>;
31}
32
33/// Middleware that wraps message handlers to add cross-cutting concerns.
34///
35/// Middleware forms a chain where each piece can:
36/// - Modify messages before passing them along
37/// - Perform actions before and after processing
38/// - Short-circuit the chain by returning early
39/// - Handle errors and implement retry logic
40///
41/// # Example
42/// ```rust,no_run
43/// use foxtive_worker::middleware::{Middleware, MessageHandler};
44/// use foxtive_worker::{ReceivedMessage, WorkerResult};
45///
46/// struct LoggingMiddleware;
47///
48/// #[async_trait::async_trait]
49/// impl Middleware for LoggingMiddleware {
50///     fn name(&self) -> &str { "logging" }
51///     
52///     async fn handle(
53///         &self,
54///         message: ReceivedMessage<serde_json::Value>,
55///         next: Box<dyn MessageHandler>,
56///     ) -> WorkerResult<()> {
57///         println!("Processing message: {}", message.message.id);
58///         let result = next.handle(message).await;
59///         println!("Message processed with result: {:?}", result.is_ok());
60///         result
61///     }
62/// }
63/// ```
64#[async_trait]
65pub trait Middleware: Send + Sync {
66    /// Get the name of this middleware for debugging and logging.
67    fn name(&self) -> &str;
68
69    /// Process a message with access to the next handler in the chain.
70    ///
71    /// # Arguments
72    /// * `message` - The message to process
73    /// * `next` - The next handler in the middleware chain
74    ///
75    /// # Returns
76    /// * `Ok(())` if processing succeeded
77    /// * `Err(WorkerError)` if processing failed
78    async fn handle(
79        &self,
80        message: ReceivedMessage<serde_json::Value>,
81        next: Box<dyn MessageHandler>,
82    ) -> WorkerResult<()>;
83}
84
85/// A chain of middleware that processes messages in sequence.
86///
87/// Middleware chains allow you to compose multiple middleware components
88/// into a single handler that executes them in order.
89pub struct MiddlewareChain {
90    middlewares: Vec<Box<dyn Middleware>>,
91    final_handler: Box<dyn MessageHandler>,
92}
93
94impl MiddlewareChain {
95    /// Create a new middleware chain.
96    ///
97    /// # Arguments
98    /// * `middlewares` - Vector of middleware to execute in order
99    /// * `final_handler` - The final handler that processes the message
100    pub fn new(
101        middlewares: Vec<Box<dyn Middleware>>,
102        final_handler: Box<dyn MessageHandler>,
103    ) -> Self {
104        Self {
105            middlewares,
106            final_handler,
107        }
108    }
109
110    /// Build the middleware chain into a single handler.
111    ///
112    /// This creates a nested structure where each middleware wraps the next.
113    pub fn build(self) -> Box<dyn MessageHandler> {
114        let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
115
116        // Wrap handlers in reverse order so first middleware executes first
117        for middleware in self.middlewares.into_iter().rev() {
118            handler = Arc::new(MiddlewareWrapper {
119                middleware,
120                inner: handler,
121            });
122        }
123
124        Box::new(ArcHandler(handler))
125    }
126}
127
128/// Wrapper that applies a single middleware around an inner handler.
129struct MiddlewareWrapper {
130    middleware: Box<dyn Middleware>,
131    inner: Arc<dyn MessageHandler>,
132}
133
134#[async_trait]
135impl MessageHandler for MiddlewareWrapper {
136    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
137        let next = self.inner.clone();
138        self.middleware.handle(message, Box::new(ArcHandler(next))).await
139    }
140}
141
142/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
143struct ArcHandler(Arc<dyn MessageHandler>);
144
145#[async_trait]
146impl MessageHandler for ArcHandler {
147    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
148        self.0.handle(message).await
149    }
150}
151
152/// Wrapper to convert Box<dyn MessageHandler> to Arc
153struct ArcHandlerWrapper(Box<dyn MessageHandler>);
154
155#[async_trait]
156impl MessageHandler for ArcHandlerWrapper {
157    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
158        self.0.handle(message).await
159    }
160}