// congestion_limiter/limits/vegas.rs
use std::{
2 fmt::Debug,
3 ops::RangeInclusive,
4 sync::atomic::{AtomicUsize, Ordering},
5 time::Duration,
6};
7
8use async_trait::async_trait;
9use tokio::sync::Mutex;
10
11use crate::{limiter::Outcome, limits::defaults};
12
13use super::{aimd::multiplicative_decrease, defaults::MIN_SAMPLE_LATENCY, LimitAlgorithm, Sample};
14
/// TCP-Vegas-inspired congestion limit algorithm.
///
/// Adjusts the concurrency limit by comparing each sample's latency against a
/// tracked minimum ("base") latency to estimate how many jobs are queued.
pub struct Vegas {
    /// Lower bound for the limit (inclusive).
    min_limit: usize,
    /// Upper bound for the limit (inclusive).
    max_limit: usize,

    /// Increase threshold: the limit is raised when the estimated queue size
    /// falls below `alpha(limit)` (and utilisation is high enough).
    alpha: Box<dyn (Fn(usize) -> f64) + Send + Sync>,
    /// Decrease threshold: the limit is lowered when the estimated queue size
    /// exceeds `beta(limit)`.
    beta: Box<dyn (Fn(usize) -> f64) + Send + Sync>,

    /// Current concurrency limit; read with `Acquire`, updated with `SeqCst`.
    limit: AtomicUsize,
    /// Mutable sample state; the lock also serialises `update` calls.
    inner: Mutex<Inner>,
}
52
#[derive(Debug)]
struct Inner {
    /// Lowest latency observed so far, used as the "no queueing" baseline.
    /// Starts at `Duration::MAX` so the first accepted sample sets it.
    base_latency: Duration,
}
60
61impl Vegas {
62 const DEFAULT_ALPHA_MULTIPLIER: f64 = 3_f64;
63 const DEFAULT_BETA_MULTIPLIER: f64 = 6_f64;
64
65 const DEFAULT_DECREASE_FACTOR: f64 = 0.9;
67
68 const DEFAULT_INCREASE_MIN_UTILISATION: f64 = 0.8;
70
71 #[allow(missing_docs)]
72 pub fn new_with_initial_limit(initial_limit: usize) -> Self {
73 Self::new(
74 initial_limit,
75 defaults::DEFAULT_MIN_LIMIT..=defaults::DEFAULT_MAX_LIMIT,
76 )
77 }
78
79 #[allow(missing_docs)]
80 pub fn new(initial_limit: usize, limit_range: RangeInclusive<usize>) -> Self {
81 assert!(*limit_range.start() >= 1, "Limits must be at least 1");
82 assert!(
83 initial_limit >= *limit_range.start(),
84 "Initial limit less than minimum"
85 );
86 assert!(
87 initial_limit <= *limit_range.end(),
88 "Initial limit more than maximum"
89 );
90
91 Self {
92 limit: AtomicUsize::new(initial_limit),
93 min_limit: *limit_range.start(),
94 max_limit: *limit_range.end(),
95
96 alpha: Box::new(|limit| {
97 Self::DEFAULT_ALPHA_MULTIPLIER * (limit as f64).log10().max(1_f64)
98 }),
99 beta: Box::new(|limit| {
100 Self::DEFAULT_BETA_MULTIPLIER * (limit as f64).log10().max(1_f64)
101 }),
102
103 inner: Mutex::new(Inner {
104 base_latency: Duration::MAX,
105 }),
106 }
107 }
108
109 #[allow(missing_docs)]
110 pub fn with_max_limit(self, max: usize) -> Self {
111 assert!(max > 0);
112 Self {
113 max_limit: max,
114 ..self
115 }
116 }
117}
118
#[async_trait]
impl LimitAlgorithm for Vegas {
    /// Returns the current concurrency limit.
    fn limit(&self) -> usize {
        self.limit.load(Ordering::Acquire)
    }

