1use std::time::Duration;
2
3use crate::Algorithm;
4
5#[derive(Debug, Clone)]
26pub struct Aimd {
27 estimated_limit: f64,
28 min_limit: usize,
29 max_limit: usize,
30 backoff_ratio: f64,
31 timeout: Duration,
32}
33
34impl Aimd {
35 pub fn builder() -> AimdBuilder {
37 AimdBuilder::default()
38 }
39}
40
41impl Default for Aimd {
42 fn default() -> Self {
43 AimdBuilder::default().build()
44 }
45}
46
47impl Algorithm for Aimd {
48 fn max_concurrency(&self) -> usize {
49 (self.estimated_limit as usize).clamp(self.min_limit, self.max_limit)
50 }
51
52 fn update(&mut self, rtt: Duration, num_inflight: usize, is_error: bool, is_canceled: bool) {
53 if is_canceled {
54 return;
55 }
56 let limit = self.estimated_limit;
57
58 let new_limit = if is_error || rtt > self.timeout {
59 limit * self.backoff_ratio
61 } else if num_inflight * 2 >= limit as usize {
62 limit + 1.0
64 } else {
65 return;
66 };
67 self.estimated_limit = new_limit.clamp(self.min_limit as f64, self.max_limit as f64);
68 }
69}
70
71pub struct AimdBuilder {
85 initial_limit: usize,
86 min_limit: usize,
87 max_limit: usize,
88 backoff_ratio: f64,
89 timeout: Duration,
90}
91
92impl Default for AimdBuilder {
93 fn default() -> Self {
94 Self {
95 initial_limit: 20,
96 min_limit: 20,
97 max_limit: 200,
98 backoff_ratio: 0.9,
99 timeout: Duration::from_secs(5),
100 }
101 }
102}
103
104impl AimdBuilder {
105 pub fn initial_limit(mut self, limit: usize) -> Self {
107 self.initial_limit = limit;
108 self
109 }
110
111 pub fn min_limit(mut self, limit: usize) -> Self {
113 self.min_limit = limit;
114 self
115 }
116
117 pub fn max_limit(mut self, limit: usize) -> Self {
119 self.max_limit = limit;
120 self
121 }
122
123 pub fn backoff_ratio(mut self, ratio: f64) -> Self {
126 self.backoff_ratio = ratio;
127 self
128 }
129
130 pub fn timeout(mut self, timeout: Duration) -> Self {
133 self.timeout = timeout;
134 self
135 }
136
137 pub fn build(self) -> Aimd {
147 assert!(
148 (0.5..1.0).contains(&self.backoff_ratio),
149 "backoff_ratio must be in [0.5, 1.0), got {}",
150 self.backoff_ratio,
151 );
152 assert!(self.min_limit > 0, "min_limit must be > 0");
153 assert!(
154 self.min_limit <= self.max_limit,
155 "min_limit ({}) must be <= max_limit ({})",
156 self.min_limit,
157 self.max_limit,
158 );
159 assert!(
160 self.initial_limit >= self.min_limit && self.initial_limit <= self.max_limit,
161 "initial_limit ({}) must be in [min_limit({}), max_limit({})]",
162 self.initial_limit,
163 self.min_limit,
164 self.max_limit,
165 );
166
167 Aimd {
168 estimated_limit: self.initial_limit as f64,
169 min_limit: self.min_limit,
170 max_limit: self.max_limit,
171 backoff_ratio: self.backoff_ratio,
172 timeout: self.timeout,
173 }
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180
181 #[test]
182 fn increase_limit_on_success_when_loaded() {
183 let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();
184
185 aimd.update(Duration::from_millis(50), 10, false, false);
187 assert_eq!(aimd.max_concurrency(), 11);
188 }
189
190 #[test]
191 fn no_increase_when_lightly_loaded() {
192 let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();
193
194 aimd.update(Duration::from_millis(50), 2, false, false);
196 assert_eq!(aimd.max_concurrency(), 10);
197 }
198
199 #[test]
200 fn decrease_limit_on_error() {
201 let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();
202
203 aimd.update(Duration::from_millis(50), 10, true, false);
204 assert_eq!(aimd.max_concurrency(), 9); }
206
207 #[test]
208 fn decrease_limit_on_timeout() {
209 let mut aimd = Aimd::builder()
210 .initial_limit(10)
211 .min_limit(1)
212 .timeout(Duration::from_secs(1))
213 .build();
214
215 aimd.update(Duration::from_secs(2), 10, false, false);
217 assert_eq!(aimd.max_concurrency(), 9);
218 }
219
220 #[test]
221 fn canceled_requests_are_ignored() {
222 let mut aimd = Aimd::builder().initial_limit(10).min_limit(1).build();
223
224 aimd.update(Duration::from_millis(50), 10, true, true);
225 assert_eq!(aimd.max_concurrency(), 10);
226 }
227
228 #[test]
229 fn limit_does_not_drop_below_min() {
230 let mut aimd = Aimd::builder().initial_limit(5).min_limit(5).build();
231
232 for _ in 0..100 {
233 aimd.update(Duration::from_millis(50), 10, true, false);
234 }
235 assert_eq!(aimd.max_concurrency(), 5);
236 }
237
238 #[test]
239 fn limit_does_not_exceed_max() {
240 let mut aimd = Aimd::builder()
241 .initial_limit(10)
242 .min_limit(1)
243 .max_limit(12)
244 .build();
245
246 for _ in 0..100 {
247 aimd.update(Duration::from_millis(50), 10, false, false);
248 }
249 assert_eq!(aimd.max_concurrency(), 12);
250 }
251
252 #[test]
253 fn custom_backoff_ratio() {
254 let mut aimd = Aimd::builder()
255 .initial_limit(100)
256 .min_limit(1)
257 .backoff_ratio(0.5)
258 .build();
259
260 aimd.update(Duration::from_millis(50), 100, true, false);
261 assert_eq!(aimd.max_concurrency(), 50); }
263
264 #[test]
265 #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
266 fn rejects_backoff_ratio_too_low() {
267 Aimd::builder().backoff_ratio(0.3).build();
268 }
269
270 #[test]
271 #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
272 fn rejects_backoff_ratio_ge_one() {
273 Aimd::builder().backoff_ratio(1.0).build();
274 }
275
276 #[test]
277 #[should_panic(expected = "min_limit must be > 0")]
278 fn rejects_zero_min_limit() {
279 Aimd::builder().min_limit(0).build();
280 }
281
282 #[test]
283 #[should_panic(expected = "min_limit (50) must be <= max_limit (10)")]
284 fn rejects_min_exceeds_max() {
285 Aimd::builder()
286 .min_limit(50)
287 .max_limit(10)
288 .initial_limit(50)
289 .build();
290 }
291
292 #[test]
293 #[should_panic(expected = "initial_limit (5) must be in")]
294 fn rejects_initial_below_min() {
295 Aimd::builder()
296 .initial_limit(5)
297 .min_limit(10)
298 .max_limit(100)
299 .build();
300 }
301}