celers_protocol/
retry.rs

//! Retry strategy utilities
//!
//! This module provides delay strategies (fixed, exponential, linear, and custom), a retry
//! policy that caps the number of retries per message, and simple retry statistics.
4
5use crate::Message;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9/// Retry strategy for failed tasks
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub enum RetryStrategy {
12    /// No retry
13    None,
14    /// Fixed delay between retries
15    Fixed {
16        /// Delay in seconds
17        delay_secs: u32,
18    },
19    /// Exponential backoff with optional jitter
20    Exponential {
21        /// Base delay in seconds
22        base_delay_secs: u32,
23        /// Maximum delay in seconds
24        max_delay_secs: u32,
25        /// Multiplier for each retry (default: 2.0)
26        multiplier: f64,
27    },
28    /// Linear backoff
29    Linear {
30        /// Initial delay in seconds
31        initial_delay_secs: u32,
32        /// Increment per retry in seconds
33        increment_secs: u32,
34        /// Maximum delay in seconds
35        max_delay_secs: u32,
36    },
37    /// Custom delay for each retry attempt
38    Custom {
39        /// Delays for each retry attempt (in seconds)
40        delays: Vec<u32>,
41    },
42}
43
44impl RetryStrategy {
45    /// Create a fixed delay retry strategy
46    pub fn fixed(delay_secs: u32) -> Self {
47        Self::Fixed { delay_secs }
48    }
49
50    /// Create an exponential backoff retry strategy
51    pub fn exponential(base_delay_secs: u32, max_delay_secs: u32) -> Self {
52        Self::Exponential {
53            base_delay_secs,
54            max_delay_secs,
55            multiplier: 2.0,
56        }
57    }
58
59    /// Create a linear backoff retry strategy
60    pub fn linear(initial_delay_secs: u32, increment_secs: u32, max_delay_secs: u32) -> Self {
61        Self::Linear {
62            initial_delay_secs,
63            increment_secs,
64            max_delay_secs,
65        }
66    }
67
68    /// Create a custom retry strategy with specific delays
69    pub fn custom(delays: Vec<u32>) -> Self {
70        Self::Custom { delays }
71    }
72
73    /// Calculate the delay for a specific retry attempt
74    pub fn calculate_delay(&self, retry_count: u32) -> Option<Duration> {
75        match self {
76            RetryStrategy::None => None,
77            RetryStrategy::Fixed { delay_secs } => Some(Duration::seconds(*delay_secs as i64)),
78            RetryStrategy::Exponential {
79                base_delay_secs,
80                max_delay_secs,
81                multiplier,
82            } => {
83                let delay = (*base_delay_secs as f64 * multiplier.powi(retry_count as i32))
84                    .min(*max_delay_secs as f64);
85                Some(Duration::seconds(delay as i64))
86            }
87            RetryStrategy::Linear {
88                initial_delay_secs,
89                increment_secs,
90                max_delay_secs,
91            } => {
92                let delay =
93                    (initial_delay_secs + increment_secs * retry_count).min(*max_delay_secs);
94                Some(Duration::seconds(delay as i64))
95            }
96            RetryStrategy::Custom { delays } => delays
97                .get(retry_count as usize)
98                .map(|&d| Duration::seconds(d as i64)),
99        }
100    }
101
102    /// Get the ETA for the next retry
103    pub fn next_eta(&self, retry_count: u32) -> Option<DateTime<Utc>> {
104        self.calculate_delay(retry_count)
105            .map(|delay| Utc::now() + delay)
106    }
107}
108
109impl Default for RetryStrategy {
110    fn default() -> Self {
111        Self::exponential(1, 3600) // 1 second base, 1 hour max
112    }
113}
114
115/// Retry policy with maximum retry limits
116#[derive(Debug, Clone)]
117pub struct RetryPolicy {
118    strategy: RetryStrategy,
119    max_retries: u32,
120    retry_on_timeout: bool,
121    retry_on_rate_limit: bool,
122}
123
124impl RetryPolicy {
125    /// Create a new retry policy
126    pub fn new(strategy: RetryStrategy, max_retries: u32) -> Self {
127        Self {
128            strategy,
129            max_retries,
130            retry_on_timeout: true,
131            retry_on_rate_limit: true,
132        }
133    }
134
135    /// Set whether to retry on timeout errors
136    #[must_use]
137    pub fn with_retry_on_timeout(mut self, retry: bool) -> Self {
138        self.retry_on_timeout = retry;
139        self
140    }
141
142    /// Set whether to retry on rate limit errors
143    #[must_use]
144    pub fn with_retry_on_rate_limit(mut self, retry: bool) -> Self {
145        self.retry_on_rate_limit = retry;
146        self
147    }
148
149    /// Check if a message should be retried
150    pub fn should_retry(&self, message: &Message) -> bool {
151        let current_retries = message.headers.retries.unwrap_or(0);
152        current_retries < self.max_retries
153    }
154
155    /// Calculate the next ETA for a retry
156    pub fn next_retry_eta(&self, message: &Message) -> Option<DateTime<Utc>> {
157        let retry_count = message.headers.retries.unwrap_or(0);
158        if self.should_retry(message) {
159            self.strategy.next_eta(retry_count)
160        } else {
161            None
162        }
163    }
164
165    /// Create a retry message with updated retry count and ETA
166    pub fn create_retry_message(&self, message: &Message) -> Option<Message> {
167        if !self.should_retry(message) {
168            return None;
169        }
170
171        let mut retry_msg = message.clone();
172        let current_retries = retry_msg.headers.retries.unwrap_or(0);
173        retry_msg.headers.retries = Some(current_retries + 1);
174
175        if let Some(eta) = self.strategy.next_eta(current_retries) {
176            retry_msg.headers.eta = Some(eta);
177        }
178
179        Some(retry_msg)
180    }
181
182    /// Get the retry strategy
183    pub fn strategy(&self) -> &RetryStrategy {
184        &self.strategy
185    }
186
187    /// Get the maximum number of retries
188    pub fn max_retries(&self) -> u32 {
189        self.max_retries
190    }
191}
192
193impl Default for RetryPolicy {
194    fn default() -> Self {
195        Self::new(RetryStrategy::default(), 3)
196    }
197}
198
/// Counters describing how retry attempts have turned out.
///
/// All counters start at zero.
#[derive(Clone, Debug, Default)]
pub struct RetryStats {
    /// Every recorded retry attempt, successful or not.
    pub total_retries: u64,
    /// Retry attempts recorded as successful.
    pub successful_retries: u64,
    /// Retry attempts recorded as failed.
    pub failed_retries: u64,
    /// Messages that exceeded max retries (not counted in `total_retries`).
    pub max_retries_exceeded: u64,
}

