// congestion_limiter/limits/vegas.rs

1use std::{
2    fmt::Debug,
3    ops::RangeInclusive,
4    sync::atomic::{AtomicUsize, Ordering},
5    time::Duration,
6};
7
8use async_trait::async_trait;
9use tokio::sync::Mutex;
10
11use crate::{limiter::Outcome, limits::defaults};
12
13use super::{aimd::multiplicative_decrease, defaults::MIN_SAMPLE_LATENCY, LimitAlgorithm, Sample};
14
15/// Loss- and delay-based congestion avoidance.
16///
17/// Additive increase, additive decrease. Multiplicative decrease when overload detected.
18///
19/// Estimates queuing delay by comparing the current latency with the minimum observed latency to
20/// estimate the number of jobs being queued.
21///
22/// For greater stability consider wrapping with a percentile window sampler. This calculates a
23/// percentile (e.g. P90) over a period of time and provides that as a sample. Vegas then compares
24/// recent P90 latency with the minimum observed P90. Used this way, Vegas can handle heterogeneous
25/// workloads, as long as the percentile latency is fairly stable.
26///
27/// Can fairly distribute concurrency between independent clients as long as there is enough server
28/// capacity to handle the requests. That is: as long as the server isn't overloaded and failing to
29/// handle requests as a result.
30///
31/// Inspired by TCP Vegas.
32///
33/// - [TCP Vegas: End to End Congestion Avoidance on a Global
34///   Internet](https://www.cs.princeton.edu/courses/archive/fall06/cos561/papers/vegas.pdf)
35/// - [Understanding TCP Vegas: Theory and
36///   Practice](https://www.cs.princeton.edu/research/techreps/TR-628-00)
37/// - [A TCP Vegas Implementation for Linux](http://neal.nu/uw/linux-vegas/)
38/// - [Linux kernel
39///   implementation](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/net/ipv4/tcp_vegas.c)
pub struct Vegas {
    /// Smallest limit `update` will ever return; the limit is clamped to this floor.
    min_limit: usize,
    /// Largest limit `update` will ever return; the limit is clamped to this ceiling.
    max_limit: usize,

    /// Lower queueing threshold, as a function of the current limit.
    alpha: Box<dyn (Fn(usize) -> f64) + Send + Sync>,
    /// Upper queueing threshold, as a function of the current limit.
    beta: Box<dyn (Fn(usize) -> f64) + Send + Sync>,

    /// Current concurrency limit, readable without taking the `inner` mutex.
    limit: AtomicUsize,
    /// Mutable state updated while processing a sample.
    inner: Mutex<Inner>,
}
52
#[derive(Debug)]
struct Inner {
    /// The minimum observed latency, used as a baseline.
    ///
    /// This is the latency we would expect to see if there is no congestion.
    ///
    /// Initialised to `Duration::MAX` (see `Vegas::new`), so the first accepted
    /// sample always becomes the baseline.
    base_latency: Duration,
}
60
61impl Vegas {
62    const DEFAULT_ALPHA_MULTIPLIER: f64 = 3_f64;
63    const DEFAULT_BETA_MULTIPLIER: f64 = 6_f64;
64
65    /// Used when we see overload occurring.
66    const DEFAULT_DECREASE_FACTOR: f64 = 0.9;
67
68    /// Utilisation needs to be above this to increase the limit.
69    const DEFAULT_INCREASE_MIN_UTILISATION: f64 = 0.8;
70
71    #[allow(missing_docs)]
72    pub fn new_with_initial_limit(initial_limit: usize) -> Self {
73        Self::new(
74            initial_limit,
75            defaults::DEFAULT_MIN_LIMIT..=defaults::DEFAULT_MAX_LIMIT,
76        )
77    }
78
79    #[allow(missing_docs)]
80    pub fn new(initial_limit: usize, limit_range: RangeInclusive<usize>) -> Self {
81        assert!(*limit_range.start() >= 1, "Limits must be at least 1");
82        assert!(
83            initial_limit >= *limit_range.start(),
84            "Initial limit less than minimum"
85        );
86        assert!(
87            initial_limit <= *limit_range.end(),
88            "Initial limit more than maximum"
89        );
90
91        Self {
92            limit: AtomicUsize::new(initial_limit),
93            min_limit: *limit_range.start(),
94            max_limit: *limit_range.end(),
95
96            alpha: Box::new(|limit| {
97                Self::DEFAULT_ALPHA_MULTIPLIER * (limit as f64).log10().max(1_f64)
98            }),
99            beta: Box::new(|limit| {
100                Self::DEFAULT_BETA_MULTIPLIER * (limit as f64).log10().max(1_f64)
101            }),
102
103            inner: Mutex::new(Inner {
104                base_latency: Duration::MAX,
105            }),
106        }
107    }
108
109    #[allow(missing_docs)]
110    pub fn with_max_limit(self, max: usize) -> Self {
111        assert!(max > 0);
112        Self {
113            max_limit: max,
114            ..self
115        }
116    }
117}
118
#[async_trait]
impl LimitAlgorithm for Vegas {
    fn limit(&self) -> usize {
        self.limit.load(Ordering::Acquire)
    }

