Skip to main content

foxtive_worker/
worker.rs

1use async_trait::async_trait;
2use std::time::Duration;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7/// Backoff strategy for retries and restarts.
8#[derive(Debug, Clone)]
9pub enum BackoffStrategy {
10    /// Fixed delay between attempts.
11    Fixed(Duration),
12
13    /// Exponential backoff with optional jitter.
14    Exponential {
15        /// Initial delay
16        initial: Duration,
17        /// Maximum delay cap
18        max: Duration,
19        /// Multiplier for exponential growth (e.g., 2.0 doubles each time)
20        multiplier: f64,
21    },
22}
23
24impl BackoffStrategy {
25    /// Calculate the delay for a given attempt number (0-indexed).
26    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
27        match self {
28            BackoffStrategy::Fixed(duration) => *duration,
29            BackoffStrategy::Exponential {
30                initial,
31                max,
32                multiplier,
33            } => {
34                let delay = initial.mul_f64(multiplier.powi(attempt as i32));
35                delay.min(*max)
36            }
37        }
38    }
39}
40
41/// Core worker interface that all workers must implement.
42///
43/// Workers are responsible for processing individual messages from a backend.
44/// They can be supervised by `foxtive-supervisor` for automatic restarts on failure.
45///
46/// # Two-Level Backoff System
47///
48/// Foxtive Worker uses two independent backoff strategies:
49///
50/// 1. **Worker Restart Backoff** (this trait): Controls how quickly a crashed/failed
51///    worker is restarted by the supervisor. This handles worker-level failures like
52///    panics, setup failures, or connection losses.
53///
54/// 2. **Message Retry Backoff** (RetryHandler middleware): Controls delays between
55///    retry attempts for individual failed messages. This is configured separately
56///    in the middleware pipeline.
57///
58/// # Example
59/// ```rust
60/// use foxtive_worker::{Worker, ReceivedMessage};
61/// use foxtive_worker::error::WorkerResult;
62/// use async_trait::async_trait;
63///
64/// struct MyWorker;
65///
66/// #[async_trait]
67/// impl Worker for MyWorker {
68///     fn id(&self) -> &str { "my-worker" }
69///     
70///     async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
71///         println!("Processing message: {}", message.message.id);
72///         // Your processing logic here
73///         message.ack().await?;
74///         Ok(())
75///     }
76/// }
77/// ```
78#[async_trait]
79pub trait Worker: Send + Sync {
80    /// Unique worker identifier.
81    ///
82    /// This should be stable across restarts and unique within a worker pool.
83    fn id(&self) -> &str;
84
85    /// Human-readable name for the worker.
86    ///
87    /// Used for logging and monitoring. Defaults to the worker ID if not overridden.
88    fn name(&self) -> String {
89        self.id().to_string()
90    }
91
92    /// Process a single message.
93    ///
94    /// This is the core method where message processing logic is implemented.
95    /// The worker should acknowledge the message on success or negative-acknowledge
96    /// on failure.
97    ///
98    /// # Arguments
99    /// * `message` - The received message with acknowledgment capability
100    ///
101    /// # Returns
102    /// * `Ok(())` if processing succeeded
103    /// * `Err(WorkerError)` if processing failed
104    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()>;
105
106    /// Optional setup before worker starts.
107    ///
108    /// This method is called once when the worker is initialized, before any
109    /// messages are processed. Use it for one-time initialization like
110    /// establishing connections or loading configuration.
111    ///
112    /// # Returns
113    /// * `Ok(())` if setup succeeded
114    /// * `Err(WorkerError)` if setup failed (worker will not start)
115    async fn setup(&self) -> WorkerResult<()> {
116        Ok(())
117    }
118
119    /// Optional cleanup on shutdown.
120    ///
121    /// This method is called when the worker is being shut down gracefully.
122    /// Use it to release resources, close connections, or flush buffers.
123    async fn teardown(&self) {}
124
125    /// Concurrency limit for this worker.
126    ///
127    /// If `Some(n)`, the worker will process at most `n` messages concurrently.
128    /// If `None`, there is no limit (bounded only by system resources).
129    ///
130    /// This is useful for preventing resource exhaustion when processing
131    /// expensive messages.
132    fn concurrency_limit(&self) -> Option<usize> {
133        None
134    }
135
136    /// Backoff strategy for worker-level restarts.
137    ///
138    /// This controls how quickly a crashed or failed worker is restarted
139    /// by the supervisor. It is **independent** of message-level retry backoff
140    /// (which is handled by the RetryHandler middleware).
141    ///
142    /// # When This Applies
143    /// - Worker panics during message processing
144    /// - Worker setup fails
145    /// - Worker encounters unrecoverable errors
146    /// - Supervisor detects worker health check failures
147    ///
148    /// # When This Does NOT Apply
149    /// - Individual message processing failures (handled by RetryHandler)
150    /// - Graceful worker shutdown
151    ///
152    /// # Default
153    /// Exponential backoff starting at 1 second, max 60 seconds, multiplier 2.0
154    fn restart_backoff_strategy(&self) -> BackoffStrategy {
155        BackoffStrategy::Exponential {
156            initial: Duration::from_secs(1),
157            max: Duration::from_secs(60),
158            multiplier: 2.0,
159        }
160    }
161
162    /// Optional processing timeout for individual messages.
163    ///
164    /// If `Some(duration)`, each message processed by this worker will have a maximum
165    /// processing time enforced. If processing exceeds this timeout, the message will
166    /// be negative-acknowledged (nacked) with requeue, preventing RabbitMQ consumer
167    /// timeout errors.
168    ///
169    /// This is especially important when:
170    /// - Processing times are variable or unpredictable
171    /// - External dependencies (APIs, databases) might be slow
172    /// - You want to fail fast rather than wait for broker timeouts
173    ///
174    /// # Relationship to Broker Timeouts
175    /// Set this to be **less than** your broker's consumer timeout to ensure
176    /// graceful handling before the broker kills the connection.
177    ///
178    /// Example: If RabbitMQ has a 30-second consumer timeout, set this to 25 seconds.
179    ///
180    /// # Default
181    /// `None` - No timeout enforcement (relies on broker timeouts)
182    ///
183    /// # Example
184    /// ```rust
185    /// use foxtive_worker::Worker;
186    /// use std::time::Duration;
187    ///
188    /// struct SlowWorker;
189    ///
190    /// #[async_trait::async_trait]
191    /// impl Worker for SlowWorker {
192    ///     fn id(&self) -> &str { "slow-worker" }
193    ///     
194    ///     // This worker can take up to 2 minutes per message
195    ///     fn processing_timeout(&self) -> Option<Duration> {
196    ///         Some(Duration::from_secs(120))
197    ///     }
198    ///     
199    ///     async fn process(&self, message: foxtive_worker::ReceivedMessage<serde_json::Value>) 
200    ///         -> foxtive_worker::error::WorkerResult<()> {
201    ///         // Long-running processing...
202    ///         Ok(())
203    ///     }
204    /// }
205    /// ```
206    fn processing_timeout(&self) -> Option<Duration> {
207        None
208    }
209}
210
211/// Helper function to create a boxed worker trait object.
212pub fn box_worker<W: Worker + 'static>(worker: W) -> Box<dyn Worker> {
213    Box::new(worker)
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn test_fixed_backoff() {
222        let strategy = BackoffStrategy::Fixed(Duration::from_secs(5));
223
224        assert_eq!(strategy.delay_for_attempt(0), Duration::from_secs(5));
225        assert_eq!(strategy.delay_for_attempt(1), Duration::from_secs(5));
226        assert_eq!(strategy.delay_for_attempt(10), Duration::from_secs(5));
227    }
228
229    #[test]
230    fn test_exponential_backoff() {
231        let strategy = BackoffStrategy::Exponential {
232            initial: Duration::from_secs(1),
233            max: Duration::from_secs(60),
234            multiplier: 2.0,
235        };
236
237        assert_eq!(strategy.delay_for_attempt(0), Duration::from_secs(1));
238        assert_eq!(strategy.delay_for_attempt(1), Duration::from_secs(2));
239        assert_eq!(strategy.delay_for_attempt(2), Duration::from_secs(4));
240        assert_eq!(strategy.delay_for_attempt(3), Duration::from_secs(8));
241        assert_eq!(strategy.delay_for_attempt(4), Duration::from_secs(16));
242        assert_eq!(strategy.delay_for_attempt(5), Duration::from_secs(32));
243        // Should be capped at max
244        assert_eq!(strategy.delay_for_attempt(10), Duration::from_secs(60));
245    }
246
247    #[test]
248    fn test_worker_default_methods() {
249        struct TestWorker;
250
251        #[async_trait]
252        impl Worker for TestWorker {
253            fn id(&self) -> &str {
254                "test-worker"
255            }
256
257            async fn process(
258                &self,
259                _message: ReceivedMessage<serde_json::Value>,
260            ) -> WorkerResult<()> {
261                Ok(())
262            }
263        }
264
265        let worker = TestWorker;
266        assert_eq!(worker.id(), "test-worker");
267        assert_eq!(worker.name(), "test-worker");
268        assert!(worker.concurrency_limit().is_none());
269
270        let strategy = worker.restart_backoff_strategy();
271        match strategy {
272            BackoffStrategy::Exponential {
273                initial,
274                max,
275                multiplier,
276            } => {
277                assert_eq!(initial, Duration::from_secs(1));
278                assert_eq!(max, Duration::from_secs(60));
279                assert_eq!(multiplier, 2.0);
280            }
281            _ => panic!("Expected Exponential strategy"),
282        }
283    }
284}