foxtive_worker/middleware/mod.rs
1use async_trait::async_trait;
2use std::sync::Arc;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7pub mod ack_nack;
8#[cfg(feature = "rate-limit")]
9pub mod rate_limit;
10pub mod circuit_breaker;
11pub mod tracing;
12pub mod retry_handler;
13pub mod batch;
14pub mod processing_timeout;
15
16/// A message handler that processes messages.
17///
18/// This trait represents a single step in the middleware chain.
19/// Handlers can be middleware components or final workers.
20#[async_trait]
21pub trait MessageHandler: Send + Sync {
22 /// Process a message.
23 ///
24 /// # Arguments
25 /// * `message` - The message to process
26 ///
27 /// # Returns
28 /// * `Ok(())` if processing succeeded
29 /// * `Err(WorkerError)` if processing failed
30 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()>;
31}
32
33/// Middleware that wraps message handlers to add cross-cutting concerns.
34///
35/// Middleware forms a chain where each piece can:
36/// - Modify messages before passing them along
37/// - Perform actions before and after processing
38/// - Short-circuit the chain by returning early
39/// - Handle errors and implement retry logic
40///
41/// # Example
42/// ```rust,no_run
43/// use foxtive_worker::middleware::{Middleware, MessageHandler};
44/// use foxtive_worker::{ReceivedMessage, WorkerResult};
45///
46/// struct LoggingMiddleware;
47///
48/// #[async_trait::async_trait]
49/// impl Middleware for LoggingMiddleware {
50/// fn name(&self) -> &str { "logging" }
51///
52/// async fn handle(
53/// &self,
54/// message: ReceivedMessage<serde_json::Value>,
55/// next: Box<dyn MessageHandler>,
56/// ) -> WorkerResult<()> {
57/// println!("Processing message: {}", message.message.id);
58/// let result = next.handle(message).await;
59/// println!("Message processed with result: {:?}", result.is_ok());
60/// result
61/// }
62/// }
63/// ```
64#[async_trait]
65pub trait Middleware: Send + Sync {
66 /// Get the name of this middleware for debugging and logging.
67 fn name(&self) -> &str;
68
69 /// Process a message with access to the next handler in the chain.
70 ///
71 /// # Arguments
72 /// * `message` - The message to process
73 /// * `next` - The next handler in the middleware chain
74 ///
75 /// # Returns
76 /// * `Ok(())` if processing succeeded
77 /// * `Err(WorkerError)` if processing failed
78 async fn handle(
79 &self,
80 message: ReceivedMessage<serde_json::Value>,
81 next: Box<dyn MessageHandler>,
82 ) -> WorkerResult<()>;
83}
84
85/// A chain of middleware that processes messages in sequence.
86///
87/// Middleware chains allow you to compose multiple middleware components
88/// into a single handler that executes them in order.
89pub struct MiddlewareChain {
90 middlewares: Vec<Box<dyn Middleware>>,
91 final_handler: Box<dyn MessageHandler>,
92}
93
94impl MiddlewareChain {
95 /// Create a new middleware chain.
96 ///
97 /// # Arguments
98 /// * `middlewares` - Vector of middleware to execute in order
99 /// * `final_handler` - The final handler that processes the message
100 pub fn new(
101 middlewares: Vec<Box<dyn Middleware>>,
102 final_handler: Box<dyn MessageHandler>,
103 ) -> Self {
104 Self {
105 middlewares,
106 final_handler,
107 }
108 }
109
110 /// Build the middleware chain into a single handler.
111 ///
112 /// This creates a nested structure where each middleware wraps the next.
113 pub fn build(self) -> Box<dyn MessageHandler> {
114 let mut handler: Arc<dyn MessageHandler> = Arc::new(ArcHandlerWrapper(self.final_handler));
115
116 // Wrap handlers in reverse order so first middleware executes first
117 for middleware in self.middlewares.into_iter().rev() {
118 handler = Arc::new(MiddlewareWrapper {
119 middleware,
120 inner: handler,
121 });
122 }
123
124 Box::new(ArcHandler(handler))
125 }
126}
127
128/// Wrapper that applies a single middleware around an inner handler.
129struct MiddlewareWrapper {
130 middleware: Box<dyn Middleware>,
131 inner: Arc<dyn MessageHandler>,
132}
133
134#[async_trait]
135impl MessageHandler for MiddlewareWrapper {
136 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
137 let next = self.inner.clone();
138 self.middleware.handle(message, Box::new(ArcHandler(next))).await
139 }
140}
141
142/// Helper to wrap Arc<dyn MessageHandler> as Box<dyn MessageHandler>
143struct ArcHandler(Arc<dyn MessageHandler>);
144
145#[async_trait]
146impl MessageHandler for ArcHandler {
147 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
148 self.0.handle(message).await
149 }
150}
151
152/// Wrapper to convert Box<dyn MessageHandler> to Arc
153struct ArcHandlerWrapper(Box<dyn MessageHandler>);
154
155#[async_trait]
156impl MessageHandler for ArcHandlerWrapper {
157 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
158 self.0.handle(message).await
159 }
160}