Skip to main content

tower_acc/
vegas.rs

1use std::time::Duration;
2
3use crate::Algorithm;
4
5fn log10(limit: usize) -> usize {
6    std::cmp::max(1, (limit as f64).log10() as usize)
7}
8
9/// TCP Vegas–inspired adaptive concurrency limit strategy.
10///
11/// Estimates queue depth from the ratio of updated RTT to the minimum
12/// (no-load) RTT, then adjusts the concurrency limit based on where the
13/// estimated queue falls relative to configurable alpha/beta thresholds.
14#[derive(Debug, Clone)]
15pub struct Vegas {
16    estimated_limit: f64,
17    max_limit: usize,
18    rtt_noload: Option<Duration>,
19    smoothing: f64,
20    alpha_fn: fn(usize) -> usize,
21    beta_fn: fn(usize) -> usize,
22    threshold_fn: fn(usize) -> usize,
23    increase_fn: fn(f64) -> f64,
24    decrease_fn: fn(f64) -> f64,
25    probe_multiplier: usize,
26    probe_count: usize,
27    probe_jitter: f64,
28}
29
30impl Vegas {
31    /// Returns a `VegasBuilder` for configuring a new `Vegas` instance.
32    pub fn builder() -> VegasBuilder {
33        VegasBuilder::default()
34    }
35
36    fn should_probe(&self, limit: usize) -> bool {
37        let interval = (self.probe_jitter * self.probe_multiplier as f64 * limit as f64) as usize;
38        interval > 0 && self.probe_count >= interval
39    }
40
41    /// Returns a random jitter in [0.5, 1.0) using a simple xorshift on the
42    /// current probe count and estimated limit to avoid pulling in a RNG crate.
43    fn next_jitter(&self) -> f64 {
44        // Mix bits from the current state to produce a pseudo-random value.
45        let mut x = (self.probe_count as u64)
46            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
47            .wrapping_add(self.estimated_limit.to_bits());
48        x ^= x >> 30;
49        x = x.wrapping_mul(0xBF58_476D_1CE4_E5B9);
50        x ^= x >> 27;
51        x = x.wrapping_mul(0x94D0_49BB_1331_11EB);
52        x ^= x >> 31;
53        // Map to [0.5, 1.0)
54        0.5 + (x >> 11) as f64 / (1u64 << 53) as f64 * 0.5
55    }
56}
57
58impl Default for Vegas {
59    fn default() -> Self {
60        VegasBuilder::default().build()
61    }
62}
63
64impl Algorithm for Vegas {
65    fn max_concurrency(&self) -> usize {
66        std::cmp::max(1, self.estimated_limit as usize)
67    }
68
69    fn update(&mut self, rtt: Duration, num_inflight: usize, is_error: bool, is_canceled: bool) {
70        if is_canceled {
71            return;
72        }
73
74        self.probe_count += 1;
75
76        let limit = self.estimated_limit as usize;
77
78        // Periodically reset rtt_noload to track baseline changes.
79        if self.should_probe(limit) {
80            self.probe_count = 0;
81            self.probe_jitter = self.next_jitter();
82            self.rtt_noload = Some(rtt);
83            return;
84        }
85
86        // Update rtt_noload, recording baseline on the first sample.
87        let rtt_noload = match self.rtt_noload {
88            Some(current) if rtt < current => {
89                self.rtt_noload = Some(rtt);
90                return;
91            }
92            Some(current) => current,
93            None => {
94                self.rtt_noload = Some(rtt);
95                return;
96            }
97        };
98
99        // Don't adjust the limit when the system is lightly loaded — low RTT
100        // is a misleading signal when few requests are in-flight.
101        if num_inflight * 2 < limit {
102            return;
103        }
104
105        // Estimate queue depth: limit × (1 − rtt_noload / rtt).
106        let rtt_nanos = rtt.as_nanos() as f64;
107        let rtt_noload_nanos = rtt_noload.as_nanos() as f64;
108        let queue_size =
109            (self.estimated_limit * (1.0 - rtt_noload_nanos / rtt_nanos)).ceil() as usize;
110
111        let alpha = (self.alpha_fn)(limit);
112        let beta = (self.beta_fn)(limit);
113        let threshold = (self.threshold_fn)(limit);
114
115        let new_limit = if is_error {
116            // Errors (timeouts / overload) immediately decrease.
117            (self.decrease_fn)(self.estimated_limit)
118        } else if queue_size <= threshold {
119            // Very short queue — aggressive increase.
120            self.estimated_limit + beta as f64
121        } else if queue_size < alpha {
122            // Short queue — gradual increase.
123            (self.increase_fn)(self.estimated_limit)
124        } else if queue_size > beta {
125            // Long queue — decrease.
126            (self.decrease_fn)(self.estimated_limit)
127        } else {
128            // Within [alpha, beta] — no change.
129            return;
130        };
131
132        let new_limit = new_limit.clamp(1.0, self.max_limit as f64);
133        self.estimated_limit =
134            (1.0 - self.smoothing) * self.estimated_limit + self.smoothing * new_limit;
135    }
136}
137
138/// Builder for configuring a [`Vegas`] algorithm instance.
139///
140/// See [`Vegas::builder`] for usage.
141///
142/// # Defaults
143///
144/// | Parameter | Default |
145/// |-----------|---------|
146/// | `initial_limit` | 20 |
147/// | `max_limit` | 1000 |
148/// | `smoothing` | 1.0 (no smoothing) |
149/// | `alpha` | `3 × log10(limit)` |
150/// | `beta` | `6 × log10(limit)` |
151/// | `threshold` | `log10(limit)` |
152/// | `increase` | `limit + log10(limit)` |
153/// | `decrease` | `limit − log10(limit)` |
154/// | `probe_multiplier` | 30 |
155pub struct VegasBuilder {
156    initial_limit: usize,
157    max_limit: usize,
158    smoothing: f64,
159    alpha_fn: fn(usize) -> usize,
160    beta_fn: fn(usize) -> usize,
161    threshold_fn: fn(usize) -> usize,
162    increase_fn: fn(f64) -> f64,
163    decrease_fn: fn(f64) -> f64,
164    probe_multiplier: usize,
165}
166
167impl Default for VegasBuilder {
168    fn default() -> Self {
169        Self {
170            initial_limit: 20,
171            max_limit: 1000,
172            smoothing: 1.0,
173            alpha_fn: |limit| 3 * log10(limit),
174            beta_fn: |limit| 6 * log10(limit),
175            threshold_fn: log10,
176            increase_fn: |limit| limit + log10(limit as usize) as f64,
177            decrease_fn: |limit| limit - log10(limit as usize) as f64,
178            probe_multiplier: 30,
179        }
180    }
181}
182
183impl VegasBuilder {
184    /// Sets the starting concurrency limit (default: 20).
185    pub fn initial_limit(mut self, limit: usize) -> Self {
186        self.initial_limit = limit;
187        self
188    }
189
190    /// Sets the upper bound the limit can reach (default: 1000).
191    pub fn max_limit(mut self, limit: usize) -> Self {
192        self.max_limit = limit;
193        self
194    }
195
196    /// Sets the exponential smoothing factor applied to limit changes
197    /// (default: 1.0, i.e. no smoothing). Lower values dampen oscillations.
198    pub fn smoothing(mut self, smoothing: f64) -> Self {
199        self.smoothing = smoothing;
200        self
201    }
202
203    /// Sets the function that computes the alpha (queue-too-short) threshold
204    /// from the current limit (default: `3 × log10(limit)`).
205    pub fn alpha(mut self, f: fn(usize) -> usize) -> Self {
206        self.alpha_fn = f;
207        self
208    }
209
210    /// Sets the function that computes the beta (queue-too-long) threshold
211    /// from the current limit (default: `6 × log10(limit)`).
212    pub fn beta(mut self, f: fn(usize) -> usize) -> Self {
213        self.beta_fn = f;
214        self
215    }
216
217    /// Sets the function that computes the aggressive-increase threshold
218    /// from the current limit (default: `log10(limit)`).
219    pub fn threshold(mut self, f: fn(usize) -> usize) -> Self {
220        self.threshold_fn = f;
221        self
222    }
223
224    /// Sets the function used to increase the limit when the queue is short
225    /// (default: `limit + log10(limit)`).
226    pub fn increase(mut self, f: fn(f64) -> f64) -> Self {
227        self.increase_fn = f;
228        self
229    }
230
231    /// Sets the function used to decrease the limit when the queue is long or
232    /// an error occurs (default: `limit − log10(limit)`).
233    pub fn decrease(mut self, f: fn(f64) -> f64) -> Self {
234        self.decrease_fn = f;
235        self
236    }
237
238    /// Sets the probe multiplier that controls how often the baseline RTT is
239    /// reset (default: 30). The probe fires every `multiplier × limit` requests.
240    pub fn probe_multiplier(mut self, multiplier: usize) -> Self {
241        self.probe_multiplier = multiplier;
242        self
243    }
244
245    /// Builds the [`Vegas`] algorithm with the configured parameters.
246    pub fn build(self) -> Vegas {
247        Vegas {
248            estimated_limit: self.initial_limit as f64,
249            max_limit: self.max_limit,
250            rtt_noload: None,
251            smoothing: self.smoothing,
252            alpha_fn: self.alpha_fn,
253            beta_fn: self.beta_fn,
254            threshold_fn: self.threshold_fn,
255            increase_fn: self.increase_fn,
256            decrease_fn: self.decrease_fn,
257            probe_multiplier: self.probe_multiplier,
258            probe_count: 0,
259            probe_jitter: 0.5 + (self.initial_limit as f64 / self.max_limit as f64) * 0.5,
260        }
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    #[test]
269    fn increase_limit_on_low_queue() {
270        let mut vegas = Vegas::builder().initial_limit(10).build();
271        vegas.rtt_noload = Some(Duration::from_millis(10));
272
273        // RTT just slightly above baseline → small queue → should increase.
274        vegas.update(Duration::from_millis(11), 10, false, false);
275        assert!(vegas.max_concurrency() > 10);
276    }
277
278    #[test]
279    fn decrease_limit_on_high_queue() {
280        let mut vegas = Vegas::builder().initial_limit(10).build();
281        vegas.rtt_noload = Some(Duration::from_millis(10));
282
283        // RTT far above baseline → large queue → should decrease.
284        vegas.update(Duration::from_millis(50), 10, false, false);
285        assert!(vegas.max_concurrency() < 10);
286    }
287
288    #[test]
289    fn decrease_limit_on_error() {
290        let mut vegas = Vegas::builder().initial_limit(10).build();
291        vegas.rtt_noload = Some(Duration::from_millis(10));
292
293        vegas.update(Duration::from_millis(10), 10, true, false);
294        assert!(vegas.max_concurrency() < 10);
295    }
296
297    #[test]
298    fn no_change_within_thresholds() {
299        let mut vegas = Vegas::builder().initial_limit(10).build();
300        vegas.rtt_noload = Some(Duration::from_millis(10));
301
302        // RTT producing queue_size in [alpha, beta] range → no change.
303        // alpha(10) = 3, beta(10) = 6. We need queue_size between 3 and 6.
304        // queue = limit * (1 - noload/rtt) = 10 * (1 - 10/rtt)
305        // For queue = 4: rtt = 10 / (1 - 4/10) = 10/0.6 ≈ 16.67ms
306        vegas.update(Duration::from_nanos(16_670_000), 10, false, false);
307        assert_eq!(vegas.max_concurrency(), 10);
308    }
309
310    #[test]
311    fn canceled_requests_are_ignored() {
312        let mut vegas = Vegas::builder().initial_limit(10).build();
313        vegas.rtt_noload = Some(Duration::from_millis(10));
314
315        vegas.update(Duration::from_millis(50), 10, false, true);
316        assert_eq!(vegas.max_concurrency(), 10);
317    }
318
319    #[test]
320    fn smoothing_dampens_changes() {
321        let mut vegas = Vegas::builder().initial_limit(100).smoothing(0.5).build();
322        vegas.rtt_noload = Some(Duration::from_millis(10));
323
324        // Error → decrease. With smoothing 0.5, the change should be dampened.
325        vegas.update(Duration::from_millis(10), 100, true, false);
326        let limit = vegas.max_concurrency();
327        // Without smoothing: 100 - log10(100) = 98
328        // With smoothing 0.5: 0.5 * 100 + 0.5 * 98 = 99
329        assert_eq!(limit, 99);
330    }
331
332    #[test]
333    fn limit_never_below_one() {
334        let mut vegas = Vegas::builder().initial_limit(1).build();
335        vegas.rtt_noload = Some(Duration::from_millis(10));
336
337        for _ in 0..100 {
338            vegas.update(Duration::from_millis(10), 1, true, false);
339        }
340        assert_eq!(vegas.max_concurrency(), 1);
341    }
342}