app_task/backoff_strategy/
threshold_buckets.rs

1use std::time::{Duration, Instant};
2
3use super::{BackoffStrategy, StrategyFactory};
4
5pub struct ThresholdBucketsFactory {
6    pub buckets: Vec<(usize, Duration)>,
7    pub monitoring_period: Duration,
8}
9
10impl Default for ThresholdBucketsFactory {
11    fn default() -> Self {
12        Self {
13            buckets: vec![
14                (1, Duration::from_secs(5)),
15                (5, Duration::from_secs(10)),
16                (10, Duration::from_secs(30)),
17            ],
18            monitoring_period: Duration::from_secs(60),
19        }
20    }
21}
22
23impl StrategyFactory for ThresholdBucketsFactory {
24    type Strategy = ThresholdBucketsBackoff;
25
26    fn create_strategy(&self) -> Self::Strategy {
27        ThresholdBucketsBackoff {
28            buckets: self.buckets.clone(),
29            monitoring_period: self.monitoring_period,
30            failures: Vec::new(),
31        }
32    }
33}
34
35pub struct ThresholdBucketsBackoff {
36    buckets: Vec<(usize, Duration)>,
37    monitoring_period: Duration,
38    failures: Vec<Instant>,
39}
40
41impl BackoffStrategy for ThresholdBucketsBackoff {
42    fn add_failure(&mut self) {
43        self.failures.push(Instant::now());
44        self.prune_failures();
45    }
46
47    fn next_backoff(&self) -> Duration {
48        let mut backoff = Duration::from_secs(0);
49
50        for (threshold, duration) in &self.buckets {
51            if self.failures.len() >= *threshold {
52                backoff = *duration;
53            }
54        }
55
56        backoff
57    }
58}
59
60impl ThresholdBucketsBackoff {
61    fn prune_failures(&mut self) {
62        self.failures
63            .retain(|instant| instant.elapsed() < self.monitoring_period);
64    }
65}