congestion_limiter/limits/
gradient.rs

1use std::{
2    ops::RangeInclusive,
3    sync::atomic::{AtomicUsize, Ordering},
4};
5
6use async_trait::async_trait;
7use conv::ConvAsUtil;
8use tokio::sync::Mutex;
9
10use crate::{
11    limits::{defaults, Sample},
12    moving_avg,
13};
14
15use super::{defaults::MIN_SAMPLE_LATENCY, LimitAlgorithm};
16
/// Delay-based congestion avoidance.
///
/// Additive-increase, multiplicative decrease based on change in average latency.
///
/// Considers the difference in average latency between a short time window and a longer window.
/// Changes in these values is considered an indicator of a change in load on the system.
///
/// Wrap with a [`crate::limits::windowed::Windowed`] to control the short time window, otherwise the latest
/// sample is used.
///
/// Inspired by TCP congestion control algorithms using delay gradients.
///
/// - [Revisiting TCP Congestion Control Using Delay Gradients](https://hal.science/hal-01597987/)
#[derive(Debug)]
pub struct Gradient {
    /// Inclusive lower bound for the limit; `update` clamps to this.
    min_limit: usize,
    /// Inclusive upper bound for the limit; `update` clamps to this.
    max_limit: usize,

    /// The current (rounded) limit, published here so `limit()` can read it
    /// without taking the `inner` lock.
    limit: AtomicUsize,
    /// State mutated on every sample; the mutex serialises updates.
    inner: Mutex<Inner>,
}
38
/// Mutable algorithm state, guarded by the `inner` mutex in [`Gradient`].
#[derive(Debug)]
struct Inner {
    // Exponentially-smoothed latency over the long window; the baseline that
    // each new sample is compared against.
    long_window_latency: moving_avg::ExpSmoothed,
    // Unrounded limit. Kept as f64 so small multiplicative adjustments
    // accumulate across updates instead of being lost to integer rounding.
    limit: f64,
}
44
45impl Gradient {
46    const DEFAULT_INCREASE: f64 = 4.;
47    const DEFAULT_INCREASE_MIN_UTILISATION: f64 = 0.8;
48    const DEFAULT_INCREASE_MIN_GRADIENT: f64 = 0.9;
49
50    const DEFAULT_LONG_WINDOW_SAMPLES: u16 = 500;
51
52    const DEFAULT_TOLERANCE: f64 = 2.;
53    const DEFAULT_SMOOTHING: f64 = 0.2;
54
55    #[allow(missing_docs)]
56    pub fn new_with_initial_limit(initial_limit: usize) -> Self {
57        Self::new(
58            initial_limit,
59            defaults::DEFAULT_MIN_LIMIT..=defaults::DEFAULT_MAX_LIMIT,
60        )
61    }
62
63    #[allow(missing_docs)]
64    pub fn new(initial_limit: usize, limit_range: RangeInclusive<usize>) -> Self {
65        assert!(*limit_range.start() >= 1, "Limits must be at least 1");
66        assert!(
67            initial_limit >= *limit_range.start(),
68            "Initial limit less than minimum"
69        );
70        assert!(
71            initial_limit <= *limit_range.end(),
72            "Initial limit more than maximum"
73        );
74
75        Self {
76            min_limit: *limit_range.start(),
77            max_limit: *limit_range.end(),
78
79            limit: AtomicUsize::new(initial_limit),
80            inner: Mutex::new(Inner {
81                long_window_latency: moving_avg::ExpSmoothed::new_with_window_size(
82                    Self::DEFAULT_LONG_WINDOW_SAMPLES,
83                ),
84                limit: initial_limit as f64,
85            }),
86        }
87    }
88
89    #[allow(missing_docs)]
90    pub fn with_max_limit(self, max: usize) -> Self {
91        assert!(max > 0);
92        Self {
93            max_limit: max,
94            ..self
95        }
96    }
97}
98
#[async_trait]
impl LimitAlgorithm for Gradient {
    /// Returns the current (rounded) limit without taking the `inner` lock.
    fn limit(&self) -> usize {
        self.limit.load(Ordering::Acquire)
    }

