use crate::error::{Error, ErrorSeverity, RecoveryStrategy, Result};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

/// Configuration for retrying failed operations with exponential backoff.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of attempts, including the first try.
    pub max_attempts: u32,
    /// Base delay before the first retry.
    pub base_delay: Duration,
    /// Cap applied to the exponential delay before jitter.
    pub max_delay: Duration,
    /// Factor by which the delay grows with each attempt.
    pub backoff_multiplier: f64,
    /// Whether to add random jitter (up to 25% of the delay) to spread out retries.
    pub jitter: bool,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_multiplier: 2.0,
            jitter: true,
        }
    }
}

impl RetryConfig {
    /// Creates a config with the given attempt count and base delay; the
    /// remaining fields use their defaults.
    pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
        Self {
            max_attempts,
            base_delay,
            ..Default::default()
        }
    }

    /// Sets the maximum delay cap.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.max_delay = max_delay;
        self
    }

    /// Sets the exponential backoff multiplier.
    pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff_multiplier = multiplier;
        self
    }

    /// Enables or disables jitter.
    pub fn with_jitter(mut self, jitter: bool) -> Self {
        self.jitter = jitter;
        self
    }

    /// Computes the delay before the retry that follows `attempt` (zero-based):
    /// `base_delay * backoff_multiplier^attempt`, capped at `max_delay`, plus
    /// up to 25% random jitter when enabled.
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        let mut delay = Duration::from_millis(
            (self.base_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
                as u64,
        );

        if delay > self.max_delay {
            delay = self.max_delay;
        }

        if self.jitter {
            // Jitter is added on top of the capped delay, so the final value can
            // exceed `max_delay` by at most 25%.
            let jitter_ms = (delay.as_millis() as f64 * 0.25 * rand::random::<f64>()) as u64;
            delay += Duration::from_millis(jitter_ms);
        }

        delay
    }
}

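// A minimal sketch of the backoff schedule this produces, assuming the default
// values above (100 ms base delay, 2.0 multiplier, 10 s cap) with jitter
// disabled; the first three values match `test_retry_config_delay_calculation`
// in the tests below.
//
//     let config = RetryConfig::default().with_jitter(false);
//     assert_eq!(config.calculate_delay(0), Duration::from_millis(100));
//     assert_eq!(config.calculate_delay(1), Duration::from_millis(200));
//     assert_eq!(config.calculate_delay(2), Duration::from_millis(400));
//     assert_eq!(config.calculate_delay(20), Duration::from_secs(10)); // capped at max_delay
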
/// The state of a [`CircuitBreaker`].
///
/// Transitions: `Closed` -> `Open` once `failure_threshold` failures accumulate
/// without an intervening success; `Open` -> `HalfOpen` after `recovery_timeout`
/// elapses; `HalfOpen` -> `Closed` after `success_threshold` successes, or back
/// to `Open` on any failure.
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    /// Requests flow normally; failures are counted.
    Closed,
    /// Requests are rejected until the recovery timeout elapses.
    Open,
    /// Trial requests are allowed through to probe recovery.
    HalfOpen,
}

/// Configuration for a [`CircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Number of failures that trips the breaker open.
    pub failure_threshold: u32,
    /// Number of half-open successes required to close the breaker.
    pub success_threshold: u32,
    /// How long the breaker stays open before allowing trial requests.
    pub recovery_timeout: Duration,
    /// Intended window for counting failures (not currently used by the implementation).
    pub time_window: Duration,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            recovery_timeout: Duration::from_secs(60),
            time_window: Duration::from_secs(60),
        }
    }
}

/// A thread-safe circuit breaker that stops calling a failing dependency and
/// probes it again after a recovery timeout.
#[derive(Debug)]
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: Arc<Mutex<CircuitBreakerState>>,
}

#[derive(Debug)]
struct CircuitBreakerState {
    circuit_state: CircuitState,
    failure_count: u32,
    success_count: u32,
    last_failure_time: Option<Instant>,
    last_transition_time: Instant,
}

impl CircuitBreaker {
    /// Creates a circuit breaker in the closed state.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: Arc::new(Mutex::new(CircuitBreakerState {
                circuit_state: CircuitState::Closed,
                failure_count: 0,
                success_count: 0,
                last_failure_time: None,
                last_transition_time: Instant::now(),
            })),
        }
    }

    /// Returns whether a request may be executed, transitioning from open to
    /// half-open once the recovery timeout has elapsed.
    pub fn can_execute(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        match state.circuit_state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                if now.duration_since(state.last_transition_time) >= self.config.recovery_timeout {
                    state.circuit_state = CircuitState::HalfOpen;
                    state.success_count = 0;
                    state.last_transition_time = now;
                    debug!("Circuit breaker transitioning to half-open state");
                    true
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => true,
        }
    }

    /// Records a successful call, resetting the failure count and closing the
    /// circuit once enough half-open successes have been observed.
    pub fn record_success(&self) {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        match state.circuit_state {
            CircuitState::Closed => {
                state.failure_count = 0;
            }
            CircuitState::HalfOpen => {
                state.success_count += 1;
                if state.success_count >= self.config.success_threshold {
                    state.circuit_state = CircuitState::Closed;
                    state.failure_count = 0;
                    state.success_count = 0;
                    state.last_transition_time = now;
                    info!("Circuit breaker closed - service recovered");
                }
            }
            CircuitState::Open => {
                warn!("Recording success in open circuit state");
            }
        }
    }

    /// Records a failed call, opening the circuit when the failure threshold is
    /// reached or when a half-open trial fails.
    pub fn record_failure(&self) {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();

        state.last_failure_time = Some(now);

        match state.circuit_state {
            CircuitState::Closed => {
                state.failure_count += 1;
                if state.failure_count >= self.config.failure_threshold {
                    state.circuit_state = CircuitState::Open;
                    state.last_transition_time = now;
                    error!(
                        "Circuit breaker opened due to {} failures",
                        state.failure_count
                    );
                }
            }
            CircuitState::HalfOpen => {
                state.circuit_state = CircuitState::Open;
                state.failure_count += 1;
                state.success_count = 0;
                state.last_transition_time = now;
                warn!("Circuit breaker returned to open state");
            }
            CircuitState::Open => {
                state.failure_count += 1;
            }
        }
    }

    /// Returns the current circuit state.
    pub fn state(&self) -> CircuitState {
        self.state.lock().unwrap().circuit_state.clone()
    }

    /// Returns the current failure count.
    pub fn failure_count(&self) -> u32 {
        self.state.lock().unwrap().failure_count
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self::new(CircuitBreakerConfig::default())
    }
}

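// A minimal usage sketch for the circuit breaker on its own; `call_backend`
// is a hypothetical async operation and not part of this module:
//
//     let breaker = CircuitBreaker::default();
//     if breaker.can_execute() {
//         match call_backend().await {
//             Ok(_) => breaker.record_success(),
//             Err(_) => breaker.record_failure(),
//         }
//     } else {
//         // Circuit is open: skip the call and fail fast or use cached data.
//     }
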
/// Executes async operations with retries according to a [`RetryConfig`].
pub struct RetryExecutor {
    config: RetryConfig,
}

impl RetryExecutor {
    /// Creates an executor with the given retry configuration.
    pub fn new(config: RetryConfig) -> Self {
        Self { config }
    }

    /// Runs `operation`, retrying retryable errors with backoff until it
    /// succeeds or `max_attempts` is exhausted. Non-retryable errors are
    /// returned immediately.
    pub async fn execute<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let mut last_error = None;

        for attempt in 0..self.config.max_attempts {
            debug!(
                "Executing operation, attempt {}/{}",
                attempt + 1,
                self.config.max_attempts
            );

            match operation().await {
                Ok(result) => {
                    if attempt > 0 {
                        info!("Operation succeeded after {} retries", attempt);
                    }
                    return Ok(result);
                }
                Err(error) => {
                    last_error = Some(error.clone());

                    if !error.should_retry() {
                        debug!("Error is not retryable: {}", error);
                        return Err(error);
                    }

                    if attempt + 1 >= self.config.max_attempts {
                        error!(
                            "Operation failed after {} attempts: {}",
                            self.config.max_attempts, error
                        );
                        break;
                    }

                    let delay = self.config.calculate_delay(attempt);
                    warn!(
                        "Operation failed (attempt {}), retrying in {:?}: {}",
                        attempt + 1,
                        delay,
                        error
                    );

                    sleep(delay).await;
                }
            }
        }

        Err(last_error.unwrap_or_else(|| Error::other("Unknown retry error")))
    }
}

impl Default for RetryExecutor {
    fn default() -> Self {
        Self::new(RetryConfig::default())
    }
}

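// A minimal usage sketch for retries without a circuit breaker; `fetch_page`
// is a hypothetical async function returning this crate's `Result`:
//
//     let executor = RetryExecutor::new(RetryConfig::new(4, Duration::from_millis(50)));
//     let page = executor.execute(|| fetch_page("https://example.com")).await?;
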
/// Combines retry logic and a circuit breaker behind a single entry point.
#[derive(Default)]
pub struct ResilienceManager {
    retry_executor: RetryExecutor,
    circuit_breaker: CircuitBreaker,
}

impl ResilienceManager {
    /// Creates a manager from explicit retry and circuit breaker configurations.
    pub fn new(retry_config: RetryConfig, circuit_config: CircuitBreakerConfig) -> Self {
        Self {
            retry_executor: RetryExecutor::new(retry_config),
            circuit_breaker: CircuitBreaker::new(circuit_config),
        }
    }

    /// Runs `operation` with retries, guarded by the circuit breaker. Returns
    /// an error immediately if the circuit is open; otherwise records the
    /// outcome so the breaker can track the dependency's health.
    pub async fn execute<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut + Clone,
        Fut: std::future::Future<Output = Result<T>>,
    {
        if !self.circuit_breaker.can_execute() {
            return Err(Error::other("Circuit breaker is open"));
        }

        let result = self
            .retry_executor
            .execute(|| {
                let op = operation.clone();
                async move { op().await }
            })
            .await;

        match &result {
            Ok(_) => self.circuit_breaker.record_success(),
            Err(error) => {
                // Only severe errors count against the circuit breaker.
                if matches!(
                    error.severity(),
                    ErrorSeverity::Error | ErrorSeverity::Critical
                ) {
                    self.circuit_breaker.record_failure();
                }
            }
        }

        result
    }

    /// Runs `operation` as in [`execute`](Self::execute), but returns the value
    /// produced by `fallback` instead of propagating an error.
    pub async fn execute_with_fallback<F, Fut, T, FB, FutB>(&self, operation: F, fallback: FB) -> T
    where
        F: Fn() -> Fut + Clone,
        Fut: std::future::Future<Output = Result<T>>,
        FB: Fn() -> FutB,
        FutB: std::future::Future<Output = T>,
    {
        match self.execute(operation).await {
            Ok(result) => result,
            Err(error) => {
                warn!("Operation failed, using fallback: {}", error);
                fallback().await
            }
        }
    }

    /// Returns the current circuit breaker state.
    pub fn circuit_state(&self) -> CircuitState {
        self.circuit_breaker.state()
    }

    /// Returns `true` while the circuit breaker is closed.
    pub fn is_healthy(&self) -> bool {
        matches!(self.circuit_breaker.state(), CircuitState::Closed)
    }
}

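// A minimal usage sketch combining retries, the circuit breaker, and a
// fallback; `load_index` and `empty_index` are hypothetical async functions:
//
//     let manager = ResilienceManager::default();
//     let index = manager
//         .execute_with_fallback(|| load_index(), || empty_index())
//         .await;
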
/// Helpers for degrading gracefully when an operation cannot fully succeed.
pub struct DegradationHandler;

impl DegradationHandler {
    /// Handles a parse failure for `file_path`. If the error's recovery
    /// strategy allows degradation or a fallback, the provided fallback future
    /// is awaited and its result returned; otherwise the error is propagated.
    pub async fn handle_parse_failure<T>(
        file_path: &std::path::Path,
        error: &Error,
        fallback_fn: impl std::future::Future<Output = Option<T>>,
    ) -> Result<Option<T>> {
        match error.recovery_strategy() {
            RecoveryStrategy::Degrade => {
                warn!(
                    "Gracefully degrading parse operation for {}: {}",
                    file_path.display(),
                    error
                );
                Ok(fallback_fn.await)
            }
            RecoveryStrategy::Fallback => {
                info!(
                    "Using fallback for parse operation for {}: {}",
                    file_path.display(),
                    error
                );
                Ok(fallback_fn.await)
            }
            _ => Err(error.clone()),
        }
    }

    /// Handles a failure during indexing. If the error allows degradation and
    /// at least 80% of the files were processed, the partial result is
    /// accepted; otherwise the error is propagated.
    pub fn handle_indexing_failure(
        total_files: usize,
        processed_files: usize,
        error: &Error,
    ) -> Result<()> {
        match error.recovery_strategy() {
            RecoveryStrategy::Degrade => {
                let completion_rate = (processed_files as f64 / total_files as f64) * 100.0;
                if completion_rate >= 80.0 {
                    warn!(
                        "Indexing completed with degraded results: {:.1}% processed",
                        completion_rate
                    );
                    Ok(())
                } else {
                    Err(error.clone())
                }
            }
            _ => Err(error.clone()),
        }
    }
}

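// A minimal usage sketch for degrading a parse failure to a partial result;
// `partial_parse` is a hypothetical async fallback yielding `Option<_>`:
//
//     let parsed = DegradationHandler::handle_parse_failure(
//         path,
//         &parse_error,
//         async { partial_parse(path).await },
//     )
//     .await?;
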
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, Duration::from_millis(100))
            .with_max_delay(Duration::from_secs(5))
            .with_backoff_multiplier(2.0)
            .with_jitter(true);

        assert_eq!(config.max_attempts, 5);
        assert_eq!(config.base_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(5));
        assert_eq!(config.backoff_multiplier, 2.0);
        assert!(config.jitter);
    }

    #[test]
    fn test_retry_config_delay_calculation() {
        let config = RetryConfig::new(3, Duration::from_millis(100))
            .with_backoff_multiplier(2.0)
            .with_jitter(false);

        let delay1 = config.calculate_delay(0);
        let delay2 = config.calculate_delay(1);
        let delay3 = config.calculate_delay(2);

        assert_eq!(delay1, Duration::from_millis(100));
        assert_eq!(delay2, Duration::from_millis(200));
        assert_eq!(delay3, Duration::from_millis(400));
    }

    #[test]
    fn test_circuit_breaker_states() {
        let config = CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 1,
            recovery_timeout: Duration::from_millis(100),
            time_window: Duration::from_secs(60),
        };

        let circuit = CircuitBreaker::new(config);

        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());

        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());

        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Open);
        assert!(!circuit.can_execute());
    }

    #[tokio::test]
    async fn test_circuit_breaker_recovery() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            success_threshold: 1,
            recovery_timeout: Duration::from_millis(50),
            time_window: Duration::from_secs(60),
        };

        let circuit = CircuitBreaker::new(config);

        circuit.record_failure();
        assert_eq!(circuit.state(), CircuitState::Open);
        assert!(!circuit.can_execute());

        sleep(Duration::from_millis(60)).await;

        assert!(circuit.can_execute());
        assert_eq!(circuit.state(), CircuitState::HalfOpen);

        circuit.record_success();
        assert_eq!(circuit.state(), CircuitState::Closed);
        assert!(circuit.can_execute());
    }

    #[tokio::test]
    async fn test_retry_executor_success() {
        let executor = RetryExecutor::new(RetryConfig::new(3, Duration::from_millis(10)));

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result = executor
            .execute(|| {
                let current_attempt =
                    attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                async move {
                    if current_attempt < 2 {
                        Err(Error::storage("temporary failure"))
                    } else {
                        Ok("success")
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");
        assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_retry_executor_failure() {
        let executor = RetryExecutor::new(RetryConfig::new(2, Duration::from_millis(10)));

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result: Result<&str> = executor
            .execute(|| {
                attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                async move { Err(Error::storage("persistent failure")) }
            })
            .await;

        assert!(result.is_err());
        assert_eq!(attempts.load(std::sync::atomic::Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_resilience_manager() {
        let manager = ResilienceManager::new(
            RetryConfig::new(2, Duration::from_millis(10)),
            CircuitBreakerConfig {
                failure_threshold: 3,
                success_threshold: 1,
                recovery_timeout: Duration::from_millis(50),
                time_window: Duration::from_secs(60),
            },
        );

        let attempts = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let attempts_clone = attempts.clone();

        let result = manager
            .execute(|| {
                let current_attempt =
                    attempts_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                async move {
                    if current_attempt < 2 {
                        Err(Error::storage("temporary failure"))
                    } else {
                        Ok("success")
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");
        assert!(manager.is_healthy());
    }

    #[tokio::test]
    async fn test_execute_with_fallback() {
        let manager = ResilienceManager::default();

        let result = manager
            .execute_with_fallback(
                || async { Err(Error::storage("operation failed")) },
                || async { "fallback result" },
            )
            .await;

        assert_eq!(result, "fallback result");
    }
}