//! Adaptive retry policy that tracks per-target failure history and adjusts
//! retry decisions and backoff delays accordingly.

use crate::utils::RetryConfig;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

/// Classification of a failed attempt, used to decide whether and how to retry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureType {
    /// The operation timed out.
    Timeout,
    /// The connection could not be established.
    ConnectionFailed,
    /// The target rejected the request due to rate limiting.
    RateLimited,
    /// The target returned malformed or unexpected data.
    InvalidData,
    /// The target reported an internal error.
    ServerError,
    /// The failure could not be classified.
    Unknown,
}

impl FailureType {
    /// Returns `true` if an attempt that failed this way is worth retrying.
    #[must_use]
    #[inline]
    pub const fn is_retryable(&self) -> bool {
        matches!(
            self,
            Self::Timeout | Self::ConnectionFailed | Self::ServerError
        )
    }

    /// Backoff multiplier applied to the base retry delay for this failure type.
    #[must_use]
    #[inline]
    pub const fn retry_multiplier(&self) -> f64 {
        match self {
            Self::Timeout => 1.5,
            Self::ConnectionFailed => 2.0,
            Self::RateLimited => 3.0,
            Self::ServerError => 1.2,
            Self::InvalidData => 0.5,
            Self::Unknown => 1.0,
        }
    }
}

/// A single observed failure.
#[derive(Debug, Clone)]
struct FailureRecord {
    /// What kind of failure occurred.
    failure_type: FailureType,
    /// When the failure was recorded.
    timestamp: Instant,
}

/// Accumulated statistics for a single target.
#[derive(Debug, Clone, Default)]
struct TargetStats {
    /// Total attempts made against this target.
    total_attempts: u64,
    /// Attempts that succeeded.
    successful_attempts: u64,
    /// Failures recorded within the history window.
    recent_failures: Vec<FailureRecord>,
    /// Time of the most recent success, if any.
    last_success: Option<Instant>,
    /// Failures since the last success.
    consecutive_failures: u32,
}

impl TargetStats {
    /// Fraction of attempts that succeeded; 0.5 when nothing has been recorded yet.
    #[must_use]
    #[inline]
    fn success_rate(&self) -> f64 {
        if self.total_attempts == 0 {
            return 0.5;
        }
        self.successful_attempts as f64 / self.total_attempts as f64
    }

    /// The most common failure type among recent failures, if any.
    #[must_use]
    #[inline]
    fn dominant_failure_type(&self) -> Option<FailureType> {
        let mut counts: HashMap<FailureType, usize> = HashMap::new();
        for record in &self.recent_failures {
            *counts.entry(record.failure_type).or_insert(0) += 1;
        }

        counts
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(failure_type, _)| failure_type)
    }

    /// Returns `true` when the target has more than three consecutive failures
    /// or a success rate below 30%.
    #[must_use]
    #[inline]
    fn is_having_issues(&self) -> bool {
        self.consecutive_failures > 3 || self.success_rate() < 0.3
    }
}

/// Retry policy that adapts its behaviour per target based on the successes
/// and failures observed for that target.
pub struct AdaptiveRetryPolicy {
    /// Per-target statistics, keyed by target identifier.
    target_stats: Arc<Mutex<HashMap<String, TargetStats>>>,
    /// Baseline retry configuration used before any adaptation.
    base_config: RetryConfig,
    /// How long failure records are kept for analysis.
    history_window: Duration,
}

impl AdaptiveRetryPolicy {
    /// Creates a policy with the default base configuration and a 5-minute
    /// failure-history window.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self {
            target_stats: Arc::new(Mutex::new(HashMap::new())),
            base_config: RetryConfig::default(),
            history_window: Duration::from_secs(300),
        }
    }

    /// Creates a policy that adapts around the given base configuration.
    #[must_use]
    #[inline]
    pub fn with_config(base_config: RetryConfig) -> Self {
        Self {
            target_stats: Arc::new(Mutex::new(HashMap::new())),
            base_config,
            history_window: Duration::from_secs(300),
        }
    }

    /// Records a successful attempt against `target`, resetting its
    /// consecutive-failure counter.
    pub fn record_success(&mut self, target: &str) {
        let mut stats = self.target_stats.lock().unwrap();
        let entry = stats.entry(target.to_string()).or_default();

        entry.total_attempts += 1;
        entry.successful_attempts += 1;
        entry.consecutive_failures = 0;
        entry.last_success = Some(Instant::now());
    }

    /// Records a failed attempt against `target` and prunes failure records
    /// that have aged out of the history window.
    pub fn record_failure(&mut self, target: &str, failure_type: FailureType) {
        let mut stats = self.target_stats.lock().unwrap();
        let entry = stats.entry(target.to_string()).or_default();

        entry.total_attempts += 1;
        entry.consecutive_failures += 1;
        entry.recent_failures.push(FailureRecord {
            failure_type,
            timestamp: Instant::now(),
        });

        self.cleanup_old_failures(entry);
    }

    /// Drops failure records older than the history window.
    fn cleanup_old_failures(&self, stats: &mut TargetStats) {
        let cutoff = Instant::now() - self.history_window;
        stats.recent_failures.retain(|f| f.timestamp > cutoff);
    }

    /// Returns `true` if another attempt against `target` is worthwhile.
    ///
    /// Retries are refused once the configured maximum number of attempts is
    /// reached, after more than five consecutive failures, or when the
    /// dominant recent failure type is not retryable.
    #[must_use]
    #[inline]
    pub fn should_retry(&self, target: &str, attempt: u32) -> bool {
        let stats = self.target_stats.lock().unwrap();

        if attempt >= self.base_config.max_attempts {
            return false;
        }

        if let Some(target_stats) = stats.get(target) {
            if target_stats.consecutive_failures > 5 {
                return false;
            }

            if let Some(failure_type) = target_stats.dominant_failure_type() {
                if !failure_type.is_retryable() {
                    return false;
                }
            }
        }

        true
    }

    /// Computes the delay before the next attempt against `target`.
    ///
    /// The base backoff for the attempt is scaled up when the target's
    /// success rate is low, by the dominant failure type's multiplier, and
    /// exponentially once more than two consecutive failures have occurred.
    /// The result is capped at the configured maximum delay.
    #[must_use]
    #[inline]
    pub fn retry_delay(&self, target: &str, attempt: u32) -> Duration {
        let base_delay = self.base_config.delay_for_attempt(attempt);
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            let mut multiplier = 1.0;

            let success_rate = target_stats.success_rate();
            if success_rate < 0.5 {
                multiplier *= 1.5;
            } else if success_rate < 0.7 {
                multiplier *= 1.2;
            }

            if let Some(failure_type) = target_stats.dominant_failure_type() {
                multiplier *= failure_type.retry_multiplier();
            }

            if target_stats.consecutive_failures > 2 {
                multiplier *= 1.5f64.powi(target_stats.consecutive_failures as i32 - 2);
            }

            Duration::from_millis((base_delay.as_millis() as f64 * multiplier) as u64)
                .min(Duration::from_millis(self.base_config.max_delay_ms))
        } else {
            base_delay
        }
    }

    /// Observed success rate for `target`; returns 0.5 for unknown targets.
    #[must_use]
    #[inline]
    pub fn success_rate(&self, target: &str) -> f64 {
        let stats = self.target_stats.lock().unwrap();
        stats.get(target).map(|s| s.success_rate()).unwrap_or(0.5)
    }

    /// Number of consecutive failures recorded for `target`.
    #[must_use]
    #[inline]
    pub fn consecutive_failures(&self, target: &str) -> u32 {
        let stats = self.target_stats.lock().unwrap();
        stats
            .get(target)
            .map(|s| s.consecutive_failures)
            .unwrap_or(0)
    }

    /// Returns `true` if `target` currently looks unhealthy.
    #[must_use]
    #[inline]
    pub fn is_target_having_issues(&self, target: &str) -> bool {
        let stats = self.target_stats.lock().unwrap();
        stats
            .get(target)
            .map(|s| s.is_having_issues())
            .unwrap_or(false)
    }

    /// Suggests a `RetryConfig` for `target` based on its observed success
    /// rate: aggressive when the target is healthy, conservative when it is
    /// struggling, and the base configuration otherwise.
    #[must_use]
    #[inline]
    pub fn recommended_config(&self, target: &str) -> RetryConfig {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            let success_rate = target_stats.success_rate();

            if success_rate > 0.8 {
                RetryConfig::aggressive()
            } else if success_rate > 0.5 {
                self.base_config.clone()
            } else {
                RetryConfig::conservative()
            }
        } else {
            self.base_config.clone()
        }
    }

    /// Clears all recorded statistics for `target`.
    pub fn reset_target(&mut self, target: &str) {
        let mut stats = self.target_stats.lock().unwrap();
        stats.remove(target);
    }

    /// Clears recorded statistics for all targets.
    pub fn reset_all(&mut self) {
        let mut stats = self.target_stats.lock().unwrap();
        stats.clear();
    }

    /// Number of targets currently being tracked.
    #[must_use]
    #[inline]
    pub fn tracked_targets_count(&self) -> usize {
        let stats = self.target_stats.lock().unwrap();
        stats.len()
    }

    /// Returns `true` when `target` has recorded five or more failures within
    /// the last minute.
    #[must_use]
    #[inline]
    pub fn detect_failure_burst(&self, target: &str) -> bool {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            let one_minute_ago = Instant::now() - Duration::from_secs(60);
            let recent_count = target_stats
                .recent_failures
                .iter()
                .filter(|f| f.timestamp > one_minute_ago)
                .count();

            return recent_count >= 5;
        }

        false
    }

    /// Average time between consecutive recent failures for `target`, if at
    /// least two failures are recorded.
    #[must_use]
    #[inline]
    pub fn failure_interval(&self, target: &str) -> Option<Duration> {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            if target_stats.recent_failures.len() < 2 {
                return None;
            }

            let failures = &target_stats.recent_failures;
            let mut intervals = Vec::new();

            for i in 1..failures.len() {
                let interval = failures[i]
                    .timestamp
                    .saturating_duration_since(failures[i - 1].timestamp);
                intervals.push(interval);
            }

            if intervals.is_empty() {
                return None;
            }

            let total: Duration = intervals.iter().sum();
            Some(total / intervals.len() as u32)
        } else {
            None
        }
    }

    /// Rough estimate of how long until `target` recovers: twice the time
    /// since its last success while it is failing, or a flat minute if it has
    /// recent failures but no recorded success.
    #[must_use]
    #[inline]
    pub fn predict_recovery_time(&self, target: &str) -> Option<Duration> {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            if let Some(last_success) = target_stats.last_success {
                let time_since_success = Instant::now().saturating_duration_since(last_success);

                if target_stats.consecutive_failures > 0 {
                    let estimated_recovery = time_since_success * 2;
                    return Some(estimated_recovery);
                }
            }

            if !target_stats.recent_failures.is_empty() {
                return Some(Duration::from_secs(60));
            }
        }

        None
    }

    /// Summarizes the recent failure behaviour of `target`, if any history
    /// has been recorded for it.
    #[must_use]
    #[inline]
    pub fn failure_patterns(&self, target: &str) -> Option<FailurePatterns> {
        let stats = self.target_stats.lock().unwrap();

        stats.get(target).map(|target_stats| {
            let mut type_counts: HashMap<FailureType, usize> = HashMap::new();
            for record in &target_stats.recent_failures {
                *type_counts.entry(record.failure_type).or_insert(0) += 1;
            }

            let one_minute_ago = Instant::now() - Duration::from_secs(60);
            let recent_count = target_stats
                .recent_failures
                .iter()
                .filter(|f| f.timestamp > one_minute_ago)
                .count();
            let is_burst = recent_count >= 5;

            FailurePatterns {
                total_failures: target_stats.recent_failures.len(),
                failure_types: type_counts,
                consecutive_failures: target_stats.consecutive_failures,
                success_rate: target_stats.success_rate(),
                is_burst,
                dominant_type: target_stats.dominant_failure_type(),
            }
        })
    }
}

/// Snapshot of a target's recent failure behaviour.
#[derive(Debug, Clone)]
pub struct FailurePatterns {
    /// Failures recorded within the history window.
    pub total_failures: usize,
    /// Count of recent failures by failure type.
    pub failure_types: HashMap<FailureType, usize>,
    /// Failures since the last success.
    pub consecutive_failures: u32,
    /// Overall success rate for the target.
    pub success_rate: f64,
    /// Whether five or more failures occurred in the last minute.
    pub is_burst: bool,
    /// The most common recent failure type, if any.
    pub dominant_type: Option<FailureType>,
}

impl FailurePatterns {
    /// Returns `true` when failures look systemic: a long consecutive streak,
    /// a very low success rate, or a burst of failures.
    #[must_use]
    #[inline]
    pub fn is_systemic_issue(&self) -> bool {
        self.consecutive_failures > 5 || self.success_rate < 0.2 || self.is_burst
    }

    /// Fraction of recent failures attributable to `failure_type`.
    #[must_use]
    #[inline]
    pub fn failure_type_percentage(&self, failure_type: FailureType) -> f64 {
        if self.total_failures == 0 {
            return 0.0;
        }
        let count = self.failure_types.get(&failure_type).copied().unwrap_or(0);
        count as f64 / self.total_failures as f64
    }
}

impl Default for AdaptiveRetryPolicy {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_failure_type_retryable() {
        assert!(FailureType::Timeout.is_retryable());
        assert!(FailureType::ConnectionFailed.is_retryable());
        assert!(!FailureType::RateLimited.is_retryable());
    }
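
    // Spot-check the per-failure-type backoff multipliers defined above; the
    // expected values mirror the match arms in FailureType::retry_multiplier().
    #[test]
    fn test_failure_type_retry_multiplier() {
        assert!((FailureType::Timeout.retry_multiplier() - 1.5).abs() < f64::EPSILON);
        assert!((FailureType::RateLimited.retry_multiplier() - 3.0).abs() < f64::EPSILON);
        assert!((FailureType::Unknown.retry_multiplier() - 1.0).abs() < f64::EPSILON);
    }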

    #[test]
    fn test_adaptive_policy_success_rate() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_success("peer1");
        policy.record_success("peer1");
        policy.record_failure("peer1", FailureType::Timeout);

        let rate = policy.success_rate("peer1");
        assert!((rate - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_should_retry_after_max_attempts() {
        let policy = AdaptiveRetryPolicy::new();
        assert!(!policy.should_retry("peer1", 10));
    }
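
    // A non-retryable dominant failure type (InvalidData here) should stop
    // retries regardless of the attempt number.
    #[test]
    fn test_should_not_retry_non_retryable_failure() {
        let mut policy = AdaptiveRetryPolicy::new();
        policy.record_failure("peer1", FailureType::InvalidData);

        assert!(!policy.should_retry("peer1", 0));
    }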

    #[test]
    fn test_consecutive_failures_tracking() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::Timeout);

        assert_eq!(policy.consecutive_failures("peer1"), 2);

        policy.record_success("peer1");
        assert_eq!(policy.consecutive_failures("peer1"), 0);
    }

    #[test]
    fn test_recommended_config_adapts() {
        let mut policy = AdaptiveRetryPolicy::new();

        for _ in 0..10 {
            policy.record_success("peer1");
        }
        policy.record_failure("peer1", FailureType::Timeout);

        let config = policy.recommended_config("peer1");
        assert!(config.max_attempts >= 5);
    }

    #[test]
    fn test_target_having_issues() {
        let mut policy = AdaptiveRetryPolicy::new();

        for _ in 0..5 {
            policy.record_failure("peer1", FailureType::Timeout);
        }

        assert!(policy.is_target_having_issues("peer1"));
    }
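
    // Targets with no recorded history fall back to neutral defaults:
    // a 0.5 success rate, no consecutive failures, and a healthy status.
    #[test]
    fn test_unknown_target_defaults() {
        let policy = AdaptiveRetryPolicy::new();

        assert!((policy.success_rate("unknown") - 0.5).abs() < f64::EPSILON);
        assert_eq!(policy.consecutive_failures("unknown"), 0);
        assert!(!policy.is_target_having_issues("unknown"));
    }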

    #[test]
    fn test_reset_target() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        assert_eq!(policy.consecutive_failures("peer1"), 1);

        policy.reset_target("peer1");
        assert_eq!(policy.consecutive_failures("peer1"), 0);
    }

    #[test]
    fn test_failure_burst_detection() {
        let mut policy = AdaptiveRetryPolicy::new();

        for _ in 0..6 {
            policy.record_failure("peer1", FailureType::Timeout);
        }

        assert!(policy.detect_failure_burst("peer1"));

        assert!(!policy.detect_failure_burst("peer2"));
    }
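
    // One statistics entry is kept per target, and reset_all() clears them all.
    #[test]
    fn test_tracked_targets_and_reset_all() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_success("peer2");
        assert_eq!(policy.tracked_targets_count(), 2);

        policy.reset_all();
        assert_eq!(policy.tracked_targets_count(), 0);
    }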

    #[test]
    fn test_failure_patterns() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::ConnectionFailed);
        policy.record_success("peer1");

        let patterns = policy.failure_patterns("peer1");
        assert!(patterns.is_some());

        let patterns = patterns.unwrap();
        assert_eq!(patterns.total_failures, 3);
        assert_eq!(patterns.dominant_type, Some(FailureType::Timeout));
        assert_eq!(
            patterns.failure_type_percentage(FailureType::Timeout),
            2.0 / 3.0
        );
    }

    #[test]
    fn test_systemic_issue_detection() {
        let mut policy = AdaptiveRetryPolicy::new();

        for _ in 0..7 {
            policy.record_failure("peer1", FailureType::ServerError);
        }

        let patterns = policy.failure_patterns("peer1").unwrap();
        assert!(patterns.is_systemic_issue());
    }

    #[test]
    fn test_predict_recovery_time() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_success("peer1");
        std::thread::sleep(Duration::from_millis(10));
        policy.record_failure("peer1", FailureType::Timeout);

        let recovery = policy.predict_recovery_time("peer1");
        assert!(recovery.is_some());
    }

    #[test]
    fn test_failure_interval() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        std::thread::sleep(Duration::from_millis(10));
        policy.record_failure("peer1", FailureType::Timeout);

        let interval = policy.failure_interval("peer1");
        assert!(interval.is_some());
    }
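
    // A target with successes but no recent failures yields an empty pattern
    // snapshot: no failures, no dominant type, and no systemic issue.
    #[test]
    fn test_failure_patterns_without_failures() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_success("peer1");

        let patterns = policy.failure_patterns("peer1").unwrap();
        assert_eq!(patterns.total_failures, 0);
        assert_eq!(patterns.dominant_type, None);
        assert!(!patterns.is_systemic_issue());
        assert!(patterns.failure_type_percentage(FailureType::Timeout).abs() < f64::EPSILON);
    }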
}