impl RetryStats {
    /// Create a set of counters, all zero.
    pub fn new() -> Self {
        Self {
            total_retries: 0,
            successful_retries: 0,
            failed_retries: 0,
            max_retries_exceeded: 0,
        }
    }

    /// Count one retry attempt that succeeded.
    pub fn record_success(&mut self) {
        self.total_retries += 1;
        self.successful_retries += 1;
    }

    /// Count one retry attempt that failed.
    pub fn record_failure(&mut self) {
        self.total_retries += 1;
        self.failed_retries += 1;
    }

    /// Count one message that ran out of retries; `total_retries` is left untouched.
    pub fn record_max_exceeded(&mut self) {
        self.max_retries_exceeded += 1;
    }

    /// Percentage (0–100) of recorded retries that succeeded; `0.0` if none were recorded.
    pub fn success_rate(&self) -> f64 {
        match self.total_retries {
            0 => 0.0,
            total => (self.successful_retries as f64 / total as f64) * 100.0,
        }
    }
}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Builds a minimal `tasks.test` message with default headers.
    fn create_test_message() -> Message {
        MessageBuilder::new("tasks.test").build().unwrap()
    }

    /// Checks `calculate_delay` against `(attempt, expected seconds)` pairs.
    /// `None` means the strategy should produce no delay for that attempt.
    fn check_delays(strategy: &RetryStrategy, cases: &[(u32, Option<i64>)]) {
        for &(attempt, expected_secs) in cases {
            assert_eq!(
                strategy.calculate_delay(attempt),
                expected_secs.map(Duration::seconds),
                "unexpected delay for attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_fixed_retry_strategy() {
        check_delays(
            &RetryStrategy::fixed(5),
            &[(0, Some(5)), (1, Some(5)), (5, Some(5))],
        );
    }

    #[test]
    fn test_exponential_retry_strategy() {
        // Doubles from 1s; attempt 10 (1024s) is capped at 60s.
        check_delays(
            &RetryStrategy::exponential(1, 60),
            &[
                (0, Some(1)),
                (1, Some(2)),
                (2, Some(4)),
                (3, Some(8)),
                (10, Some(60)),
            ],
        );
    }

    #[test]
    fn test_linear_retry_strategy() {
        // 5s plus 10s per attempt; attempt 10 (105s) is capped at 100s.
        check_delays(
            &RetryStrategy::linear(5, 10, 100),
            &[(0, Some(5)), (1, Some(15)), (2, Some(25)), (10, Some(100))],
        );
    }

    #[test]
    fn test_custom_retry_strategy() {
        // Attempts past the end of the list have no delay.
        check_delays(
            &RetryStrategy::custom(vec![1, 5, 10, 30]),
            &[
                (0, Some(1)),
                (1, Some(5)),
                (2, Some(10)),
                (3, Some(30)),
                (4, None),
            ],
        );
    }

    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(5), 3);
        let mut msg = create_test_message();

        // No retries header yet: counts as zero.
        assert!(policy.should_retry(&msg));

        for &(retries, expected) in &[(2, true), (3, false)] {
            msg.headers.retries = Some(retries);
            assert_eq!(policy.should_retry(&msg), expected);
        }
    }

    #[test]
    fn test_retry_policy_create_retry_message() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(5), 3);
        let msg = create_test_message();

        let retried = policy
            .create_retry_message(&msg)
            .expect("a fresh message should be retryable");
        assert_eq!(retried.headers.retries, Some(1));
        assert!(retried.headers.eta.is_some());

        // Once the retry budget is spent, no retry message is produced.
        let mut exhausted = msg;
        exhausted.headers.retries = Some(3);
        assert!(policy.create_retry_message(&exhausted).is_none());
    }

    #[test]
    fn test_retry_policy_next_retry_eta() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(10), 3);
        let msg = create_test_message();

        let eta = policy
            .next_retry_eta(&msg)
            .expect("a fresh message should have a retry ETA");
        let seconds_until = (eta - Utc::now()).num_seconds();
        // Allow a little slack for time elapsed while the test runs.
        assert!((9..=11).contains(&seconds_until));
    }

    #[test]
    fn test_retry_stats() {
        let mut stats = RetryStats::new();
        stats.record_success();
        stats.record_success();
        stats.record_failure();
        stats.record_max_exceeded();

        assert_eq!(stats.total_retries, 3);
        assert_eq!(stats.successful_retries, 2);
        assert_eq!(stats.failed_retries, 1);
        assert_eq!(stats.max_retries_exceeded, 1);

        // Floating point: compare within a tolerance rather than exactly.
        assert!((stats.success_rate() - 200.0 / 3.0).abs() < 0.0001);
    }

    #[test]
    fn test_retry_strategy_none() {
        check_delays(&RetryStrategy::None, &[(0, None), (5, None)]);
    }

    #[test]
    fn test_default_retry_policy() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_retries(), 3);
        assert!(policy.retry_on_timeout);
        assert!(policy.retry_on_rate_limit);
    }
}