congestion_limiter/limits/
gradient.rs1use std::{
2 ops::RangeInclusive,
3 sync::atomic::{AtomicUsize, Ordering},
4};
5
6use async_trait::async_trait;
7use conv::ConvAsUtil;
8use tokio::sync::Mutex;
9
10use crate::{
11 limits::{defaults, Sample},
12 moving_avg,
13};
14
15use super::{defaults::MIN_SAMPLE_LATENCY, LimitAlgorithm};
16
/// Delay-gradient concurrency limit: compares each sample's latency against a
/// long-window smoothed average and shrinks or grows the limit accordingly.
#[derive(Debug)]
pub struct Gradient {
    // Inclusive lower bound for the limit.
    min_limit: usize,
    // Inclusive upper bound for the limit.
    max_limit: usize,

    // Last computed limit, rounded to an integer so `limit()` can read it
    // without taking the mutex.
    limit: AtomicUsize,
    // State that is only touched while holding the lock in `update()`.
    inner: Mutex<Inner>,
}
38
/// Mutable algorithm state guarded by `Gradient::inner`.
#[derive(Debug)]
struct Inner {
    // Exponentially smoothed long-window latency average.
    long_window_latency: moving_avg::ExpSmoothed,
    // Unrounded limit kept as f64 so repeated smoothing steps are not lost to
    // integer rounding.
    limit: f64,
}
44
45impl Gradient {
46 const DEFAULT_INCREASE: f64 = 4.;
47 const DEFAULT_INCREASE_MIN_UTILISATION: f64 = 0.8;
48 const DEFAULT_INCREASE_MIN_GRADIENT: f64 = 0.9;
49
50 const DEFAULT_LONG_WINDOW_SAMPLES: u16 = 500;
51
52 const DEFAULT_TOLERANCE: f64 = 2.;
53 const DEFAULT_SMOOTHING: f64 = 0.2;
54
55 #[allow(missing_docs)]
56 pub fn new_with_initial_limit(initial_limit: usize) -> Self {
57 Self::new(
58 initial_limit,
59 defaults::DEFAULT_MIN_LIMIT..=defaults::DEFAULT_MAX_LIMIT,
60 )
61 }
62
63 #[allow(missing_docs)]
64 pub fn new(initial_limit: usize, limit_range: RangeInclusive<usize>) -> Self {
65 assert!(*limit_range.start() >= 1, "Limits must be at least 1");
66 assert!(
67 initial_limit >= *limit_range.start(),
68 "Initial limit less than minimum"
69 );
70 assert!(
71 initial_limit <= *limit_range.end(),
72 "Initial limit more than maximum"
73 );
74
75 Self {
76 min_limit: *limit_range.start(),
77 max_limit: *limit_range.end(),
78
79 limit: AtomicUsize::new(initial_limit),
80 inner: Mutex::new(Inner {
81 long_window_latency: moving_avg::ExpSmoothed::new_with_window_size(
82 Self::DEFAULT_LONG_WINDOW_SAMPLES,
83 ),
84 limit: initial_limit as f64,
85 }),
86 }
87 }
88
89 #[allow(missing_docs)]
90 pub fn with_max_limit(self, max: usize) -> Self {
91 assert!(max > 0);
92 Self {
93 max_limit: max,
94 ..self
95 }
96 }
97}
98
#[async_trait]
impl LimitAlgorithm for Gradient {
    // Lock-free read of the most recently computed limit.
    fn limit(&self) -> usize {
        self.limit.load(Ordering::Acquire)
    }

    /// Recomputes the limit from one completed job's latency sample and returns
    /// the new (rounded) limit.
    async fn update(&self, sample: Sample) -> usize {
        // Ignore implausibly fast samples so they can't drag the latency
        // average down; return the current limit unchanged.
        if sample.latency < MIN_SAMPLE_LATENCY {
            return self.limit.load(Ordering::Acquire);
        }

        let mut inner = self.inner.lock().await;

        // Fold this sample into the long-window moving average.
        let long = inner.long_window_latency.sample(sample.latency);

        // ratio > 1: sample faster than the long-term average (headroom);
        // ratio < 1: sample slower (possible congestion).
        let ratio = long.as_secs_f64() / sample.latency.as_secs_f64();

        // If the long-window average is more than 2x the current sample, decay
        // it so the average can recover after a congestion episode.
        if ratio > 2.0 {
            inner.long_window_latency.set(long.mul_f64(0.95));
        }

        let old_limit = inner.limit;

        // Multiplier for the old limit. With TOLERANCE = 2, the limit only
        // starts shrinking once a sample exceeds twice the long-window average,
        // and a single update can at most halve it (lower clamp 0.5).
        let gradient = (Self::DEFAULT_TOLERANCE * ratio).clamp(0.5, 1.0);

        // NOTE(review): assumes `sample.in_flight` is the concurrency observed
        // when the sample was taken — TODO confirm against `Sample`'s docs.
        let utilisation = sample.in_flight as f64 / old_limit;

        // Only probe for extra capacity when we're near the limit AND latency
        // is not degrading; otherwise the limit would creep up while idle.
        let increase = if utilisation > Self::DEFAULT_INCREASE_MIN_UTILISATION
            && gradient > Self::DEFAULT_INCREASE_MIN_GRADIENT
        {
            Self::DEFAULT_INCREASE
        } else {
            0.0
        };

        // Blend the freshly computed limit with the old one (EWMA with factor
        // DEFAULT_SMOOTHING) to dampen oscillation.
        let mut new_limit = old_limit * gradient + increase;
        new_limit =
            old_limit * (1.0 - Self::DEFAULT_SMOOTHING) + new_limit * Self::DEFAULT_SMOOTHING;

        // Keep the stored f64 limit inside the configured bounds.
        new_limit = (new_limit).clamp(self.min_limit as f64, self.max_limit as f64);

        inner.limit = new_limit;

        // Publish the rounded limit for lock-free readers of `limit()`.
        let rounded_limit = new_limit
            .approx()
            .expect("should be clamped within usize bounds");
        self.limit.store(rounded_limit, Ordering::Release);

        rounded_limit
    }
}
158
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::limiter::{DefaultLimiter, Limiter, Outcome};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        static INIT_LIMIT: usize = 10;
        let limiter = DefaultLimiter::new(Gradient::new_with_initial_limit(INIT_LIMIT));

        // Saturate the limiter with fast, steady jobs: the limit should grow.
        let mut held = Vec::with_capacity(10);
        for _ in 0..10 {
            held.push(limiter.try_acquire().await.unwrap());
        }
        for mut job in held {
            job.set_latency(Duration::from_millis(25));
            limiter.release(job, Some(Outcome::Success)).await;
        }
        let grown_limit = limiter.limit();
        assert!(
            grown_limit > INIT_LIMIT,
            "steady latency + high concurrency: increase limit"
        );

        // Now report jobs 10x slower: the limit should come back down.
        let mut held = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut job = limiter.try_acquire().await.unwrap();
            job.set_latency(Duration::from_millis(250));
            held.push(job);
        }
        for job in held {
            limiter.release(job, Some(Outcome::Success)).await;
        }
        assert!(
            limiter.limit() < grown_limit,
            "increased latency: decrease limit"
        );
    }
}