1use crate::Message;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub enum RetryStrategy {
12 None,
14 Fixed {
16 delay_secs: u32,
18 },
19 Exponential {
21 base_delay_secs: u32,
23 max_delay_secs: u32,
25 multiplier: f64,
27 },
28 Linear {
30 initial_delay_secs: u32,
32 increment_secs: u32,
34 max_delay_secs: u32,
36 },
37 Custom {
39 delays: Vec<u32>,
41 },
42}
43
44impl RetryStrategy {
45 pub fn fixed(delay_secs: u32) -> Self {
47 Self::Fixed { delay_secs }
48 }
49
50 pub fn exponential(base_delay_secs: u32, max_delay_secs: u32) -> Self {
52 Self::Exponential {
53 base_delay_secs,
54 max_delay_secs,
55 multiplier: 2.0,
56 }
57 }
58
59 pub fn linear(initial_delay_secs: u32, increment_secs: u32, max_delay_secs: u32) -> Self {
61 Self::Linear {
62 initial_delay_secs,
63 increment_secs,
64 max_delay_secs,
65 }
66 }
67
68 pub fn custom(delays: Vec<u32>) -> Self {
70 Self::Custom { delays }
71 }
72
73 pub fn calculate_delay(&self, retry_count: u32) -> Option<Duration> {
75 match self {
76 RetryStrategy::None => None,
77 RetryStrategy::Fixed { delay_secs } => Some(Duration::seconds(*delay_secs as i64)),
78 RetryStrategy::Exponential {
79 base_delay_secs,
80 max_delay_secs,
81 multiplier,
82 } => {
83 let delay = (*base_delay_secs as f64 * multiplier.powi(retry_count as i32))
84 .min(*max_delay_secs as f64);
85 Some(Duration::seconds(delay as i64))
86 }
87 RetryStrategy::Linear {
88 initial_delay_secs,
89 increment_secs,
90 max_delay_secs,
91 } => {
92 let delay =
93 (initial_delay_secs + increment_secs * retry_count).min(*max_delay_secs);
94 Some(Duration::seconds(delay as i64))
95 }
96 RetryStrategy::Custom { delays } => delays
97 .get(retry_count as usize)
98 .map(|&d| Duration::seconds(d as i64)),
99 }
100 }
101
102 pub fn next_eta(&self, retry_count: u32) -> Option<DateTime<Utc>> {
104 self.calculate_delay(retry_count)
105 .map(|delay| Utc::now() + delay)
106 }
107}
108
109impl Default for RetryStrategy {
110 fn default() -> Self {
111 Self::exponential(1, 3600) }
113}
114
115#[derive(Debug, Clone)]
117pub struct RetryPolicy {
118 strategy: RetryStrategy,
119 max_retries: u32,
120 retry_on_timeout: bool,
121 retry_on_rate_limit: bool,
122}
123
124impl RetryPolicy {
125 pub fn new(strategy: RetryStrategy, max_retries: u32) -> Self {
127 Self {
128 strategy,
129 max_retries,
130 retry_on_timeout: true,
131 retry_on_rate_limit: true,
132 }
133 }
134
135 #[must_use]
137 pub fn with_retry_on_timeout(mut self, retry: bool) -> Self {
138 self.retry_on_timeout = retry;
139 self
140 }
141
142 #[must_use]
144 pub fn with_retry_on_rate_limit(mut self, retry: bool) -> Self {
145 self.retry_on_rate_limit = retry;
146 self
147 }
148
149 pub fn should_retry(&self, message: &Message) -> bool {
151 let current_retries = message.headers.retries.unwrap_or(0);
152 current_retries < self.max_retries
153 }
154
155 pub fn next_retry_eta(&self, message: &Message) -> Option<DateTime<Utc>> {
157 let retry_count = message.headers.retries.unwrap_or(0);
158 if self.should_retry(message) {
159 self.strategy.next_eta(retry_count)
160 } else {
161 None
162 }
163 }
164
165 pub fn create_retry_message(&self, message: &Message) -> Option<Message> {
167 if !self.should_retry(message) {
168 return None;
169 }
170
171 let mut retry_msg = message.clone();
172 let current_retries = retry_msg.headers.retries.unwrap_or(0);
173 retry_msg.headers.retries = Some(current_retries + 1);
174
175 if let Some(eta) = self.strategy.next_eta(current_retries) {
176 retry_msg.headers.eta = Some(eta);
177 }
178
179 Some(retry_msg)
180 }
181
182 pub fn strategy(&self) -> &RetryStrategy {
184 &self.strategy
185 }
186
187 pub fn max_retries(&self) -> u32 {
189 self.max_retries
190 }
191}
192
193impl Default for RetryPolicy {
194 fn default() -> Self {
195 Self::new(RetryStrategy::default(), 3)
196 }
197}
198
/// Counters describing the outcome of retry attempts.
#[derive(Clone, Debug, Default)]
pub struct RetryStats {
    /// Number of retries recorded as either a success or a failure.
    pub total_retries: u64,
    /// Number of retries recorded via [`RetryStats::record_success`].
    pub successful_retries: u64,
    /// Number of retries recorded via [`RetryStats::record_failure`].
    pub failed_retries: u64,
    /// Number of times [`RetryStats::record_max_exceeded`] was called.
    pub max_retries_exceeded: u64,
}