    /// Vegas algorithm.
    ///
    /// Generally applied over a window size of one or two RTTs.
    ///
    /// Little's law: `L = λW = concurrency = rate * latency` (averages).
    ///
    /// The algorithm in terms of rates:
    ///
    /// ```text
    /// BASE_D = estimated base latency with no queueing
    /// D(w)   = observed average latency per job over window w
    /// L(w)   = concurrency limit for window w
    /// F(w)   = average jobs in flight during window w
    ///
    /// L(w) / BASE_D = E    = expected rate (no queueing)
    /// F(w) / D(w)   = A(w) = actual rate during window w
    ///
    /// E - A(w) = DIFF [>= 0]
    ///
    /// alpha = low rate threshold: too little queueing
    /// beta  = high rate threshold: too much queueing
    ///
    /// L(w+1) = L(w) + 1 if DIFF < alpha
    ///               - 1 if DIFF > beta
    /// ```
    ///
    /// Or, using queue size instead of rate:
    ///
    /// ```text
    /// D(w) - BASE_D = ΔD(w) = extra average latency in window w caused by queueing
    /// A(w) * ΔD(w)  = Q(w)  = estimated average queue size in window w
    ///
    /// alpha = low queueing threshold
    /// beta  = high queueing threshold
    ///
    /// L(w+1) = L(w) + 1 if Q(w) < alpha
    ///               - 1 if Q(w) > beta
    /// ```
    async fn update(&self, sample: Sample) -> usize {
        // Latencies this small are treated as noise: ignore the sample and keep
        // the current limit unchanged.
        if sample.latency < MIN_SAMPLE_LATENCY {
            return self.limit.load(Ordering::Acquire);
        }

        let mut inner = self.inner.lock().await;

        if sample.latency < inner.base_latency {
            // New minimum: record it as the baseline "no load" latency.
            inner.base_latency = sample.latency;
            // NOTE(review): the early return below is disabled, so this sample
            // still flows into the adjustment logic with zero extra latency
            // (estimated queue size 0) — confirm this is intentional.
            // return self.limit.load(Ordering::Acquire);
        }

        // NB: `fetch_update` may invoke this closure more than once if another
        // thread races the compare-and-swap; it must stay side-effect-free.
        let update_limit = |limit: usize| {
            // TODO: periodically reset baseline latency measurement.

            // A(w) = F(w) / D(w): jobs completed per second during the window.
            let actual_rate = sample.in_flight as f64 / sample.latency.as_secs_f64();

            // ΔD(w): latency attributable to queueing, relative to the baseline.
            let extra_latency = sample.latency.as_secs_f64() - inner.base_latency.as_secs_f64();

            // Q(w) = A(w) * ΔD(w): Little's law applied to the queueing delay.
            let estimated_queued_jobs = actual_rate * extra_latency;

            let utilisation = sample.in_flight as f64 / limit as f64;

            // Step size grows logarithmically with the limit, never below 1.
            let increment = limit.ilog10().max(1) as usize;

            let limit = if sample.outcome == Outcome::Overload {
                // Limit too big – overload
                multiplicative_decrease(limit, Self::DEFAULT_DECREASE_FACTOR)
            } else if estimated_queued_jobs > (self.beta)(limit) {
                // Limit too big – too much queueing
                limit - increment
            } else if estimated_queued_jobs < (self.alpha)(limit)
                && utilisation >= Self::DEFAULT_INCREASE_MIN_UTILISATION
            {
                // Limit too small – low queueing + high utilisation

                // TODO: support some kind of fast start, e.g. increase by beta when almost no queueing
                limit + increment
            } else {
                // Perfect porridge
                limit
            };

            Some(limit.clamp(self.min_limit, self.max_limit))
        };

        self.limit
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, update_limit)
            .expect("we always return Some(limit)");

