fluxus_core/error_handling/
mod.rs

1mod backpressure;
2mod retry_strategy;
3
4pub use backpressure::{BackpressureController, BackpressureStrategy};
5use fluxus_utils::models::StreamResult;
6pub use retry_strategy::RetryStrategy;
7use tokio::time::sleep;
8
9/// Error handler for retrying operations
10pub struct ErrorHandler {
11    strategy: RetryStrategy,
12}
13
14impl ErrorHandler {
15    /// Create a new error handler with the given retry strategy
16    pub fn new(strategy: RetryStrategy) -> Self {
17        Self { strategy }
18    }
19
20    /// Retry an operation with the configured strategy
21    pub async fn retry<F, T>(&self, mut operation: F) -> StreamResult<T>
22    where
23        F: FnMut() -> StreamResult<T>,
24    {
25        let mut attempt = 0;
26        loop {
27            match operation() {
28                Ok(value) => return Ok(value),
29                Err(error) => {
30                    if let Some(delay) = self.strategy.get_delay(attempt) {
31                        tracing::warn!(
32                            "Operation failed (attempt {}/{}): {}. Retrying after {:?}",
33                            attempt + 1,
34                            match &self.strategy {
35                                RetryStrategy::NoRetry => 1,
36                                RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
37                                RetryStrategy::ExponentialBackoff { max_attempts, .. } =>
38                                    *max_attempts,
39                            },
40                            error,
41                            delay
42                        );
43                        sleep(delay).await;
44                        attempt += 1;
45                    } else {
46                        return Err(error);
47                    }
48                }
49            }
50        }
51    }
52}