codeprism_core/resilience.rs

//! Resilience and error recovery mechanisms
//!
//! This module provides comprehensive error handling, retry logic, circuit breakers,
//! and graceful degradation patterns for production reliability.

use crate::error::{Error, ErrorSeverity, RecoveryStrategy, Result};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

/// Configuration for retry logic
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts
    pub max_attempts: u32,
    /// Base delay between retry attempts
    pub base_delay: Duration,
    /// Maximum delay between retry attempts
    pub max_delay: Duration,
    /// Multiplier for exponential backoff
    pub backoff_multiplier: f64,
    /// Whether to add random jitter to delays
    pub jitter: bool,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_multiplier: 2.0,
            jitter: true,
        }
    }
}

impl RetryConfig {
    /// Create a new retry configuration
    pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
        Self {
            max_attempts,
            base_delay,
            ..Default::default()
        }
    }

    /// Set maximum delay
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.max_delay = max_delay;
        self
    }

    /// Set backoff multiplier
    pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff_multiplier = multiplier;
        self
    }

    /// Enable or disable jitter
    pub fn with_jitter(mut self, jitter: bool) -> Self {
        self.jitter = jitter;
        self
    }

    /// Calculate delay for a given attempt
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        let mut delay = Duration::from_millis(
            (self.base_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
                as u64,
        );

        if delay > self.max_delay {
            delay = self.max_delay;
        }

        if self.jitter {
            // Add up to 25% jitter
            let jitter_ms = (delay.as_millis() as f64 * 0.25 * rand::random::<f64>()) as u64;
            delay += Duration::from_millis(jitter_ms);
        }

        delay
    }
}
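
// Illustrative sketch (not part of the original module): with jitter disabled,
// the schedule produced by `calculate_delay` is deterministic and grows
// geometrically from `base_delay` up to the `max_delay` cap.
//
//     let config = RetryConfig::default().with_jitter(false);
//     assert_eq!(config.calculate_delay(0), Duration::from_millis(100));
//     assert_eq!(config.calculate_delay(1), Duration::from_millis(200));
//     assert_eq!(config.calculate_delay(2), Duration::from_millis(400));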

/// Circuit breaker states
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    /// Circuit is closed, requests flow normally
    Closed,
    /// Circuit is open, requests are rejected
    Open,
    /// Circuit is half-open, testing if service has recovered
    HalfOpen,
}
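
// State-machine summary (descriptive comment added for orientation; the
// transitions below are the ones implemented by `CircuitBreaker`):
//
//     Closed   --(failure_threshold failures)---> Open
//     Open     --(recovery_timeout elapses)-----> HalfOpen
//     HalfOpen --(success_threshold successes)--> Closed
//     HalfOpen --(any failure)------------------> Open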

/// Circuit breaker configuration
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failure threshold to open the circuit
    pub failure_threshold: u32,
    /// Success threshold to close the circuit from half-open
    pub success_threshold: u32,
    /// Time to wait before trying half-open state
    pub recovery_timeout: Duration,
    /// Time window for counting failures
    pub time_window: Duration,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            recovery_timeout: Duration::from_secs(60),
            time_window: Duration::from_secs(60),
        }
    }
}
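
// Illustrative sketch (assumed tuning, not taken from the original file): a
// more aggressive breaker that opens after three failures and probes again
// after ten seconds.
//
//     let config = CircuitBreakerConfig {
//         failure_threshold: 3,
//         success_threshold: 2,
//         recovery_timeout: Duration::from_secs(10),
//         time_window: Duration::from_secs(30),
//     };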

/// Circuit breaker for handling cascading failures
#[derive(Debug)]
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: Arc<Mutex<CircuitBreakerState>>,
}

#[derive(Debug)]
struct CircuitBreakerState {
    circuit_state: CircuitState,
    failure_count: u32,
    success_count: u32,
    last_failure_time: Option<Instant>,
    last_transition_time: Instant,
}

impl CircuitBreaker {
    /// Create a new circuit breaker
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: Arc::new(Mutex::new(CircuitBreakerState {
                circuit_state: CircuitState::Closed,
                failure_count: 0,
                success_count: 0,
                last_failure_time: None,
                last_transition_time: Instant::now(),
            })),
        }
    }

    /// Check if the circuit allows the request
    pub fn can_execute(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        match state.circuit_state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                if now.duration_since(state.last_transition_time) >= self.config.recovery_timeout {
                    // Transition to half-open
                    state.circuit_state = CircuitState::HalfOpen;
                    state.success_count = 0;
                    state.last_transition_time = now;
                    debug!("Circuit breaker transitioning to half-open state");
                    true
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => true,
        }
    }

    /// Record a successful execution
    pub fn record_success(&self) {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        match state.circuit_state {
            CircuitState::Closed => {
                // Reset failure count in closed state
                state.failure_count = 0;
            }
            CircuitState::HalfOpen => {
                state.success_count += 1;
                if state.success_count >= self.config.success_threshold {
                    // Transition to closed
                    state.circuit_state = CircuitState::Closed;
                    state.failure_count = 0;
                    state.success_count = 0;
                    state.last_transition_time = now;
                    info!("Circuit breaker closed - service recovered");
                }
            }
            CircuitState::Open => {
                // Unexpected: a success was recorded while the circuit is open;
                // log it and leave the state unchanged.
                warn!("Recording success in open circuit state");
            }
        }
    }

    /// Record a failed execution
    pub fn record_failure(&self) {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        state.last_failure_time = Some(now);

        match state.circuit_state {
            CircuitState::Closed => {
                state.failure_count += 1;
                if state.failure_count >= self.config.failure_threshold {
                    // Transition to open
                    state.circuit_state = CircuitState::Open;
                    state.last_transition_time = now;
                    error!(
                        "Circuit breaker opened due to {} failures",
                        state.failure_count
                    );
                }
            }
            CircuitState::HalfOpen => {
                // Go back to open
                state.circuit_state = CircuitState::Open;
                state.failure_count += 1;
                state.success_count = 0;
                state.last_transition_time = now;
                warn!("Circuit breaker returned to open state");
            }
            CircuitState::Open => {
                state.failure_count += 1;
            }
        }
    }

    /// Get current circuit state
    pub fn state(&self) -> CircuitState {
        self.state.lock().unwrap().circuit_state.clone()
    }

    /// Get failure count
    pub fn failure_count(&self) -> u32 {
        self.state.lock().unwrap().failure_count
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self::new(CircuitBreakerConfig::default())
    }
}
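
// Illustrative sketch (assumed call pattern, not from the original file):
// callers gate work on `can_execute` and then report the outcome so the
// breaker can drive its state transitions. `do_protected_call` is hypothetical.
//
//     let breaker = CircuitBreaker::default();
//     if breaker.can_execute() {
//         match do_protected_call() {
//             Ok(_) => breaker.record_success(),
//             Err(_) => breaker.record_failure(),
//         }
//     }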

/// Retry executor with exponential backoff
pub struct RetryExecutor {
    config: RetryConfig,
}

impl RetryExecutor {
    /// Create a new retry executor
    pub fn new(config: RetryConfig) -> Self {
        Self { config }
    }

    /// Execute a function with retry logic
    pub async fn execute<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let mut last_error = None;

        for attempt in 0..self.config.max_attempts {
            debug!(
                "Executing operation, attempt {}/{}",
                attempt + 1,
                self.config.max_attempts
            );

            match operation().await {
                Ok(result) => {
                    if attempt > 0 {
                        info!("Operation succeeded after {} retries", attempt);
                    }
                    return Ok(result);
                }
                Err(error) => {
                    last_error = Some(error.clone());

                    if !error.should_retry() {
                        debug!("Error is not retryable: {}", error);
                        return Err(error);
                    }

                    if attempt + 1 >= self.config.max_attempts {
                        error!(
                            "Operation failed after {} attempts: {}",
                            self.config.max_attempts, error
                        );
                        break;
                    }

                    let delay = self.config.calculate_delay(attempt);
                    warn!(
                        "Operation failed (attempt {}), retrying in {:?}: {}",
                        attempt + 1,
                        delay,
                        error
                    );

                    sleep(delay).await;
                }
            }
        }

        Err(last_error.unwrap_or_else(|| Error::other("Unknown retry error")))
    }
}

impl Default for RetryExecutor {
    fn default() -> Self {
        Self::new(RetryConfig::default())
    }
}
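
// Illustrative sketch (assumed usage): retrying an async operation under the
// default three-attempt policy. `fetch_remote` is a hypothetical async
// function returning `Result<String>`.
//
//     let executor = RetryExecutor::default();
//     let value = executor
//         .execute(|| async { fetch_remote().await })
//         .await?;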

/// Comprehensive resilience manager
#[derive(Default)]
pub struct ResilienceManager {
    retry_executor: RetryExecutor,
    circuit_breaker: CircuitBreaker,
}

impl ResilienceManager {
    /// Create a new resilience manager
    pub fn new(retry_config: RetryConfig, circuit_config: CircuitBreakerConfig) -> Self {
        Self {
            retry_executor: RetryExecutor::new(retry_config),
            circuit_breaker: CircuitBreaker::new(circuit_config),
        }
    }

    /// Execute an operation with full resilience (circuit breaker + retry)
    pub async fn execute<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut + Clone,
        Fut: std::future::Future<Output = Result<T>>,
    {
        if !self.circuit_breaker.can_execute() {
            return Err(Error::other("Circuit breaker is open"));
        }

        let result = self
            .retry_executor
            .execute(|| {
                let op = operation.clone();
                async move { op().await }
            })
            .await;

        match &result {
            Ok(_) => self.circuit_breaker.record_success(),
            Err(error) => {
                if matches!(
                    error.severity(),
                    ErrorSeverity::Error | ErrorSeverity::Critical
                ) {
                    self.circuit_breaker.record_failure();
                }
            }
        }

        result
    }

    /// Execute with graceful degradation: on failure, fall back to the provided alternative
    pub async fn execute_with_fallback<F, Fut, T, FB, FutB>(&self, operation: F, fallback: FB) -> T
    where
        F: Fn() -> Fut + Clone,
        Fut: std::future::Future<Output = Result<T>>,
        FB: Fn() -> FutB,
        FutB: std::future::Future<Output = T>,
    {
        match self.execute(operation).await {
            Ok(result) => result,
            Err(error) => {
                warn!("Operation failed, using fallback: {}", error);
                fallback().await
            }
        }
    }

    /// Get circuit breaker state
    pub fn circuit_state(&self) -> CircuitState {
        self.circuit_breaker.state()
    }

    /// Check if the circuit is healthy
    pub fn is_healthy(&self) -> bool {
        matches!(self.circuit_breaker.state(), CircuitState::Closed)
    }
}
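
// Illustrative sketch (assumed usage): combining the circuit breaker, retry
// policy, and a fallback value in one call. `load_index` is a hypothetical
// async operation returning `Result<Vec<String>>`.
//
//     let manager = ResilienceManager::default();
//     let entries = manager
//         .execute_with_fallback(
//             || async { load_index().await },
//             || async { Vec::new() },
//         )
//         .await;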

/// Graceful degradation handler
pub struct DegradationHandler;

impl DegradationHandler {
    /// Handle parser failure with graceful degradation
    pub async fn handle_parse_failure<T>(
        file_path: &std::path::Path,
        error: &Error,
        fallback_fn: impl std::future::Future<Output = Option<T>>,
    ) -> Result<Option<T>> {
        match error.recovery_strategy() {
            RecoveryStrategy::Degrade => {
                warn!(
                    "Gracefully degrading parse operation for {}: {}",
                    file_path.display(),
                    error
                );
                Ok(fallback_fn.await)
            }
            RecoveryStrategy::Fallback => {
                info!(
                    "Using fallback for parse operation for {}: {}",
                    file_path.display(),
                    error
                );
                Ok(fallback_fn.await)
            }
            _ => Err(error.clone()),
        }
    }

    /// Handle indexing failure with partial results
    pub fn handle_indexing_failure(
        total_files: usize,
        processed_files: usize,
        error: &Error,
    ) -> Result<()> {
        match error.recovery_strategy() {
            RecoveryStrategy::Degrade => {
                let completion_rate = (processed_files as f64 / total_files as f64) * 100.0;
                if completion_rate >= 80.0 {
                    warn!(
                        "Indexing completed with degraded results: {:.1}% processed",
                        completion_rate
                    );
                    Ok(())
                } else {
                    Err(error.clone())
                }
            }
            _ => Err(error.clone()),
        }
    }
}
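
// Illustrative sketch (assumed scenario): 85 of 100 files were indexed before a
// degradable error occurred. Because the completion rate (85%) clears the 80%
// threshold above, the run is accepted with a warning instead of failing.
// Assumes `error.recovery_strategy()` returns `RecoveryStrategy::Degrade`.
//
//     let outcome = DegradationHandler::handle_indexing_failure(100, 85, &error);
//     assert!(outcome.is_ok());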

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, Duration::from_millis(100))
            .with_max_delay(Duration::from_secs(5))
            .with_backoff_multiplier(2.0)
            .with_jitter(true);

        assert_eq!(config.max_attempts, 5);
        assert_eq!(config.base_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(5));
        assert_eq!(config.backoff_multiplier, 2.0);
        assert!(config.jitter);
    }

    #[test]
    fn test_retry_config_delay_calculation() {
        let config = RetryConfig::new(3, Duration::from_millis(100))
            .with_backoff_multiplier(2.0)
            .with_jitter(false);

        let delay1 = config.calculate_delay(0);
        let delay2 = config.calculate_delay(1);
        let delay3 = config.calculate_delay(2);

        assert_eq!(delay1, Duration::from_millis(100));
        assert_eq!(delay2, Duration::from_millis(200));
        assert_eq!(delay3, Duration::from_millis(400));
    }

    #[test]
    fn test_circuit_breaker_states() {
        let config = CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 1,
            recovery_timeout: Duration::from_millis(100),
            time_window: Duration::from_secs(60),
        };

        let circuit = CircuitBreaker::new(config);

        // Initially closed
        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());

        // Record failures to open circuit
        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());

        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Open);
        assert!(!circuit.can_execute());
    }

    #[tokio::test]
    async fn test_circuit_breaker_recovery() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            success_threshold: 1,
            recovery_timeout: Duration::from_millis(50),
            time_window: Duration::from_secs(60),
        };

        let circuit = CircuitBreaker::new(config);

        // Open the circuit
        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Open);
        assert!(!circuit.can_execute());

        // Wait for recovery timeout
        sleep(Duration::from_millis(60)).await;

        // Should transition to half-open
        assert!(circuit.can_execute());
        assert_eq!(circuit.state(), CircuitState::HalfOpen);

        // Success should close the circuit
        circuit.record_success();
        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());
    }

    #[tokio::test]
    async fn test_retry_executor_success() {
        let executor = RetryExecutor::new(RetryConfig::new(3, Duration::from_millis(10)));

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result = executor
            .execute(|| {
                let current_attempt =
                    attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                async move {
                    if current_attempt < 2 {
                        Err(Error::storage("temporary failure"))
                    } else {
                        Ok("success")
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");
        assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_retry_executor_failure() {
        let executor = RetryExecutor::new(RetryConfig::new(2, Duration::from_millis(10)));

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result: Result<&str> = executor
            .execute(|| {
                attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                async move { Err(Error::storage("persistent failure")) }
            })
            .await;

        assert!(result.is_err());
        assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_resilience_manager() {
        let manager = ResilienceManager::new(
            RetryConfig::new(2, Duration::from_millis(10)),
            CircuitBreakerConfig {
                failure_threshold: 3,
                success_threshold: 1,
                recovery_timeout: Duration::from_millis(50),
                time_window: Duration::from_secs(60),
            },
        );

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result = manager
            .execute(|| {
                let current_attempt =
                    attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                async move {
                    if current_attempt < 2 {
                        Err(Error::storage("temporary failure"))
                    } else {
                        Ok("success")
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");
        assert!(manager.is_healthy());
    }

    #[tokio::test]
    async fn test_execute_with_fallback() {
        let manager = ResilienceManager::default();

        let result = manager
            .execute_with_fallback(
                || async { Err(Error::storage("operation failed")) },
                || async { "fallback result" },
            )
            .await;

        assert_eq!(result, "fallback result");
    }
}