        // NOTE(review): this re-load can observe a concurrent update rather
        // than the value the closure above produced — confirm callers tolerate
        // that, or return the closure's result directly.
        self.limit.load(Ordering::SeqCst)
    }
}
217
218impl Debug for Vegas {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        f.debug_struct("Vegas")
221            .field("limit", &self.limit)
222            .field("min_limit", &self.min_limit)
223            .field("max_limit", &self.max_limit)
224            .field("alpha(1)", &(self.alpha)(1))
225            .field("beta(1)", &(self.beta)(1))
226            .field("inner", &self.inner)
227            .finish()
228    }
229}
230
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use itertools::Itertools;

    use crate::limiter::{DefaultLimiter, Limiter, Outcome};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        static INIT_LIMIT: usize = 10;
        let limiter = DefaultLimiter::new(Vegas::new_with_initial_limit(INIT_LIMIT));

        // Warm up: 5 concurrent jobs completing with a steady 25ms latency.
        let mut held = Vec::with_capacity(5);
        for _ in 0..5 {
            held.push(limiter.try_acquire().await.unwrap());
        }
        for mut token in held {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }

        // 9 concurrent jobs, latency unchanged: the limit should grow.
        let mut held = Vec::with_capacity(9);
        for _ in 0..9 {
            held.push(limiter.try_acquire().await.unwrap());
        }
        for mut token in held {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }
        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit"
        );

        // 10 concurrent jobs at 10x the previous latency: the limit should shrink.
        let mut held = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut token = limiter.try_acquire().await.unwrap();
            token.set_latency(Duration::from_millis(250));
            held.push(token);
        }
        for token in held {
            limiter.release(token, Some(Outcome::Success)).await;
        }
        assert!(
            limiter.limit() < higher_limit,
            "Increased latency => decrease limit"
        );
    }

    #[tokio::test]
    async fn windowed() {
        use crate::aggregation::Percentile;
        use crate::limits::Windowed;

        static INIT_LIMIT: usize = 10;
        let vegas = Windowed::new(
            Vegas::new_with_initial_limit(INIT_LIMIT),
            Percentile::default(),
        )
        .with_min_samples(3)
        .with_min_window(Duration::ZERO)
        .with_max_window(Duration::ZERO);

        let limiter = DefaultLimiter::new(vegas);

        let mut in_flight = VecDeque::with_capacity(9);

        // Warm up: fill to 9 in-flight jobs.
        for _ in 0..9 {
            in_flight.push_back(limiter.try_acquire().await.unwrap());
        }

        // Release each job at a steady 25ms and immediately replace it, keeping
        // concurrency high.
        for mut token in in_flight.drain(..).collect_vec() {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;

            in_flight.push_back(limiter.try_acquire().await.unwrap());
        }

        // Another round at the same steady latency.
        for mut token in in_flight.drain(..).collect_vec() {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;

            in_flight.push_back(limiter.try_acquire().await.unwrap());
        }

        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit. Limit: {}",
            higher_limit
        );

        // 40x the previous latency: the limit should drop.
        for mut token in in_flight.drain(..).collect_vec() {
            token.set_latency(Duration::from_millis(1000));
            limiter.release(token, Some(Outcome::Success)).await;

            in_flight.push_back(limiter.try_acquire().await.unwrap());
        }

        let lower_limit = limiter.limit();
        assert!(
            lower_limit < higher_limit,
            "Increased latency => decrease limit. Limit: {}",
            lower_limit
        );
    }
}