foxtive_worker/worker.rs
1use async_trait::async_trait;
2use std::time::Duration;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7/// Backoff strategy for retries and restarts.
8#[derive(Debug, Clone)]
9pub enum BackoffStrategy {
10 /// Fixed delay between attempts.
11 Fixed(Duration),
12
13 /// Exponential backoff with optional jitter.
14 Exponential {
15 /// Initial delay
16 initial: Duration,
17 /// Maximum delay cap
18 max: Duration,
19 /// Multiplier for exponential growth (e.g., 2.0 doubles each time)
20 multiplier: f64,
21 },
22}
23
24impl BackoffStrategy {
25 /// Calculate the delay for a given attempt number (0-indexed).
26 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
27 match self {
28 BackoffStrategy::Fixed(duration) => *duration,
29 BackoffStrategy::Exponential {
30 initial,
31 max,
32 multiplier,
33 } => {
34 let delay = initial.mul_f64(multiplier.powi(attempt as i32));
35 delay.min(*max)
36 }
37 }
38 }
39}
40
41/// Core worker interface that all workers must implement.
42///
43/// Workers are responsible for processing individual messages from a backend.
44/// They can be supervised by `foxtive-supervisor` for automatic restarts on failure.
45///
46/// # Two-Level Backoff System
47///
48/// Foxtive Worker uses two independent backoff strategies:
49///
50/// 1. **Worker Restart Backoff** (this trait): Controls how quickly a crashed/failed
51/// worker is restarted by the supervisor. This handles worker-level failures like
52/// panics, setup failures, or connection losses.
53///
54/// 2. **Message Retry Backoff** (RetryHandler middleware): Controls delays between
55/// retry attempts for individual failed messages. This is configured separately
56/// in the middleware pipeline.
57///
58/// # Example
59/// ```rust
60/// use foxtive_worker::{Worker, ReceivedMessage};
61/// use foxtive_worker::error::WorkerResult;
62/// use async_trait::async_trait;
63///
64/// struct MyWorker;
65///
66/// #[async_trait]
67/// impl Worker for MyWorker {
68/// fn id(&self) -> &str { "my-worker" }
69///
70/// async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
71/// println!("Processing message: {}", message.message.id);
72/// // Your processing logic here
73/// message.ack().await?;
74/// Ok(())
75/// }
76/// }
77/// ```
78#[async_trait]
79pub trait Worker: Send + Sync {
80 /// Unique worker identifier.
81 ///
82 /// This should be stable across restarts and unique within a worker pool.
83 fn id(&self) -> &str;
84
85 /// Human-readable name for the worker.
86 ///
87 /// Used for logging and monitoring. Defaults to the worker ID if not overridden.
88 fn name(&self) -> String {
89 self.id().to_string()
90 }
91
92 /// Process a single message.
93 ///
94 /// This is the core method where message processing logic is implemented.
95 /// The worker should acknowledge the message on success or negative-acknowledge
96 /// on failure.
97 ///
98 /// # Arguments
99 /// * `message` - The received message with acknowledgment capability
100 ///
101 /// # Returns
102 /// * `Ok(())` if processing succeeded
103 /// * `Err(WorkerError)` if processing failed
104 async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()>;
105
106 /// Optional setup before worker starts.
107 ///
108 /// This method is called once when the worker is initialized, before any
109 /// messages are processed. Use it for one-time initialization like
110 /// establishing connections or loading configuration.
111 ///
112 /// # Returns
113 /// * `Ok(())` if setup succeeded
114 /// * `Err(WorkerError)` if setup failed (worker will not start)
115 async fn setup(&self) -> WorkerResult<()> {
116 Ok(())
117 }
118
119 /// Optional cleanup on shutdown.
120 ///
121 /// This method is called when the worker is being shut down gracefully.
122 /// Use it to release resources, close connections, or flush buffers.
123 async fn teardown(&self) {}
124
125 /// Concurrency limit for this worker.
126 ///
127 /// If `Some(n)`, the worker will process at most `n` messages concurrently.
128 /// If `None`, there is no limit (bounded only by system resources).
129 ///
130 /// This is useful for preventing resource exhaustion when processing
131 /// expensive messages.
132 fn concurrency_limit(&self) -> Option<usize> {
133 None
134 }
135
136 /// Backoff strategy for worker-level restarts.
137 ///
138 /// This controls how quickly a crashed or failed worker is restarted
139 /// by the supervisor. It is **independent** of message-level retry backoff
140 /// (which is handled by the RetryHandler middleware).
141 ///
142 /// # When This Applies
143 /// - Worker panics during message processing
144 /// - Worker setup fails
145 /// - Worker encounters unrecoverable errors
146 /// - Supervisor detects worker health check failures
147 ///
148 /// # When This Does NOT Apply
149 /// - Individual message processing failures (handled by RetryHandler)
150 /// - Graceful worker shutdown
151 ///
152 /// # Default
153 /// Exponential backoff starting at 1 second, max 60 seconds, multiplier 2.0
154 fn restart_backoff_strategy(&self) -> BackoffStrategy {
155 BackoffStrategy::Exponential {
156 initial: Duration::from_secs(1),
157 max: Duration::from_secs(60),
158 multiplier: 2.0,
159 }
160 }
161
162 /// Optional processing timeout for individual messages.
163 ///
164 /// If `Some(duration)`, each message processed by this worker will have a maximum
165 /// processing time enforced. If processing exceeds this timeout, the message will
166 /// be negative-acknowledged (nacked) with requeue, preventing RabbitMQ consumer
167 /// timeout errors.
168 ///
169 /// This is especially important when:
170 /// - Processing times are variable or unpredictable
171 /// - External dependencies (APIs, databases) might be slow
172 /// - You want to fail fast rather than wait for broker timeouts
173 ///
174 /// # Relationship to Broker Timeouts
175 /// Set this to be **less than** your broker's consumer timeout to ensure
176 /// graceful handling before the broker kills the connection.
177 ///
178 /// Example: If RabbitMQ has a 30-second consumer timeout, set this to 25 seconds.
179 ///
180 /// # Default
181 /// `None` - No timeout enforcement (relies on broker timeouts)
182 ///
183 /// # Example
184 /// ```rust
185 /// use foxtive_worker::Worker;
186 /// use std::time::Duration;
187 ///
188 /// struct SlowWorker;
189 ///
190 /// #[async_trait::async_trait]
191 /// impl Worker for SlowWorker {
192 /// fn id(&self) -> &str { "slow-worker" }
193 ///
194 /// // This worker can take up to 2 minutes per message
195 /// fn processing_timeout(&self) -> Option<Duration> {
196 /// Some(Duration::from_secs(120))
197 /// }
198 ///
199 /// async fn process(&self, message: foxtive_worker::ReceivedMessage<serde_json::Value>)
200 /// -> foxtive_worker::error::WorkerResult<()> {
201 /// // Long-running processing...
202 /// Ok(())
203 /// }
204 /// }
205 /// ```
206 fn processing_timeout(&self) -> Option<Duration> {
207 None
208 }
209}
210
211/// Helper function to create a boxed worker trait object.
212pub fn box_worker<W: Worker + 'static>(worker: W) -> Box<dyn Worker> {
213 Box::new(worker)
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn test_fixed_backoff() {
222 let strategy = BackoffStrategy::Fixed(Duration::from_secs(5));
223
224 assert_eq!(strategy.delay_for_attempt(0), Duration::from_secs(5));
225 assert_eq!(strategy.delay_for_attempt(1), Duration::from_secs(5));
226 assert_eq!(strategy.delay_for_attempt(10), Duration::from_secs(5));
227 }
228
229 #[test]
230 fn test_exponential_backoff() {
231 let strategy = BackoffStrategy::Exponential {
232 initial: Duration::from_secs(1),
233 max: Duration::from_secs(60),
234 multiplier: 2.0,
235 };
236
237 assert_eq!(strategy.delay_for_attempt(0), Duration::from_secs(1));
238 assert_eq!(strategy.delay_for_attempt(1), Duration::from_secs(2));
239 assert_eq!(strategy.delay_for_attempt(2), Duration::from_secs(4));
240 assert_eq!(strategy.delay_for_attempt(3), Duration::from_secs(8));
241 assert_eq!(strategy.delay_for_attempt(4), Duration::from_secs(16));
242 assert_eq!(strategy.delay_for_attempt(5), Duration::from_secs(32));
243 // Should be capped at max
244 assert_eq!(strategy.delay_for_attempt(10), Duration::from_secs(60));
245 }
246
247 #[test]
248 fn test_worker_default_methods() {
249 struct TestWorker;
250
251 #[async_trait]
252 impl Worker for TestWorker {
253 fn id(&self) -> &str {
254 "test-worker"
255 }
256
257 async fn process(
258 &self,
259 _message: ReceivedMessage<serde_json::Value>,
260 ) -> WorkerResult<()> {
261 Ok(())
262 }
263 }
264
265 let worker = TestWorker;
266 assert_eq!(worker.id(), "test-worker");
267 assert_eq!(worker.name(), "test-worker");
268 assert!(worker.concurrency_limit().is_none());
269
270 let strategy = worker.restart_backoff_strategy();
271 match strategy {
272 BackoffStrategy::Exponential {
273 initial,
274 max,
275 multiplier,
276 } => {
277 assert_eq!(initial, Duration::from_secs(1));
278 assert_eq!(max, Duration::from_secs(60));
279 assert_eq!(multiplier, 2.0);
280 }
281 _ => panic!("Expected Exponential strategy"),
282 }
283 }
284}