foxtive_worker/middleware/mod.rs
1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerError;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8pub mod batch;
9pub mod circuit_breaker;
10pub mod processing_timeout;
11#[cfg(feature = "rate-limit")]
12pub mod rate_limit;
13pub mod retry_handler;
14pub mod tracing;
15
/// Outcome reported once a handler or middleware is done with a message.
///
/// The variants make the acknowledgment responsibility explicit for the
/// whole middleware chain: either the message has already been acknowledged
/// by a middleware, or processing should carry on as usual.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MiddlewareResult {
    /// This middleware has already acknowledged the message.
    /// The pool should NOT try to ack/nack it a second time.
    Acknowledged,

    /// Proceed with the normal processing flow.
    /// The pool should handle ack/nack based on the result.
    Continue,
}

impl MiddlewareResult {
    /// Reports whether the message has already been acknowledged,
    /// i.e. whether this value is [`MiddlewareResult::Acknowledged`].
    pub fn is_acknowledged(&self) -> bool {
        match self {
            Self::Acknowledged => true,
            Self::Continue => false,
        }
    }

    /// Reports whether processing should continue normally,
    /// i.e. whether this value is [`MiddlewareResult::Continue`].
    pub fn is_continue(&self) -> bool {
        match self {
            Self::Continue => true,
            Self::Acknowledged => false,
        }
    }
}
43
44/// A message handler that processes messages.
45///
46/// This trait represents a single step in the middleware chain.
47/// Handlers can be middleware components or final workers.
48#[async_trait]
49pub trait MessageHandler: Send + Sync {
50 /// Process a message.
51 ///
52 /// # Arguments
53 /// * `message` - The message to process
54 ///
55 /// # Returns
56 /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
57 /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
58 /// * `Err(WorkerError)` if processing failed
59 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError>;
60}
61
62/// Middleware that wraps message handlers to add cross-cutting concerns.
63///
64/// Middleware forms a chain where each piece can:
65/// - Modify messages before passing them along
66/// - Perform actions before and after processing
67/// - Short-circuit the chain by returning early
68/// - Handle errors and implement retry logic
69/// - Signal whether acknowledgment was handled
70///
71/// # Example
72/// ```rust,no_run
73/// use foxtive_worker::middleware::{Middleware, MessageHandler, MiddlewareResult};
74/// use foxtive_worker::{ReceivedMessage, WorkerError};
75///
76/// struct LoggingMiddleware;
77///
78/// #[async_trait::async_trait]
79/// impl Middleware for LoggingMiddleware {
80/// fn name(&self) -> &str { "logging" }
81///
82/// async fn handle(
83/// &self,
84/// message: ReceivedMessage<serde_json::Value>,
85/// next: Box<dyn MessageHandler>,
86/// ) -> Result<MiddlewareResult, WorkerError> {
87/// println!("Processing message: {}", message.message.id);
88/// let result = next.handle(message).await?;
89/// println!("Message processed with result: {:?}", result);
90/// Ok(result)
91/// }
92/// }
93/// ```
94#[async_trait]
95pub trait Middleware: Send + Sync {
96 /// Get the name of this middleware for debugging and logging.
97 fn name(&self) -> &str;
98
99 /// Process a message with access to the next handler in the chain.
100 ///
101 /// # Arguments
102 /// * `message` - The message to process
103 /// * `next` - The next handler in the middleware chain
104 ///
105 /// # Returns
106 /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
107 /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
108 /// * `Err(WorkerError)` if processing failed
109 async fn handle(
110 &self,
111 message: ReceivedMessage<serde_json::Value>,
112 next: Box<dyn MessageHandler>,
113 ) -> Result<MiddlewareResult, WorkerError>;
114}
115
116/// A chain of middleware that processes messages in sequence.
117///
118/// Middleware chains allow you to compose multiple middleware components
119/// into a single handler that executes them in order.
120pub struct MiddlewareChain {
121 middlewares: Vec<Box<dyn Middleware>>,
122 final_handler: Box<dyn MessageHandler>,
123}
124
125impl MiddlewareChain {
126 /// Create a new middleware chain.
127 ///
128 /// # Arguments
129 /// * `middlewares` - Vector of middleware to execute in order
130 /// * `final_handler` - The final handler that processes the message
131 pub fn new(
132 middlewares: Vec<Box<dyn Middleware>>,
133 final_handler: Box<dyn MessageHandler>,
134 ) -> Self {
135 Self {
136 middlewares,
137 final_handler,
138 }
139 }
140
141 /// Build the middleware chain into a single handler.
142 ///
143 /// This creates a nested structure where each middleware wraps the next.
144 pub fn build(self) -> Box<dyn MessageHandler> {
145 let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
146
147 // Wrap handlers in reverse order so first middleware executes first
148 for middleware in self.middlewares.into_iter().rev() {
149 handler = Arc::new(MiddlewareWrapper {
150 middleware,
151 inner: handler,
152 });
153 }
154
155 Box::new(ArcHandler(handler))
156 }
157}
158
159/// Wrapper that applies a single middleware around an inner handler.
160struct MiddlewareWrapper {
161 middleware: Box<dyn Middleware>,
162 inner: Arc<dyn MessageHandler>,
163}
164
165#[async_trait]
166impl MessageHandler for MiddlewareWrapper {
167 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
168 let next = self.inner.clone();
169 self.middleware
170 .handle(message, Box::new(ArcHandler(next)))
171 .await
172 }
173}
174
175/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
176struct ArcHandler(Arc<dyn MessageHandler>);
177
178#[async_trait]
179impl MessageHandler for ArcHandler {
180 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
181 self.0.handle(message).await
182 }
183}
184
185/// Wrapper to convert Box<dyn MessageHandler> to Arc
186struct ArcHandlerWrapper(Box<dyn MessageHandler>);
187
188#[async_trait]
189impl MessageHandler for ArcHandlerWrapper {
190 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> Result<MiddlewareResult, WorkerError> {
191 self.0.handle(message).await
192 }
193}