impl RetryStats {
    /// Creates a statistics record with every counter at zero.
    pub fn new() -> Self {
        RetryStats::default()
    }

    /// Records one successful retry.
    pub fn record_success(&mut self) {
        self.successful_retries += 1;
        self.total_retries += 1;
    }

    /// Records one failed retry.
    pub fn record_failure(&mut self) {
        self.failed_retries += 1;
        self.total_retries += 1;
    }

    /// Records that the retry limit was exceeded; `total_retries` is not
    /// changed by this call.
    pub fn record_max_exceeded(&mut self) {
        self.max_retries_exceeded += 1;
    }

    /// Returns the share of successful retries as a percentage (`0.0..=100.0`),
    /// or `0.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        match self.total_retries {
            // Avoid dividing by zero on an empty record.
            0 => 0.0,
            total => (self.successful_retries as f64 / total as f64) * 100.0,
        }
    }
}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Builds a message for the `tasks.test` task via `MessageBuilder`.
    fn create_test_message() -> Message {
        MessageBuilder::new("tasks.test").build().unwrap()
    }

    /// Asserts that `strategy` yields the expected delay in seconds for every
    /// `(retry_count, expected_secs)` pair in `cases`.
    fn assert_delays(strategy: &RetryStrategy, cases: &[(u32, i64)]) {
        for &(retry_count, expected_secs) in cases {
            assert_eq!(
                strategy.calculate_delay(retry_count),
                Some(Duration::seconds(expected_secs)),
                "unexpected delay for retry_count {}",
                retry_count
            );
        }
    }

    #[test]
    fn test_fixed_retry_strategy() {
        let strategy = RetryStrategy::fixed(5);
        assert_delays(&strategy, &[(0, 5), (1, 5), (5, 5)]);
    }

    #[test]
    fn test_exponential_retry_strategy() {
        let strategy = RetryStrategy::exponential(1, 60);
        assert_delays(&strategy, &[(0, 1), (1, 2), (2, 4), (3, 8)]);

        // 2^10 seconds exceeds the cap, so the maximum is returned.
        assert_delays(&strategy, &[(10, 60)]);
    }

    #[test]
    fn test_linear_retry_strategy() {
        let strategy = RetryStrategy::linear(5, 10, 100);
        assert_delays(&strategy, &[(0, 5), (1, 15), (2, 25)]);

        // 5 + 10 * 10 exceeds the cap, so the maximum is returned.
        assert_delays(&strategy, &[(10, 100)]);
    }

    #[test]
    fn test_custom_retry_strategy() {
        let strategy = RetryStrategy::custom(vec![1, 5, 10, 30]);
        assert_delays(&strategy, &[(0, 1), (1, 5), (2, 10), (3, 30)]);

        // Past the end of the list there is no delay.
        assert_eq!(strategy.calculate_delay(4), None);
    }

    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(5), 3);
        let mut msg = create_test_message();

        // Message exactly as produced by the builder.
        assert!(policy.should_retry(&msg));

        // Below the limit a retry is allowed; at the limit it is not.
        for &(retries, expected) in &[(2, true), (3, false)] {
            msg.headers.retries = Some(retries);
            assert_eq!(policy.should_retry(&msg), expected);
        }
    }

    #[test]
    fn test_retry_policy_create_retry_message() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(5), 3);
        let msg = create_test_message();

        let retry_msg = policy
            .create_retry_message(&msg)
            .expect("a retry message should be created below the limit");
        assert_eq!(retry_msg.headers.retries, Some(1));
        assert!(retry_msg.headers.eta.is_some());

        // At the limit no retry message is produced.
        let mut max_msg = msg.clone();
        max_msg.headers.retries = Some(3);
        assert!(policy.create_retry_message(&max_msg).is_none());
    }

    #[test]
    fn test_retry_policy_next_retry_eta() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(10), 3);
        let msg = create_test_message();

        let eta_time = policy
            .next_retry_eta(&msg)
            .expect("an ETA should be produced below the limit");

        // `now` is sampled after the ETA was computed, so allow some slack
        // around the 10 second delay.
        let now = Utc::now();
        let diff = (eta_time - now).num_seconds();
        assert!((9..=11).contains(&diff));
    }

    #[test]
    fn test_retry_stats() {
        let mut stats = RetryStats::new();

        stats.record_success();
        stats.record_success();
        stats.record_failure();
        stats.record_max_exceeded();

        assert_eq!(stats.total_retries, 3);
        assert_eq!(stats.successful_retries, 2);
        assert_eq!(stats.failed_retries, 1);
        assert_eq!(stats.max_retries_exceeded, 1);

        // Two successes out of three recorded retries.
        let rate = stats.success_rate();
        assert!((rate - 66.66666666666667).abs() < 0.0001);
    }

    #[test]
    fn test_retry_strategy_none() {
        let strategy = RetryStrategy::None;
        for &retry_count in &[0, 5] {
            assert_eq!(strategy.calculate_delay(retry_count), None);
        }
    }

    #[test]
    fn test_default_retry_policy() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_retries(), 3);
        assert!(policy.retry_on_timeout);
        assert!(policy.retry_on_rate_limit);
    }
}