    /// Processes one sample and returns the (possibly adjusted) limit.
    async fn update(&self, sample: Sample) -> usize {
        // Very fast responses carry little queueing signal; ignore them.
        if sample.latency < MIN_SAMPLE_LATENCY {
            return self.limit.load(Ordering::Acquire);
        }

        let mut inner = self.inner.lock().await;

        // Track the lowest latency ever observed as the "no queueing" baseline.
        if sample.latency < inner.base_latency {
            inner.base_latency = sample.latency;
        }

        let update_limit = |limit: usize| {
            // Throughput implied by this sample (jobs per second).
            let actual_rate = sample.in_flight as f64 / sample.latency.as_secs_f64();

            // Latency beyond the baseline is attributed to queueing.
            let extra_latency = sample.latency.as_secs_f64() - inner.base_latency.as_secs_f64();

            // Little's law: rate * extra wait ~= number of queued jobs.
            let estimated_queued_jobs = actual_rate * extra_latency;

            let utilisation = sample.in_flight as f64 / limit as f64;

            // Step size grows with the order of magnitude of the limit.
            let increment = limit.ilog10().max(1) as usize;

            let limit = if sample.outcome == Outcome::Overload {
                // Explicit overload signal: back off multiplicatively.
                multiplicative_decrease(limit, Self::DEFAULT_DECREASE_FACTOR)
            } else if estimated_queued_jobs > (self.beta)(limit) {
                // Too much estimated queueing: step the limit down.
                limit - increment
            } else if estimated_queued_jobs < (self.alpha)(limit)
                && utilisation >= Self::DEFAULT_INCREASE_MIN_UTILISATION
            {
                // Little queueing and the limit is actually being used: step up.
                limit + increment
            } else {
                // Between the thresholds (or under-utilised): hold steady.
                limit
            };

            Some(limit.clamp(self.min_limit, self.max_limit))
        };

        // `inner` is still locked here, so updates are serialised and this
        // CAS loop should not observe contention in practice.
        self.limit
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, update_limit)
            .expect("we always return Some(limit)");

        self.limit.load(Ordering::SeqCst)
    }
}
217
218impl Debug for Vegas {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 f.debug_struct("Vegas")
221 .field("limit", &self.limit)
222 .field("min_limit", &self.min_limit)
223 .field("max_limit", &self.max_limit)
224 .field("alpha(1)", &(self.alpha)(1))
225 .field("beta(1)", &(self.beta)(1))
226 .field("inner", &self.inner)
227 .finish()
228 }
229}
230
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use itertools::Itertools;

    use crate::limiter::{DefaultLimiter, Limiter, Outcome};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        static INIT_LIMIT: usize = 10;
        let vegas = Vegas::new_with_initial_limit(INIT_LIMIT);

        let limiter = DefaultLimiter::new(vegas);

        // Phase 1: moderate concurrency (5/10) at low, steady latency. This
        // establishes the base latency; utilisation (0.5) is below the 0.8
        // increase threshold, so the limit should not move yet.
        let mut tokens = Vec::with_capacity(5);
        for _ in 0..5 {
            let token = limiter.try_acquire().await.unwrap();
            tokens.push(token);
        }
        for mut token in tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }

        // Phase 2: high concurrency (9/10 = 0.9 utilisation) at the same
        // latency, which should clear the utilisation bar and raise the limit.
        let mut tokens = Vec::with_capacity(9);
        for _ in 0..9 {
            let token = limiter.try_acquire().await.unwrap();
            tokens.push(token);
        }
        for mut token in tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }
        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit"
        );

        // Phase 3: a 10x latency spike (250ms vs the 25ms baseline) implies
        // heavy queueing, so the limit should drop.
        let mut tokens = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut token = limiter.try_acquire().await.unwrap();
            token.set_latency(Duration::from_millis(250));
            tokens.push(token);
        }
        for token in tokens {
            limiter.release(token, Some(Outcome::Success)).await;
        }
        assert!(
            limiter.limit() < higher_limit,
            "Increased latency => decrease limit"
        );
    }

    #[tokio::test]
    async fn windowed() {
        use crate::aggregation::Percentile;
        use crate::limits::Windowed;

        static INIT_LIMIT: usize = 10;
        // Zero-width windows with a small min-sample count so the wrapper
        // forwards an aggregated sample after every few releases.
        let vegas = Windowed::new(
            Vegas::new_with_initial_limit(INIT_LIMIT),
            Percentile::default(),
        )
        .with_min_samples(3)
        .with_min_window(Duration::ZERO)
        .with_max_window(Duration::ZERO);

        let limiter = DefaultLimiter::new(vegas);

        let mut next_tokens = VecDeque::with_capacity(9);

        // Keep 9 jobs in flight throughout so utilisation stays high.
        for _ in 0..9 {
            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        // Two rounds of low-latency releases: the first establishes the base
        // latency, the second should trigger an increase. Each release is
        // paired with a re-acquire to hold concurrency constant.
        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;

            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;

            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit. Limit: {}",
            higher_limit
        );

        // A round at 40x the baseline latency (1000ms vs 25ms) implies heavy
        // queueing, so the limit should fall below the raised value.
        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(1000));
            limiter.release(token, Some(Outcome::Success)).await;

            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        let lower_limit = limiter.limit();
        assert!(
            lower_limit < higher_limit,
            "Increased latency => decrease limit. Limit: {}",
            lower_limit
        );
    }
}