// fluxus_core/error_handling/mod.rs
mod backpressure;
mod retry_strategy;

pub use backpressure::{BackpressureController, BackpressureStrategy};
use fluxus_utils::models::StreamResult;
pub use retry_strategy::RetryStrategy;
use tokio::time::sleep;

9pub struct ErrorHandler {
11 strategy: RetryStrategy,
12}
13
14impl ErrorHandler {
15 pub fn new(strategy: RetryStrategy) -> Self {
17 Self { strategy }
18 }
19
20 pub async fn retry<F, T>(&self, mut operation: F) -> StreamResult<T>
22 where
23 F: FnMut() -> StreamResult<T>,
24 {
25 let mut attempt = 0;
26 loop {
27 match operation() {
28 Ok(value) => return Ok(value),
29 Err(error) => {
30 if let Some(delay) = self.strategy.get_delay(attempt) {
31 tracing::warn!(
32 "Operation failed (attempt {}/{}): {}. Retrying after {:?}",
33 attempt + 1,
34 match &self.strategy {
35 RetryStrategy::NoRetry => 1,
36 RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
37 RetryStrategy::ExponentialBackoff { max_attempts, .. } =>
38 *max_attempts,
39 },
40 error,
41 delay
42 );
43 sleep(delay).await;
44 attempt += 1;
45 } else {
46 return Err(error);
47 }
48 }
49 }
50 }
51 }
52}