chie_core/adaptive_retry.rs

//! Adaptive retry policies based on failure patterns.
//!
//! This module provides intelligent retry logic that adapts based on observed
//! failure patterns, success rates, and error types. Instead of using static
//! retry configurations, it dynamically adjusts retry behavior to optimize
//! for both success rate and resource utilization.
//!
//! # Example
//!
//! ```rust
//! use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
//!
//! let mut policy = AdaptiveRetryPolicy::new();
//!
//! // Record failures and successes
//! policy.record_failure("peer1", FailureType::Timeout);
//! policy.record_failure("peer1", FailureType::Timeout);
//! policy.record_success("peer1");
//!
//! // Ask whether to retry this peer and how long to wait before the next attempt
//! let should_retry = policy.should_retry("peer1", 1);
//! let delay = policy.retry_delay("peer1", 1);
//! ```

use crate::utils::RetryConfig;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

/// Types of failures that can occur.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureType {
    /// Network timeout.
    Timeout,
    /// Connection refused or failed.
    ConnectionFailed,
    /// Rate limit exceeded.
    RateLimited,
    /// Invalid response or data corruption.
    InvalidData,
    /// Temporary server error.
    ServerError,
    /// Unknown or unclassified error.
    Unknown,
}

impl FailureType {
    /// Check if this failure type is retryable.
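    ///
    /// Timeouts, connection failures, and server errors are retryable; other
    /// failure types are not:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::FailureType;
    ///
    /// assert!(FailureType::Timeout.is_retryable());
    /// assert!(!FailureType::RateLimited.is_retryable());
    /// ```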
    #[must_use]
    #[inline]
    pub const fn is_retryable(&self) -> bool {
        matches!(
            self,
            Self::Timeout | Self::ConnectionFailed | Self::ServerError
        )
    }

    /// Get the base retry-delay multiplier for this failure type.
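    ///
    /// Rate limits carry the largest multiplier and data issues the smallest:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::FailureType;
    ///
    /// assert!(FailureType::RateLimited.retry_multiplier() > FailureType::ConnectionFailed.retry_multiplier());
    /// assert!(FailureType::InvalidData.retry_multiplier() < FailureType::Timeout.retry_multiplier());
    /// ```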
    #[must_use]
    #[inline]
    pub const fn retry_multiplier(&self) -> f64 {
        match self {
            Self::Timeout => 1.5,          // Increase delay for timeouts
            Self::ConnectionFailed => 2.0, // Much longer delay for connection issues
            Self::RateLimited => 3.0,      // Very long delay for rate limits
            Self::ServerError => 1.2,      // Slight increase for server errors
            Self::InvalidData => 0.5,      // Fast retry for data issues
            Self::Unknown => 1.0,          // Default delay
        }
    }
}

/// Failure record for tracking patterns.
#[derive(Debug, Clone)]
struct FailureRecord {
    failure_type: FailureType,
    timestamp: Instant,
}

/// Statistics for a specific target (peer, endpoint, etc.).
#[derive(Debug, Clone, Default)]
struct TargetStats {
    /// Total attempts made.
    total_attempts: u64,
    /// Successful attempts.
    successful_attempts: u64,
    /// Recent failures (limited window).
    recent_failures: Vec<FailureRecord>,
    /// Last success timestamp.
    last_success: Option<Instant>,
    /// Consecutive failures.
    consecutive_failures: u32,
}

impl TargetStats {
    /// Calculate success rate (0.0 to 1.0).
    #[must_use]
    #[inline]
    fn success_rate(&self) -> f64 {
        if self.total_attempts == 0 {
            return 0.5; // Assume 50% for unknown targets
        }
        self.successful_attempts as f64 / self.total_attempts as f64
    }

    /// Get the most common recent failure type.
    #[must_use]
    #[inline]
    fn dominant_failure_type(&self) -> Option<FailureType> {
        let mut counts: HashMap<FailureType, usize> = HashMap::new();
        for record in &self.recent_failures {
            *counts.entry(record.failure_type).or_insert(0) += 1;
        }

        counts
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(failure_type, _)| failure_type)
    }

    /// Check if the target is currently experiencing issues.
    #[must_use]
    #[inline]
    fn is_having_issues(&self) -> bool {
        self.consecutive_failures > 3 || self.success_rate() < 0.3
    }
}

/// Adaptive retry policy that learns from failure patterns.
pub struct AdaptiveRetryPolicy {
    /// Per-target statistics.
    target_stats: Arc<Mutex<HashMap<String, TargetStats>>>,
    /// Base retry configuration.
    base_config: RetryConfig,
    /// Failure history retention window.
    history_window: Duration,
}

impl AdaptiveRetryPolicy {
    /// Create a new adaptive retry policy.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self {
            target_stats: Arc::new(Mutex::new(HashMap::new())),
            base_config: RetryConfig::default(),
            history_window: Duration::from_secs(300), // 5 minutes
        }
    }

    /// Create a new adaptive retry policy with custom base configuration.
    #[must_use]
    #[inline]
    pub fn with_config(base_config: RetryConfig) -> Self {
        Self {
            target_stats: Arc::new(Mutex::new(HashMap::new())),
            base_config,
            history_window: Duration::from_secs(300),
        }
    }

    /// Record a successful operation.
    pub fn record_success(&mut self, target: &str) {
        let mut stats = self.target_stats.lock().unwrap();
        let entry = stats.entry(target.to_string()).or_default();

        entry.total_attempts += 1;
        entry.successful_attempts += 1;
        entry.consecutive_failures = 0;
        entry.last_success = Some(Instant::now());
    }

    /// Record a failed operation.
    pub fn record_failure(&mut self, target: &str, failure_type: FailureType) {
        let mut stats = self.target_stats.lock().unwrap();
        let entry = stats.entry(target.to_string()).or_default();

        entry.total_attempts += 1;
        entry.consecutive_failures += 1;
        entry.recent_failures.push(FailureRecord {
            failure_type,
            timestamp: Instant::now(),
        });

        // Cleanup old failures
        self.cleanup_old_failures(entry);
    }

    /// Clean up old failure records outside the history window.
    fn cleanup_old_failures(&self, stats: &mut TargetStats) {
        let cutoff = Instant::now() - self.history_window;
        stats.recent_failures.retain(|f| f.timestamp > cutoff);
    }

    /// Determine if a retry should be attempted.
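    ///
    /// Attempts at or beyond the base config's `max_attempts` are never
    /// retried, even for an unknown target:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::AdaptiveRetryPolicy;
    ///
    /// let policy = AdaptiveRetryPolicy::new();
    /// assert!(!policy.should_retry("peer1", u32::MAX));
    /// ```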
    #[must_use]
    #[inline]
    pub fn should_retry(&self, target: &str, attempt: u32) -> bool {
        let stats = self.target_stats.lock().unwrap();

        // Check attempt limit
        if attempt >= self.base_config.max_attempts {
            return false;
        }

        // Get target stats if available
        if let Some(target_stats) = stats.get(target) {
            // Don't retry if too many consecutive failures (circuit breaker)
            if target_stats.consecutive_failures > 5 {
                return false;
            }

            // Don't retry non-retryable failure types
            if let Some(failure_type) = target_stats.dominant_failure_type() {
                if !failure_type.is_retryable() {
                    return false;
                }
            }
        }

        true
    }

    /// Calculate the recommended retry delay for a target.
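    ///
    /// The base delay for the attempt is scaled by the target's success rate,
    /// its dominant failure type, and its consecutive-failure streak, then
    /// capped at the configured maximum delay:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// policy.record_failure("peer1", FailureType::ConnectionFailed);
    /// let delay = policy.retry_delay("peer1", 1);
    /// println!("next retry in {delay:?}");
    /// ```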
    #[must_use]
    #[inline]
    pub fn retry_delay(&self, target: &str, attempt: u32) -> Duration {
        let base_delay = self.base_config.delay_for_attempt(attempt);
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            // Adjust delay based on failure patterns
            let mut multiplier = 1.0;

            // Increase delay for targets with low success rate
            let success_rate = target_stats.success_rate();
            if success_rate < 0.5 {
                multiplier *= 1.5;
            } else if success_rate < 0.7 {
                multiplier *= 1.2;
            }

            // Apply failure type specific multiplier
            if let Some(failure_type) = target_stats.dominant_failure_type() {
                multiplier *= failure_type.retry_multiplier();
            }

            // Increase delay for consecutive failures (exponential)
            if target_stats.consecutive_failures > 2 {
                multiplier *= 1.5f64.powi(target_stats.consecutive_failures as i32 - 2);
            }

            Duration::from_millis((base_delay.as_millis() as f64 * multiplier) as u64)
                .min(Duration::from_millis(self.base_config.max_delay_ms))
        } else {
            base_delay
        }
    }

    /// Get the success rate for a target.
    #[must_use]
    #[inline]
    pub fn success_rate(&self, target: &str) -> f64 {
        let stats = self.target_stats.lock().unwrap();
        stats.get(target).map(|s| s.success_rate()).unwrap_or(0.5)
    }

    /// Get the number of consecutive failures for a target.
    #[must_use]
    #[inline]
    pub fn consecutive_failures(&self, target: &str) -> u32 {
        let stats = self.target_stats.lock().unwrap();
        stats
            .get(target)
            .map(|s| s.consecutive_failures)
            .unwrap_or(0)
    }

    /// Check if a target is currently having issues.
    #[must_use]
    #[inline]
    pub fn is_target_having_issues(&self, target: &str) -> bool {
        let stats = self.target_stats.lock().unwrap();
        stats
            .get(target)
            .map(|s| s.is_having_issues())
            .unwrap_or(false)
    }

    /// Get recommended retry config for a specific target.
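    ///
    /// Targets with a high success rate get `RetryConfig::aggressive()`,
    /// struggling targets get `RetryConfig::conservative()`, and everything
    /// else falls back to the base config:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::AdaptiveRetryPolicy;
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// for _ in 0..10 {
    ///     policy.record_success("peer1");
    /// }
    /// let config = policy.recommended_config("peer1");
    /// ```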
    #[must_use]
    #[inline]
    pub fn recommended_config(&self, target: &str) -> RetryConfig {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            let success_rate = target_stats.success_rate();

            // Adapt retry strategy based on success rate
            if success_rate > 0.8 {
                // High success rate: use aggressive retries
                RetryConfig::aggressive()
            } else if success_rate > 0.5 {
                // Moderate success rate: use default
                self.base_config.clone()
            } else {
                // Low success rate: use conservative retries
                RetryConfig::conservative()
            }
        } else {
            // Unknown target: use default
            self.base_config.clone()
        }
    }

    /// Reset statistics for a target.
    pub fn reset_target(&mut self, target: &str) {
        let mut stats = self.target_stats.lock().unwrap();
        stats.remove(target);
    }

    /// Clear all statistics.
    pub fn reset_all(&mut self) {
        let mut stats = self.target_stats.lock().unwrap();
        stats.clear();
    }

    /// Get total number of tracked targets.
    #[must_use]
    #[inline]
    pub fn tracked_targets_count(&self) -> usize {
        let stats = self.target_stats.lock().unwrap();
        stats.len()
    }

    /// Detect a failure burst: five or more failures recorded within the last minute.
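    ///
    /// Six rapid failures trip the detector, while an untouched peer does not:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// for _ in 0..6 {
    ///     policy.record_failure("peer1", FailureType::Timeout);
    /// }
    /// assert!(policy.detect_failure_burst("peer1"));
    /// assert!(!policy.detect_failure_burst("peer2"));
    /// ```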
    #[must_use]
    #[inline]
    pub fn detect_failure_burst(&self, target: &str) -> bool {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            // Check for 5+ failures in the last minute
            let one_minute_ago = Instant::now() - Duration::from_secs(60);
            let recent_count = target_stats
                .recent_failures
                .iter()
                .filter(|f| f.timestamp > one_minute_ago)
                .count();

            return recent_count >= 5;
        }

        false
    }

    /// Get the average time between failures for pattern detection.
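    ///
    /// Returns `None` until at least two failures are in the history window:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// assert!(policy.failure_interval("peer1").is_none());
    ///
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// assert!(policy.failure_interval("peer1").is_some());
    /// ```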
    #[must_use]
    #[inline]
    pub fn failure_interval(&self, target: &str) -> Option<Duration> {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            if target_stats.recent_failures.len() < 2 {
                return None;
            }

            let failures = &target_stats.recent_failures;
            let mut intervals = Vec::new();

            for i in 1..failures.len() {
                let interval = failures[i]
                    .timestamp
                    .saturating_duration_since(failures[i - 1].timestamp);
                intervals.push(interval);
            }

            if intervals.is_empty() {
                return None;
            }

            // Calculate average interval
            let total: Duration = intervals.iter().sum();
            Some(total / intervals.len() as u32)
        } else {
            None
        }
    }

    /// Predict when the target might recover based on past recovery patterns.
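    ///
    /// The current heuristic is simply twice the time since the last success
    /// while failures are ongoing, with a one-minute fallback when no success
    /// has been observed:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// policy.record_success("peer1");
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// assert!(policy.predict_recovery_time("peer1").is_some());
    /// ```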
    #[must_use]
    #[inline]
    pub fn predict_recovery_time(&self, target: &str) -> Option<Duration> {
        let stats = self.target_stats.lock().unwrap();

        if let Some(target_stats) = stats.get(target) {
            if let Some(last_success) = target_stats.last_success {
                let time_since_success = Instant::now().saturating_duration_since(last_success);

                // If we have consecutive failures, estimate recovery time
                if target_stats.consecutive_failures > 0 {
                    // Simple heuristic: double the time since last success
                    // In production, this could use ML or historical patterns
                    let estimated_recovery = time_since_success * 2;
                    return Some(estimated_recovery);
                }
            }

            // Otherwise fall back to a fixed default whenever recent failures are on record
            if !target_stats.recent_failures.is_empty() {
                return Some(Duration::from_secs(60)); // Default 1 minute
            }
        }

        None
    }

    /// Get failure pattern statistics for a target.
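    ///
    /// Returns `None` for targets with no recorded attempts; otherwise the
    /// snapshot covers the current history window:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// policy.record_failure("peer1", FailureType::ConnectionFailed);
    ///
    /// let patterns = policy.failure_patterns("peer1").expect("target is tracked");
    /// assert_eq!(patterns.total_failures, 2);
    /// assert_eq!(patterns.consecutive_failures, 2);
    /// ```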
    #[must_use]
    #[inline]
    pub fn failure_patterns(&self, target: &str) -> Option<FailurePatterns> {
        let stats = self.target_stats.lock().unwrap();

        stats.get(target).map(|target_stats| {
            let mut type_counts: HashMap<FailureType, usize> = HashMap::new();
            for record in &target_stats.recent_failures {
                *type_counts.entry(record.failure_type).or_insert(0) += 1;
            }

            // Calculate is_burst inline to avoid re-acquiring lock
            let one_minute_ago = Instant::now() - Duration::from_secs(60);
            let recent_count = target_stats
                .recent_failures
                .iter()
                .filter(|f| f.timestamp > one_minute_ago)
                .count();
            let is_burst = recent_count >= 5;

            FailurePatterns {
                total_failures: target_stats.recent_failures.len(),
                failure_types: type_counts,
                consecutive_failures: target_stats.consecutive_failures,
                success_rate: target_stats.success_rate(),
                is_burst,
                dominant_type: target_stats.dominant_failure_type(),
            }
        })
    }
}

/// Failure pattern statistics for a specific target.
#[derive(Debug, Clone)]
pub struct FailurePatterns {
    /// Total failures in the history window.
    pub total_failures: usize,
    /// Count of each failure type.
    pub failure_types: HashMap<FailureType, usize>,
    /// Current consecutive failures.
    pub consecutive_failures: u32,
    /// Success rate (0.0 to 1.0).
    pub success_rate: f64,
    /// Whether a failure burst is detected.
    pub is_burst: bool,
    /// Most common failure type.
    pub dominant_type: Option<FailureType>,
}

impl FailurePatterns {
    /// Check if the pattern indicates a systemic issue.
    #[must_use]
    #[inline]
    pub fn is_systemic_issue(&self) -> bool {
        self.consecutive_failures > 5 || self.success_rate < 0.2 || self.is_burst
    }

    /// Get the fraction (0.0 to 1.0) of recent failures with a specific failure type.
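    ///
    /// Two timeouts out of three recorded failures yield a share of two thirds:
    ///
    /// ```rust
    /// use chie_core::adaptive_retry::{AdaptiveRetryPolicy, FailureType};
    ///
    /// let mut policy = AdaptiveRetryPolicy::new();
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// policy.record_failure("peer1", FailureType::Timeout);
    /// policy.record_failure("peer1", FailureType::ConnectionFailed);
    ///
    /// let patterns = policy.failure_patterns("peer1").unwrap();
    /// let timeout_share = patterns.failure_type_percentage(FailureType::Timeout);
    /// assert!((timeout_share - 2.0 / 3.0).abs() < 1e-9);
    /// ```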
    #[must_use]
    #[inline]
    pub fn failure_type_percentage(&self, failure_type: FailureType) -> f64 {
        if self.total_failures == 0 {
            return 0.0;
        }
        let count = self.failure_types.get(&failure_type).copied().unwrap_or(0);
        count as f64 / self.total_failures as f64
    }
}

impl Default for AdaptiveRetryPolicy {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_failure_type_retryable() {
        assert!(FailureType::Timeout.is_retryable());
        assert!(FailureType::ConnectionFailed.is_retryable());
        assert!(!FailureType::RateLimited.is_retryable());
    }

    #[test]
    fn test_adaptive_policy_success_rate() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_success("peer1");
        policy.record_success("peer1");
        policy.record_failure("peer1", FailureType::Timeout);

        let rate = policy.success_rate("peer1");
        assert!((rate - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_should_retry_after_max_attempts() {
        let policy = AdaptiveRetryPolicy::new();
        assert!(!policy.should_retry("peer1", 10));
    }

    #[test]
    fn test_consecutive_failures_tracking() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::Timeout);

        assert_eq!(policy.consecutive_failures("peer1"), 2);

        policy.record_success("peer1");
        assert_eq!(policy.consecutive_failures("peer1"), 0);
    }

    #[test]
    fn test_recommended_config_adapts() {
        let mut policy = AdaptiveRetryPolicy::new();

        // Record high success rate
        for _ in 0..10 {
            policy.record_success("peer1");
        }
        policy.record_failure("peer1", FailureType::Timeout);

        let config = policy.recommended_config("peer1");
        // High success rate should give aggressive retries
        assert!(config.max_attempts >= 5);
    }

    #[test]
    fn test_target_having_issues() {
        let mut policy = AdaptiveRetryPolicy::new();

        // Record many failures
        for _ in 0..5 {
            policy.record_failure("peer1", FailureType::Timeout);
        }

        assert!(policy.is_target_having_issues("peer1"));
    }

    #[test]
    fn test_reset_target() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        assert_eq!(policy.consecutive_failures("peer1"), 1);

        policy.reset_target("peer1");
        assert_eq!(policy.consecutive_failures("peer1"), 0);
    }

    #[test]
    fn test_failure_burst_detection() {
        let mut policy = AdaptiveRetryPolicy::new();

        // Record 6 failures in quick succession
        for _ in 0..6 {
            policy.record_failure("peer1", FailureType::Timeout);
        }

        assert!(policy.detect_failure_burst("peer1"));

        // Different peer should not show burst
        assert!(!policy.detect_failure_burst("peer2"));
    }

    #[test]
    fn test_failure_patterns() {
        let mut policy = AdaptiveRetryPolicy::new();

        // Record mixed failures
        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::Timeout);
        policy.record_failure("peer1", FailureType::ConnectionFailed);
        policy.record_success("peer1");

        let patterns = policy.failure_patterns("peer1");
        assert!(patterns.is_some());

        let patterns = patterns.unwrap();
        assert_eq!(patterns.total_failures, 3);
        assert_eq!(patterns.dominant_type, Some(FailureType::Timeout));
        assert_eq!(
            patterns.failure_type_percentage(FailureType::Timeout),
            2.0 / 3.0
        );
    }

    #[test]
    fn test_systemic_issue_detection() {
        let mut policy = AdaptiveRetryPolicy::new();

        // Record many consecutive failures
        for _ in 0..7 {
            policy.record_failure("peer1", FailureType::ServerError);
        }

        let patterns = policy.failure_patterns("peer1").unwrap();
        assert!(patterns.is_systemic_issue());
    }

    #[test]
    fn test_predict_recovery_time() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_success("peer1");
        std::thread::sleep(Duration::from_millis(10));
        policy.record_failure("peer1", FailureType::Timeout);

        let recovery = policy.predict_recovery_time("peer1");
        assert!(recovery.is_some());
    }

    #[test]
    fn test_failure_interval() {
        let mut policy = AdaptiveRetryPolicy::new();

        policy.record_failure("peer1", FailureType::Timeout);
        std::thread::sleep(Duration::from_millis(10));
        policy.record_failure("peer1", FailureType::Timeout);

        let interval = policy.failure_interval("peer1");
        assert!(interval.is_some());
    }
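
    // Additional check (sketch): a target with only successes should not be
    // flagged by the issue or burst detectors.
    #[test]
    fn test_healthy_target_not_flagged() {
        let mut policy = AdaptiveRetryPolicy::new();

        for _ in 0..10 {
            policy.record_success("peer1");
        }

        assert!(!policy.is_target_having_issues("peer1"));
        assert!(!policy.detect_failure_burst("peer1"));
    }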
}