foxtive_worker/middleware/mod.rs
1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerError;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8pub mod batch;
9pub mod circuit_breaker;
10pub mod processing_timeout;
11#[cfg(feature = "rate-limit")]
12pub mod rate_limit;
13pub mod retry_handler;
14pub mod tracing;
15
/// Outcome reported by a handler in the middleware chain.
///
/// The variant gives explicit control flow for middleware chains: it states
/// whether acknowledgment has already been handled or whether processing
/// should continue normally.
///
/// This is a fieldless enum, so besides the comparison traits it also derives
/// `Copy` and `Hash`: a value can still be used after being passed by value,
/// and it can serve as a set member or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MiddlewareResult {
    /// Message has been acknowledged by this middleware.
    /// The pool should NOT attempt to ack/nack again.
    Acknowledged,

    /// Continue with normal processing flow.
    /// The pool should handle ack/nack based on the result.
    Continue,
}
31
32impl MiddlewareResult {
33 /// Returns true if the message was already acknowledged.
34 pub fn is_acknowledged(&self) -> bool {
35 matches!(self, Self::Acknowledged)
36 }
37
38 /// Returns true if processing should continue normally.
39 pub fn is_continue(&self) -> bool {
40 matches!(self, Self::Continue)
41 }
42}
43
44/// A message handler that processes messages.
45///
46/// This trait represents a single step in the middleware chain.
47/// Handlers can be middleware components or final workers.
48#[async_trait]
49pub trait MessageHandler: Send + Sync {
50 /// Process a message.
51 ///
52 /// # Arguments
53 /// * `message` - The message to process
54 ///
55 /// # Returns
56 /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
57 /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
58 /// * `Err(WorkerError)` if processing failed
59 async fn handle(
60 &self,
61 message: ReceivedMessage<serde_json::Value>,
62 ) -> Result<MiddlewareResult, WorkerError>;
63}
64
65/// Middleware that wraps message handlers to add cross-cutting concerns.
66///
67/// Middleware forms a chain where each piece can:
68/// - Modify messages before passing them along
69/// - Perform actions before and after processing
70/// - Short-circuit the chain by returning early
71/// - Handle errors and implement retry logic
72/// - Signal whether acknowledgment was handled
73///
74/// # Example
75/// ```rust,no_run
76/// use foxtive_worker::middleware::{Middleware, MessageHandler, MiddlewareResult};
77/// use foxtive_worker::{ReceivedMessage, WorkerError};
78///
79/// struct LoggingMiddleware;
80///
81/// #[async_trait::async_trait]
82/// impl Middleware for LoggingMiddleware {
83/// fn name(&self) -> &str { "logging" }
84///
85/// async fn handle(
86/// &self,
87/// message: ReceivedMessage<serde_json::Value>,
88/// next: Box<dyn MessageHandler>,
89/// ) -> Result<MiddlewareResult, WorkerError> {
90/// println!("Processing message: {}", message.message.id);
91/// let result = next.handle(message).await?;
92/// println!("Message processed with result: {:?}", result);
93/// Ok(result)
94/// }
95/// }
96/// ```
97#[async_trait]
98pub trait Middleware: Send + Sync {
99 /// Get the name of this middleware for debugging and logging.
100 fn name(&self) -> &str;
101
102 /// Process a message with access to the next handler in the chain.
103 ///
104 /// # Arguments
105 /// * `message` - The message to process
106 /// * `next` - The next handler in the middleware chain
107 ///
108 /// # Returns
109 /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
110 /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
111 /// * `Err(WorkerError)` if processing failed
112 async fn handle(
113 &self,
114 message: ReceivedMessage<serde_json::Value>,
115 next: Box<dyn MessageHandler>,
116 ) -> Result<MiddlewareResult, WorkerError>;
117}
118
119/// A chain of middleware that processes messages in sequence.
120///
121/// Middleware chains allow you to compose multiple middleware components
122/// into a single handler that executes them in order.
123pub struct MiddlewareChain {
124 middlewares: Vec<Box<dyn Middleware>>,
125 final_handler: Box<dyn MessageHandler>,
126}
127
128impl MiddlewareChain {
129 /// Create a new middleware chain.
130 ///
131 /// # Arguments
132 /// * `middlewares` - Vector of middleware to execute in order
133 /// * `final_handler` - The final handler that processes the message
134 pub fn new(
135 middlewares: Vec<Box<dyn Middleware>>,
136 final_handler: Box<dyn MessageHandler>,
137 ) -> Self {
138 Self {
139 middlewares,
140 final_handler,
141 }
142 }
143
144 /// Build the middleware chain into a single handler.
145 ///
146 /// This creates a nested structure where each middleware wraps the next.
147 pub fn build(self) -> Box<dyn MessageHandler> {
148 let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
149
150 // Wrap handlers in reverse order so first middleware executes first
151 for middleware in self.middlewares.into_iter().rev() {
152 handler = Arc::new(MiddlewareWrapper {
153 middleware,
154 inner: handler,
155 });
156 }
157
158 Box::new(ArcHandler(handler))
159 }
160}
161
162/// Wrapper that applies a single middleware around an inner handler.
163struct MiddlewareWrapper {
164 middleware: Box<dyn Middleware>,
165 inner: Arc<dyn MessageHandler>,
166}
167
168#[async_trait]
169impl MessageHandler for MiddlewareWrapper {
170 async fn handle(
171 &self,
172 message: ReceivedMessage<serde_json::Value>,
173 ) -> Result<MiddlewareResult, WorkerError> {
174 let next = self.inner.clone();
175 self.middleware
176 .handle(message, Box::new(ArcHandler(next)))
177 .await
178 }
179}
180
181/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
182struct ArcHandler(Arc<dyn MessageHandler>);
183
184#[async_trait]
185impl MessageHandler for ArcHandler {
186 async fn handle(
187 &self,
188 message: ReceivedMessage<serde_json::Value>,
189 ) -> Result<MiddlewareResult, WorkerError> {
190 self.0.handle(message).await
191 }
192}
193
194/// Wrapper to convert Box<dyn MessageHandler> to Arc
195struct ArcHandlerWrapper(Box<dyn MessageHandler>);
196
197#[async_trait]
198impl MessageHandler for ArcHandlerWrapper {
199 async fn handle(
200 &self,
201 message: ReceivedMessage<serde_json::Value>,
202 ) -> Result<MiddlewareResult, WorkerError> {
203 self.0.handle(message).await
204 }
205}