Skip to main content

foxtive_worker/middleware/
mod.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerError;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8pub mod batch;
9pub mod circuit_breaker;
10pub mod processing_timeout;
11#[cfg(feature = "rate-limit")]
12pub mod rate_limit;
13pub mod retry_handler;
14pub mod tracing;
15
16/// Result of middleware processing.
17///
18/// This enum provides explicit control flow for middleware chains,
19/// allowing middleware to signal whether they've handled acknowledgment
20/// or whether processing should continue normally.
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum MiddlewareResult {
23    /// Message has been acknowledged by this middleware.
24    /// The pool should NOT attempt to ack/nack again.
25    Acknowledged,
26
27    /// Continue with normal processing flow.
28    /// The pool should handle ack/nack based on the result.
29    Continue,
30}
31
32impl MiddlewareResult {
33    /// Returns true if the message was already acknowledged.
34    pub fn is_acknowledged(&self) -> bool {
35        matches!(self, Self::Acknowledged)
36    }
37
38    /// Returns true if processing should continue normally.
39    pub fn is_continue(&self) -> bool {
40        matches!(self, Self::Continue)
41    }
42}
43
44/// A message handler that processes messages.
45///
46/// This trait represents a single step in the middleware chain.
47/// Handlers can be middleware components or final workers.
48#[async_trait]
49pub trait MessageHandler: Send + Sync {
50    /// Process a message.
51    ///
52    /// # Arguments
53    /// * `message` - The message to process
54    ///
55    /// # Returns
56    /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
57    /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
58    /// * `Err(WorkerError)` if processing failed
59    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError>;
60}
61
62/// Middleware that wraps message handlers to add cross-cutting concerns.
63///
64/// Middleware forms a chain where each piece can:
65/// - Modify messages before passing them along
66/// - Perform actions before and after processing
67/// - Short-circuit the chain by returning early
68/// - Handle errors and implement retry logic
69/// - Signal whether acknowledgment was handled
70///
71/// # Example
72/// ```rust,no_run
73/// use foxtive_worker::middleware::{Middleware, MessageHandler, MiddlewareResult};
74/// use foxtive_worker::{ReceivedMessage, WorkerError};
75///
76/// struct LoggingMiddleware;
77///
78/// #[async_trait::async_trait]
79/// impl Middleware for LoggingMiddleware {
80///     fn name(&self) -> &str { "logging" }
81///     
82///     async fn handle(
83///         &self,
84///         message: ReceivedMessage<serde_json::Value>,
85///         next: Box<dyn MessageHandler>,
86///     ) -> Result<MiddlewareResult, WorkerError> {
87///         println!("Processing message: {}", message.message.id);
88///         let result = next.handle(message).await?;
89///         println!("Message processed with result: {:?}", result);
90///         Ok(result)
91///     }
92/// }
93/// ```
94#[async_trait]
95pub trait Middleware: Send + Sync {
96    /// Get the name of this middleware for debugging and logging.
97    fn name(&self) -> &str;
98
99    /// Process a message with access to the next handler in the chain.
100    ///
101    /// # Arguments
102    /// * `message` - The message to process
103    /// * `next` - The next handler in the middleware chain
104    ///
105    /// # Returns
106    /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
107    /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
108    /// * `Err(WorkerError)` if processing failed
109    async fn handle(
110        &self,
111        message: ReceivedMessage<serde_json::Value>,
112        next: Box<dyn MessageHandler>,
113    ) -> Result<MiddlewareResult, WorkerError>;
114}
115
116/// A chain of middleware that processes messages in sequence.
117///
118/// Middleware chains allow you to compose multiple middleware components
119/// into a single handler that executes them in order.
120pub struct MiddlewareChain {
121    middlewares: Vec<Box<dyn Middleware>>,
122    final_handler: Box<dyn MessageHandler>,
123}
124
125impl MiddlewareChain {
126    /// Create a new middleware chain.
127    ///
128    /// # Arguments
129    /// * `middlewares` - Vector of middleware to execute in order
130    /// * `final_handler` - The final handler that processes the message
131    pub fn new(
132        middlewares: Vec<Box<dyn Middleware>>,
133        final_handler: Box<dyn MessageHandler>,
134    ) -> Self {
135        Self {
136            middlewares,
137            final_handler,
138        }
139    }
140
141    /// Build the middleware chain into a single handler.
142    ///
143    /// This creates a nested structure where each middleware wraps the next.
144    pub fn build(self) -> Box<dyn MessageHandler> {
145        let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
146
147        // Wrap handlers in reverse order so first middleware executes first
148        for middleware in self.middlewares.into_iter().rev() {
149            handler = Arc::new(MiddlewareWrapper {
150                middleware,
151                inner: handler,
152            });
153        }
154
155        Box::new(ArcHandler(handler))
156    }
157}
158
159/// Wrapper that applies a single middleware around an inner handler.
160struct MiddlewareWrapper {
161    middleware: Box<dyn Middleware>,
162    inner: Arc<dyn MessageHandler>,
163}
164
165#[async_trait]
166impl MessageHandler for MiddlewareWrapper {
167    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
168        let next = self.inner.clone();
169        self.middleware
170            .handle(message, Box::new(ArcHandler(next)))
171            .await
172    }
173}
174
175/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
176struct ArcHandler(Arc<dyn MessageHandler>);
177
178#[async_trait]
179impl MessageHandler for ArcHandler {
180    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
181        self.0.handle(message).await
182    }
183}
184
185/// Wrapper to convert Box<dyn MessageHandler> to Arc
186struct ArcHandlerWrapper(Box<dyn MessageHandler>);
187
188#[async_trait]
189impl MessageHandler for ArcHandlerWrapper {
190    async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
191        self.0.handle(message).await
192    }
193}