Skip to main content

foxtive_worker/middleware/
mod.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerError;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8pub mod batch;
9pub mod circuit_breaker;
10pub mod processing_timeout;
11#[cfg(feature = "rate-limit")]
12pub mod rate_limit;
13pub mod retry_handler;
14pub mod tracing;
15
/// Outcome reported by a handler or middleware once it is done with a message.
///
/// The variants make the control flow of a middleware chain explicit: they
/// tell the caller whether acknowledgment has already been taken care of, or
/// whether it is still the caller's job.
///
/// This is a fieldless enum, so besides the comparison traits it is also
/// `Copy` and `Hash`: values can be passed by value without cloning and used
/// as set or map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MiddlewareResult {
    /// The message has already been acknowledged by a middleware.
    /// The pool should NOT attempt to ack/nack it again.
    Acknowledged,

    /// Continue with the normal processing flow.
    /// The pool should handle ack/nack based on the result.
    Continue,
}
31
32impl MiddlewareResult {
33    /// Returns true if the message was already acknowledged.
34    pub fn is_acknowledged(&self) -> bool {
35        matches!(self, Self::Acknowledged)
36    }
37
38    /// Returns true if processing should continue normally.
39    pub fn is_continue(&self) -> bool {
40        matches!(self, Self::Continue)
41    }
42}
43
44/// A message handler that processes messages.
45///
46/// This trait represents a single step in the middleware chain.
47/// Handlers can be middleware components or final workers.
48#[async_trait]
49pub trait MessageHandler: Send + Sync {
50    /// Process a message.
51    ///
52    /// # Arguments
53    /// * `message` - The message to process
54    ///
55    /// # Returns
56    /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
57    /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
58    /// * `Err(WorkerError)` if processing failed
59    async fn handle(
60        &self,
61        message: ReceivedMessage<serde_json::Value>,
62    ) -> Result<MiddlewareResult, WorkerError>;
63}
64
65/// Middleware that wraps message handlers to add cross-cutting concerns.
66///
67/// Middleware forms a chain where each piece can:
68/// - Modify messages before passing them along
69/// - Perform actions before and after processing
70/// - Short-circuit the chain by returning early
71/// - Handle errors and implement retry logic
72/// - Signal whether acknowledgment was handled
73///
74/// # Example
75/// ```rust,no_run
76/// use foxtive_worker::middleware::{Middleware, MessageHandler, MiddlewareResult};
77/// use foxtive_worker::{ReceivedMessage, WorkerError};
78///
79/// struct LoggingMiddleware;
80///
81/// #[async_trait::async_trait]
82/// impl Middleware for LoggingMiddleware {
83///     fn name(&self) -> &str { "logging" }
84///     
85///     async fn handle(
86///         &self,
87///         message: ReceivedMessage<serde_json::Value>,
88///         next: Box<dyn MessageHandler>,
89///     ) -> Result<MiddlewareResult, WorkerError> {
90///         println!("Processing message: {}", message.message.id);
91///         let result = next.handle(message).await?;
92///         println!("Message processed with result: {:?}", result);
93///         Ok(result)
94///     }
95/// }
96/// ```
97#[async_trait]
98pub trait Middleware: Send + Sync {
99    /// Get the name of this middleware for debugging and logging.
100    fn name(&self) -> &str;
101
102    /// Process a message with access to the next handler in the chain.
103    ///
104    /// # Arguments
105    /// * `message` - The message to process
106    /// * `next` - The next handler in the middleware chain
107    ///
108    /// # Returns
109    /// * `Ok(MiddlewareResult::Acknowledged)` if middleware handled acknowledgment
110    /// * `Ok(MiddlewareResult::Continue)` if processing should continue normally
111    /// * `Err(WorkerError)` if processing failed
112    async fn handle(
113        &self,
114        message: ReceivedMessage<serde_json::Value>,
115        next: Box<dyn MessageHandler>,
116    ) -> Result<MiddlewareResult, WorkerError>;
117}
118
119/// A chain of middleware that processes messages in sequence.
120///
121/// Middleware chains allow you to compose multiple middleware components
122/// into a single handler that executes them in order.
123pub struct MiddlewareChain {
124    middlewares: Vec<Box<dyn Middleware>>,
125    final_handler: Box<dyn MessageHandler>,
126}
127
128impl MiddlewareChain {
129    /// Create a new middleware chain.
130    ///
131    /// # Arguments
132    /// * `middlewares` - Vector of middleware to execute in order
133    /// * `final_handler` - The final handler that processes the message
134    pub fn new(
135        middlewares: Vec<Box<dyn Middleware>>,
136        final_handler: Box<dyn MessageHandler>,
137    ) -> Self {
138        Self {
139            middlewares,
140            final_handler,
141        }
142    }
143
144    /// Build the middleware chain into a single handler.
145    ///
146    /// This creates a nested structure where each middleware wraps the next.
147    pub fn build(self) -> Box<dyn MessageHandler> {
148        let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
149
150        // Wrap handlers in reverse order so first middleware executes first
151        for middleware in self.middlewares.into_iter().rev() {
152            handler = Arc::new(MiddlewareWrapper {
153                middleware,
154                inner: handler,
155            });
156        }
157
158        Box::new(ArcHandler(handler))
159    }
160}
161
162/// Wrapper that applies a single middleware around an inner handler.
163struct MiddlewareWrapper {
164    middleware: Box<dyn Middleware>,
165    inner: Arc<dyn MessageHandler>,
166}
167
168#[async_trait]
169impl MessageHandler for MiddlewareWrapper {
170    async fn handle(
171        &self,
172        message: ReceivedMessage<serde_json::Value>,
173    ) -> Result<MiddlewareResult, WorkerError> {
174        let next = self.inner.clone();
175        self.middleware
176            .handle(message, Box::new(ArcHandler(next)))
177            .await
178    }
179}
180
181/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
182struct ArcHandler(Arc<dyn MessageHandler>);
183
184#[async_trait]
185impl MessageHandler for ArcHandler {
186    async fn handle(
187        &self,
188        message: ReceivedMessage<serde_json::Value>,
189    ) -> Result<MiddlewareResult, WorkerError> {
190        self.0.handle(message).await
191    }
192}
193
194/// Wrapper to convert Box<dyn MessageHandler> to Arc
195struct ArcHandlerWrapper(Box<dyn MessageHandler>);
196
197#[async_trait]
198impl MessageHandler for ArcHandlerWrapper {
199    async fn handle(
200        &self,
201        message: ReceivedMessage<serde_json::Value>,
202    ) -> Result<MiddlewareResult, WorkerError> {
203        self.0.handle(message).await
204    }
205}