1use std::time::Duration;
2
3use crate::Algorithm;
4
/// Returns the base-10 logarithm of `limit`, truncated to a whole number and
/// never smaller than 1.
///
/// The logarithm is computed in floating point and cast back to `usize`; the
/// cast saturates, so `limit == 0` (whose logarithm is negative infinity)
/// becomes 0 before the floor of 1 is applied.
fn log10(limit: usize) -> usize {
    let magnitude = (limit as f64).log10() as usize;
    magnitude.max(1)
}
8
/// Concurrency limit estimator driven by round-trip-time samples.
///
/// Each sample's RTT is compared against the lowest RTT seen since the last
/// probe to estimate how many requests are queueing; the limit is then raised
/// or lowered according to the configured threshold functions.
#[derive(Debug, Clone)]
pub struct Vegas {
    /// Current limit estimate. Kept as a float so smoothed, fractional
    /// adjustments accumulate; it is truncated when reported to callers.
    estimated_limit: f64,
    /// Upper bound applied to every newly computed limit.
    max_limit: usize,
    /// Baseline RTT: the smallest sample observed since the baseline was last
    /// reset. `None` until the first sample arrives.
    rtt_noload: Option<Duration>,
    /// Weight of the newly computed limit when it is blended with the previous
    /// estimate (`1.0` replaces the estimate outright).
    smoothing: f64,
    /// Maps the current limit to the queue size below which the limit is
    /// raised with `increase_fn`.
    alpha_fn: fn(usize) -> usize,
    /// Maps the current limit to the queue size above which the limit is
    /// lowered; its value is also the step added when the queue is at or
    /// below the threshold.
    beta_fn: fn(usize) -> usize,
    /// Maps the current limit to the queue size at or below which the limit
    /// grows by the `beta_fn` value.
    threshold_fn: fn(usize) -> usize,
    /// Computes the raised limit from the current estimate.
    increase_fn: fn(f64) -> f64,
    /// Computes the lowered limit from the current estimate.
    decrease_fn: fn(f64) -> f64,
    /// Scales the number of samples between baseline probes.
    probe_multiplier: usize,
    /// Samples counted since the last probe.
    probe_count: usize,
    /// Jitter factor multiplied into the probe interval.
    probe_jitter: f64,
}
29
30impl Vegas {
31 pub fn builder() -> VegasBuilder {
33 VegasBuilder::default()
34 }
35
36 fn should_probe(&self, limit: usize) -> bool {
37 let interval = (self.probe_jitter * self.probe_multiplier as f64 * limit as f64) as usize;
38 interval > 0 && self.probe_count >= interval
39 }
40
41 fn next_jitter(&self) -> f64 {
44 let mut x = (self.probe_count as u64)
46 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
47 .wrapping_add(self.estimated_limit.to_bits());
48 x ^= x >> 30;
49 x = x.wrapping_mul(0xBF58_476D_1CE4_E5B9);
50 x ^= x >> 27;
51 x = x.wrapping_mul(0x94D0_49BB_1331_11EB);
52 x ^= x >> 31;
53 0.5 + (x >> 11) as f64 / (1u64 << 53) as f64 * 0.5
55 }
56}
57
58impl Default for Vegas {
59 fn default() -> Self {
60 VegasBuilder::default().build()
61 }
62}
63
64impl Algorithm for Vegas {
65 fn max_concurrency(&self) -> usize {
66 std::cmp::max(1, self.estimated_limit as usize)
67 }
68
69 fn update(&mut self, rtt: Duration, num_inflight: usize, is_error: bool, is_canceled: bool) {
70 if is_canceled {
71 return;
72 }
73
74 self.probe_count += 1;
75
76 let limit = self.estimated_limit as usize;
77
78 if self.should_probe(limit) {
80 self.probe_count = 0;
81 self.probe_jitter = self.next_jitter();
82 self.rtt_noload = Some(rtt);
83 return;
84 }
85
86 let rtt_noload = match self.rtt_noload {
88 Some(current) if rtt < current => {
89 self.rtt_noload = Some(rtt);
90 return;
91 }
92 Some(current) => current,
93 None => {
94 self.rtt_noload = Some(rtt);
95 return;
96 }
97 };
98
99 if num_inflight * 2 < limit {
102 return;
103 }
104
105 let rtt_nanos = rtt.as_nanos() as f64;
107 let rtt_noload_nanos = rtt_noload.as_nanos() as f64;
108 let queue_size =
109 (self.estimated_limit * (1.0 - rtt_noload_nanos / rtt_nanos)).ceil() as usize;
110
111 let alpha = (self.alpha_fn)(limit);
112 let beta = (self.beta_fn)(limit);
113 let threshold = (self.threshold_fn)(limit);
114
115 let new_limit = if is_error {
116 (self.decrease_fn)(self.estimated_limit)
118 } else if queue_size <= threshold {
119 self.estimated_limit + beta as f64
121 } else if queue_size < alpha {
122 (self.increase_fn)(self.estimated_limit)
124 } else if queue_size > beta {
125 (self.decrease_fn)(self.estimated_limit)
127 } else {
128 return;
130 };
131
132 let new_limit = new_limit.clamp(1.0, self.max_limit as f64);
133 self.estimated_limit =
134 (1.0 - self.smoothing) * self.estimated_limit + self.smoothing * new_limit;
135 }
136}
137
/// Builder for [`Vegas`].
///
/// Every setter consumes and returns the builder so calls can be chained;
/// `build` produces the configured limiter.
#[derive(Debug, Clone)]
pub struct VegasBuilder {
    /// Limit the estimator starts from.
    initial_limit: usize,
    /// Upper bound for the estimated limit.
    max_limit: usize,
    /// Weight of each newly computed limit when blended with the previous
    /// estimate.
    smoothing: f64,
    /// Maps the current limit to the queue size below which the limit is
    /// increased.
    alpha_fn: fn(usize) -> usize,
    /// Maps the current limit to the queue size above which the limit is
    /// decreased.
    beta_fn: fn(usize) -> usize,
    /// Maps the current limit to the queue size at or below which the limit
    /// grows by the beta value.
    threshold_fn: fn(usize) -> usize,
    /// Computes an increased limit from the current estimate.
    increase_fn: fn(f64) -> f64,
    /// Computes a decreased limit from the current estimate.
    decrease_fn: fn(f64) -> f64,
    /// Scales the number of samples between baseline probes.
    probe_multiplier: usize,
}
166
167impl Default for VegasBuilder {
168 fn default() -> Self {
169 Self {
170 initial_limit: 20,
171 max_limit: 1000,
172 smoothing: 1.0,
173 alpha_fn: |limit| 3 * log10(limit),
174 beta_fn: |limit| 6 * log10(limit),
175 threshold_fn: log10,
176 increase_fn: |limit| limit + log10(limit as usize) as f64,
177 decrease_fn: |limit| limit - log10(limit as usize) as f64,
178 probe_multiplier: 30,
179 }
180 }
181}
182
183impl VegasBuilder {
184 pub fn initial_limit(mut self, limit: usize) -> Self {
186 self.initial_limit = limit;
187 self
188 }
189
190 pub fn max_limit(mut self, limit: usize) -> Self {
192 self.max_limit = limit;
193 self
194 }
195
196 pub fn smoothing(mut self, smoothing: f64) -> Self {
199 self.smoothing = smoothing;
200 self
201 }
202
203 pub fn alpha(mut self, f: fn(usize) -> usize) -> Self {
206 self.alpha_fn = f;
207 self
208 }
209
210 pub fn beta(mut self, f: fn(usize) -> usize) -> Self {
213 self.beta_fn = f;
214 self
215 }
216
217 pub fn threshold(mut self, f: fn(usize) -> usize) -> Self {
220 self.threshold_fn = f;
221 self
222 }
223
224 pub fn increase(mut self, f: fn(f64) -> f64) -> Self {
227 self.increase_fn = f;
228 self
229 }
230
231 pub fn decrease(mut self, f: fn(f64) -> f64) -> Self {
234 self.decrease_fn = f;
235 self
236 }
237
238 pub fn probe_multiplier(mut self, multiplier: usize) -> Self {
241 self.probe_multiplier = multiplier;
242 self
243 }
244
245 pub fn build(self) -> Vegas {
247 Vegas {
248 estimated_limit: self.initial_limit as f64,
249 max_limit: self.max_limit,
250 rtt_noload: None,
251 smoothing: self.smoothing,
252 alpha_fn: self.alpha_fn,
253 beta_fn: self.beta_fn,
254 threshold_fn: self.threshold_fn,
255 increase_fn: self.increase_fn,
256 decrease_fn: self.decrease_fn,
257 probe_multiplier: self.probe_multiplier,
258 probe_count: 0,
259 probe_jitter: 0.5 + (self.initial_limit as f64 / self.max_limit as f64) * 0.5,
260 }
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a limiter from `builder` whose baseline RTT is preset to 10 ms.
    fn primed(builder: VegasBuilder) -> Vegas {
        let mut vegas = builder.build();
        vegas.rtt_noload = Some(Duration::from_millis(10));
        vegas
    }

    /// Default limiter starting at `initial_limit` with a 10 ms baseline RTT.
    fn primed_at(initial_limit: usize) -> Vegas {
        primed(Vegas::builder().initial_limit(initial_limit))
    }

    #[test]
    fn increase_limit_on_low_queue() {
        let mut vegas = primed_at(10);

        // An RTT barely above the baseline means almost nothing is queued.
        vegas.update(Duration::from_millis(11), 10, false, false);

        assert!(vegas.max_concurrency() > 10);
    }

    #[test]
    fn decrease_limit_on_high_queue() {
        let mut vegas = primed_at(10);

        // Five times the baseline RTT pushes the queue past the beta bound.
        vegas.update(Duration::from_millis(50), 10, false, false);

        assert!(vegas.max_concurrency() < 10);
    }

    #[test]
    fn decrease_limit_on_error() {
        let mut vegas = primed_at(10);

        vegas.update(Duration::from_millis(10), 10, true, false);

        assert!(vegas.max_concurrency() < 10);
    }

    #[test]
    fn no_change_within_thresholds() {
        let mut vegas = primed_at(10);

        // This RTT gives a queue size between the alpha and beta bounds.
        vegas.update(Duration::from_nanos(16_670_000), 10, false, false);

        assert_eq!(vegas.max_concurrency(), 10);
    }

    #[test]
    fn canceled_requests_are_ignored() {
        let mut vegas = primed_at(10);

        vegas.update(Duration::from_millis(50), 10, false, true);

        assert_eq!(vegas.max_concurrency(), 10);
    }

    #[test]
    fn smoothing_dampens_changes() {
        let mut vegas = primed(Vegas::builder().initial_limit(100).smoothing(0.5));

        // The unsmoothed decrease is 2; half of it is applied.
        vegas.update(Duration::from_millis(10), 100, true, false);

        let limit = vegas.max_concurrency();
        assert_eq!(limit, 99);
    }

    #[test]
    fn limit_never_below_one() {
        let mut vegas = primed_at(1);

        (0..100).for_each(|_| vegas.update(Duration::from_millis(10), 1, true, false));

        assert_eq!(vegas.max_concurrency(), 1);
    }
}