    /// Folds one latency sample into the limit and returns the new limit.
    async fn update(&self, sample: Sample) -> usize {
        // Samples faster than the minimum are discarded so they don't drag the
        // latency average down; the limit is left unchanged.
        if sample.latency < MIN_SAMPLE_LATENCY {
            return self.limit.load(Ordering::Acquire);
        }

        let mut inner = self.inner.lock().await;

        // Update long window
        let long = inner.long_window_latency.sample(sample.latency);

        // ratio > 1: this sample was faster than the long-window baseline;
        // ratio < 1: slower.
        let ratio = long.as_secs_f64() / sample.latency.as_secs_f64();

        // Speed up return to baseline after long period of increased load.
        // (Decay the baseline by 5% when the sample is over twice as fast.)
        if ratio > 2.0 {
            inner.long_window_latency.set(long.mul_f64(0.95));
        }

        let old_limit = inner.limit;

        // Only apply downwards gradient (when latency has increased).
        // Limit to >= 0.5 to prevent aggressive load shedding.
        // Tolerate a given amount of latency difference: the gradient only
        // drops below 1 once latency exceeds DEFAULT_TOLERANCE x the baseline.
        let gradient = (Self::DEFAULT_TOLERANCE * ratio).clamp(0.5, 1.0);

        // Fraction of the (fractional) limit currently in use.
        let utilisation = sample.in_flight as f64 / old_limit;

        // Only apply an increase if we're using enough to justify it
        // and we're not trying to reduce the limit by much.
        let increase = if utilisation > Self::DEFAULT_INCREASE_MIN_UTILISATION
            && gradient > Self::DEFAULT_INCREASE_MIN_GRADIENT
        {
            Self::DEFAULT_INCREASE
        } else {
            0.0
        };

        // Apply gradient, and allow an additive increase.
        let mut new_limit = old_limit * gradient + increase;
        // Exponentially smooth the step from the old limit towards the target
        // so a single sample can't swing the limit too far.
        new_limit =
            old_limit * (1.0 - Self::DEFAULT_SMOOTHING) + new_limit * Self::DEFAULT_SMOOTHING;

        // Keep the fractional limit inside the configured [min, max] bounds.
        new_limit = (new_limit).clamp(self.min_limit as f64, self.max_limit as f64);

        inner.limit = new_limit;

        // Publish the rounded limit for lock-free readers of `limit()`.
        let rounded_limit = new_limit
            .approx()
            .expect("should be clamped within usize bounds");
        self.limit.store(rounded_limit, Ordering::Release);

        rounded_limit
    }
}
158
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::limiter::{DefaultLimiter, Limiter, Outcome};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        static INIT_LIMIT: usize = 10;
        let limiter = DefaultLimiter::new(Gradient::new_with_initial_limit(INIT_LIMIT));

        // Phase 1: 10 concurrent requests, steady low latency.
        // Expectation: limit grows above the initial value.
        let mut held = Vec::with_capacity(10);
        for _ in 0..10 {
            held.push(limiter.try_acquire().await.unwrap());
        }
        for mut token in held {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }
        let raised_limit = limiter.limit();
        assert!(
            raised_limit > INIT_LIMIT,
            "steady latency + high concurrency: increase limit"
        );

        // Phase 2: same concurrency, but latency 10x higher.
        // Expectation: limit drops back below the raised value.
        let mut held = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut token = limiter.try_acquire().await.unwrap();
            token.set_latency(Duration::from_millis(250));
            held.push(token);
        }
        for token in held {
            limiter.release(token, Some(Outcome::Success)).await;
        }
        assert!(
            limiter.limit() < raised_limit,
            "increased latency: decrease limit"
        );
    }
}
211}