Skip to main content

tower_acc/
aimd.rs

1use std::time::Duration;
2
3use crate::Algorithm;
4
/// Concurrency limit strategy based on AIMD (Additive Increase /
/// Multiplicative Decrease).
///
/// The limit grows by a fixed step on success and shrinks by a
/// multiplicative backoff ratio on error or timeout — the same loss-based
/// scheme TCP Reno uses for congestion control.
///
/// In contrast to [`Vegas`](crate::Vegas), no baseline RTT is tracked and no
/// queue depth is estimated: only errors and timeouts drive the limit, which
/// keeps the algorithm simple at the cost of being less proactive.
///
/// # Differences from Netflix's Java implementation
///
/// Where the Java reference truncates the limit to an integer on every
/// update, this implementation stores it as an `f64`. Consecutive backoffs
/// therefore decay smoothly (e.g. 10.0 → 9.0 → 8.1 → 7.29 rather than
/// 10 → 9 → 8 → 7). The limit observed through
/// [`max_concurrency`](Algorithm::max_concurrency) matches in most cases,
/// although after recovery the internal value may sit slightly above the
/// Java equivalent, giving a marginally faster ramp-up.
#[derive(Debug, Clone)]
pub struct Aimd {
    /// Current limit estimate; kept fractional between updates.
    estimated_limit: f64,
    /// Smallest value the limit may take.
    min_limit: usize,
    /// Largest value the limit may take.
    max_limit: usize,
    /// Factor the estimate is multiplied by on error or timeout.
    backoff_ratio: f64,
    /// RTT above which a request counts as a timeout.
    timeout: Duration,
}
33
34impl Aimd {
35    /// Returns an `AimdBuilder` for configuring a new `Aimd` instance.
36    pub fn builder() -> AimdBuilder {
37        AimdBuilder::default()
38    }
39}
40
41impl Default for Aimd {
42    fn default() -> Self {
43        AimdBuilder::default().build()
44    }
45}
46
47impl Algorithm for Aimd {
48    fn max_concurrency(&self) -> usize {
49        (self.estimated_limit as usize).clamp(self.min_limit, self.max_limit)
50    }
51
52    fn update(&mut self, rtt: Duration, num_inflight: usize, is_error: bool, is_canceled: bool) {
53        if is_canceled {
54            return;
55        }
56        let limit = self.estimated_limit;
57
58        let new_limit = if is_error || rtt > self.timeout {
59            // Multiplicative decrease.
60            limit * self.backoff_ratio
61        } else if num_inflight * 2 >= limit as usize {
62            // Additive increase — only when the system is reasonably loaded.
63            limit + 1.0
64        } else {
65            return;
66        };
67        self.estimated_limit = new_limit.clamp(self.min_limit as f64, self.max_limit as f64);
68    }
69}
70
/// Builder for configuring an [`Aimd`] algorithm instance.
///
/// See [`Aimd::builder`] for usage. Values are stored as given by the
/// setters and validated only when [`build`](AimdBuilder::build) is called.
///
/// # Defaults
///
/// | Parameter | Default |
/// |-----------|---------|
/// | `initial_limit` | 20 |
/// | `min_limit` | 20 |
/// | `max_limit` | 200 |
/// | `backoff_ratio` | 0.9 |
/// | `timeout` | 5 seconds |
#[derive(Debug, Clone)]
pub struct AimdBuilder {
    /// Limit the algorithm starts from.
    initial_limit: usize,
    /// Lower bound for the limit.
    min_limit: usize,
    /// Upper bound for the limit.
    max_limit: usize,
    /// Multiplicative factor applied on error or timeout.
    backoff_ratio: f64,
    /// RTT above which a request is treated as a timeout.
    timeout: Duration,
}
91
92impl Default for AimdBuilder {
93    fn default() -> Self {
94        Self {
95            initial_limit: 20,
96            min_limit: 20,
97            max_limit: 200,
98            backoff_ratio: 0.9,
99            timeout: Duration::from_secs(5),
100        }
101    }
102}
103
104impl AimdBuilder {
105    /// Sets the starting concurrency limit (default: 20).
106    pub fn initial_limit(mut self, limit: usize) -> Self {
107        self.initial_limit = limit;
108        self
109    }
110
111    /// Sets the lower bound the limit can reach (default: 20).
112    pub fn min_limit(mut self, limit: usize) -> Self {
113        self.min_limit = limit;
114        self
115    }
116
117    /// Sets the upper bound the limit can reach (default: 200).
118    pub fn max_limit(mut self, limit: usize) -> Self {
119        self.max_limit = limit;
120        self
121    }
122
123    /// Sets the multiplicative backoff ratio applied on errors or timeouts
124    /// (default: 0.9). Must be in `(0, 1)`.
125    pub fn backoff_ratio(mut self, ratio: f64) -> Self {
126        self.backoff_ratio = ratio;
127        self
128    }
129
130    /// Sets the RTT threshold above which a request is treated as a timeout
131    /// (default: 5 seconds).
132    pub fn timeout(mut self, timeout: Duration) -> Self {
133        self.timeout = timeout;
134        self
135    }
136
137    /// Builds the [`Aimd`] algorithm with the configured parameters.
138    ///
139    /// # Panics
140    ///
141    /// Panics if:
142    /// - `backoff_ratio` is not in `[0.5, 1.0)`
143    /// - `min_limit` is zero
144    /// - `min_limit > max_limit`
145    /// - `initial_limit < min_limit` or `initial_limit > max_limit`
146    pub fn build(self) -> Aimd {
147        assert!(
148            (0.5..1.0).contains(&self.backoff_ratio),
149            "backoff_ratio must be in [0.5, 1.0), got {}",
150            self.backoff_ratio,
151        );
152        assert!(self.min_limit > 0, "min_limit must be > 0");
153        assert!(
154            self.min_limit <= self.max_limit,
155            "min_limit ({}) must be <= max_limit ({})",
156            self.min_limit,
157            self.max_limit,
158        );
159        assert!(
160            self.initial_limit >= self.min_limit && self.initial_limit <= self.max_limit,
161            "initial_limit ({}) must be in [min_limit({}), max_limit({})]",
162            self.initial_limit,
163            self.min_limit,
164            self.max_limit,
165        );
166
167        Aimd {
168            estimated_limit: self.initial_limit as f64,
169            min_limit: self.min_limit,
170            max_limit: self.max_limit,
171            backoff_ratio: self.backoff_ratio,
172            timeout: self.timeout,
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    // --- Limit adjustment -------------------------------------------------

    #[test]
    fn increase_limit_on_success_when_loaded() {
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();

        // Inflight * 2 >= limit → loaded → should increase by 1.
        aimd.update(Duration::from_millis(50), 10, false, false);
        assert_eq!(aimd.max_concurrency(), 11);
    }

    #[test]
    fn no_increase_when_lightly_loaded() {
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();

        // Inflight * 2 < limit → not loaded enough → no change.
        aimd.update(Duration::from_millis(50), 2, false, false);
        assert_eq!(aimd.max_concurrency(), 10);
    }

    #[test]
    fn decrease_limit_on_error() {
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();

        aimd.update(Duration::from_millis(50), 10, true, false);
        assert_eq!(aimd.max_concurrency(), 9); // 10 * 0.9 = 9
    }

    #[test]
    fn decrease_limit_on_timeout() {
        let mut aimd = Aimd::builder()
            .initial_limit(10)
            .min_limit(1)
            .timeout(Duration::from_secs(1))
            .build();

        // RTT exceeds timeout → treat as error.
        aimd.update(Duration::from_secs(2), 10, false, false);
        assert_eq!(aimd.max_concurrency(), 9);
    }

    #[test]
    fn repeated_backoff_decays_fractionally() {
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();

        // The internal estimate goes 10.0 → 9.0 → 8.1 → 7.29; the observable
        // limit is the truncated value at each step.
        aimd.update(Duration::from_millis(50), 10, true, false);
        assert_eq!(aimd.max_concurrency(), 9);
        aimd.update(Duration::from_millis(50), 10, true, false);
        assert_eq!(aimd.max_concurrency(), 8);
        aimd.update(Duration::from_millis(50), 10, true, false);
        assert_eq!(aimd.max_concurrency(), 7);
    }

    #[test]
    fn canceled_requests_are_ignored() {
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();

        // Even though the request is flagged as an error, cancellation wins.
        aimd.update(Duration::from_millis(50), 10, true, true);
        assert_eq!(aimd.max_concurrency(), 10);
    }

    // --- Bounds -----------------------------------------------------------

    #[test]
    fn limit_does_not_drop_below_min() {
        // Start above the floor so the limit has to decay down to it.
        let mut aimd = Aimd::builder().initial_limit(10).min_limit(5).build();

        for _ in 0..100 {
            aimd.update(Duration::from_millis(50), 10, true, false);
        }
        assert_eq!(aimd.max_concurrency(), 5);
    }

    #[test]
    fn limit_does_not_exceed_max() {
        let mut aimd = Aimd::builder()
            .initial_limit(10)
            .min_limit(1)
            .max_limit(12)
            .build();

        for _ in 0..100 {
            aimd.update(Duration::from_millis(50), 10, false, false);
        }
        assert_eq!(aimd.max_concurrency(), 12);
    }

    // --- Builder configuration and validation -------------------------------

    #[test]
    fn custom_backoff_ratio() {
        let mut aimd = Aimd::builder()
            .initial_limit(100)
            .min_limit(1)
            .backoff_ratio(0.5)
            .build();

        aimd.update(Duration::from_millis(50), 100, true, false);
        assert_eq!(aimd.max_concurrency(), 50); // 100 * 0.5 = 50
    }

    #[test]
    #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
    fn rejects_backoff_ratio_too_low() {
        Aimd::builder().backoff_ratio(0.3).build();
    }

    #[test]
    #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
    fn rejects_backoff_ratio_ge_one() {
        Aimd::builder().backoff_ratio(1.0).build();
    }

    #[test]
    #[should_panic(expected = "min_limit must be > 0")]
    fn rejects_zero_min_limit() {
        Aimd::builder().min_limit(0).build();
    }

    #[test]
    #[should_panic(expected = "min_limit (50) must be <= max_limit (10)")]
    fn rejects_min_exceeds_max() {
        Aimd::builder()
            .min_limit(50)
            .max_limit(10)
            .initial_limit(50)
            .build();
    }

    #[test]
    #[should_panic(expected = "initial_limit (5) must be in")]
    fn rejects_initial_below_min() {
        Aimd::builder()
            .initial_limit(5)
            .min_limit(10)
            .max_limit(100)
            .build();
    }

    #[test]
    #[should_panic(expected = "initial_limit (300) must be in")]
    fn rejects_initial_above_max() {
        // Default bounds are [20, 200], so 300 is out of range on the high side.
        Aimd::builder().initial_limit(300).build();
    }
}