1use core::sync::atomic::{AtomicU64, Ordering};
22use core::time::Duration;
23
24use clock_lib::{Clock, Monotonic, SystemClock};
25use event_listener::Event;
26
27use crate::sync::AtomicU32;
31
32#[non_exhaustive]
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum Outcome {
38 Success {
41 rtt: Duration,
43 },
44 Failure,
47}
48
49pub trait AdaptiveStrategy: Send + Sync {
55 fn adjust(&self, current: u32, in_flight: u32, outcome: Outcome) -> u32;
59}
60
61#[derive(Debug, Clone, Copy)]
80pub struct Aimd {
81 increase: u32,
82 decrease: f64,
83}
84
85impl Aimd {
86 #[must_use]
89 pub fn new(increase: u32, decrease: f64) -> Self {
90 Self {
91 increase: increase.max(1),
92 decrease: decrease.clamp(0.0, 1.0),
93 }
94 }
95}
96
97impl Default for Aimd {
98 fn default() -> Self {
100 Self::new(1, 0.5)
101 }
102}
103
104impl AdaptiveStrategy for Aimd {
105 fn adjust(&self, current: u32, in_flight: u32, outcome: Outcome) -> u32 {
106 match outcome {
107 Outcome::Success { .. } if in_flight >= current => {
110 current.saturating_add(self.increase)
111 }
112 Outcome::Success { .. } => current,
113 Outcome::Failure => {
114 let cut = (f64::from(current) * self.decrease) as u32;
115 cut.max(1)
116 }
117 }
118 }
119}
120
121#[derive(Debug)]
142pub struct Vegas {
143 alpha: u32,
144 beta: u32,
145 min_rtt_ns: AtomicU64,
147}
148
149impl Vegas {
150 #[must_use]
153 pub fn new(alpha: u32, beta: u32) -> Self {
154 Self {
155 alpha,
156 beta: beta.max(alpha),
157 min_rtt_ns: AtomicU64::new(u64::MAX),
158 }
159 }
160}
161
162impl Default for Vegas {
163 fn default() -> Self {
165 Self::new(3, 6)
166 }
167}
168
169impl AdaptiveStrategy for Vegas {
170 fn adjust(&self, current: u32, _in_flight: u32, outcome: Outcome) -> u32 {
171 let rtt = match outcome {
172 Outcome::Failure => return (current / 2).max(1),
173 Outcome::Success { rtt } => rtt,
174 };
175 let rtt_ns = u64::try_from(rtt.as_nanos()).unwrap_or(u64::MAX).max(1);
176 let min_ns = self
178 .min_rtt_ns
179 .fetch_min(rtt_ns, Ordering::AcqRel)
180 .min(rtt_ns);
181
182 let queue = u64::from(current).saturating_mul(rtt_ns.saturating_sub(min_ns)) / rtt_ns;
184 if queue < u64::from(self.alpha) {
185 current.saturating_add(1)
186 } else if queue > u64::from(self.beta) {
187 current.saturating_sub(1)
188 } else {
189 current
190 }
191 }
192}
193
194pub struct AdaptiveLimiter<S, C = SystemClock>
218where
219 C: Clock,
220{
221 strategy: S,
222 limit: AtomicU32,
223 in_flight: AtomicU32,
224 floor: u32,
225 ceiling: u32,
226 notify: Event,
227 clock: C,
228}
229
230impl AdaptiveLimiter<core::convert::Infallible> {
231 #[must_use]
233 pub fn builder() -> AdaptiveLimiterBuilder {
234 AdaptiveLimiterBuilder::new()
235 }
236}
237
238impl<S, C> AdaptiveLimiter<S, C>
239where
240 S: AdaptiveStrategy,
241 C: Clock + Clone,
242{
243 fn new(strategy: S, floor: u32, ceiling: u32, initial: u32, clock: C) -> Self {
244 let floor = floor.max(1);
245 let ceiling = ceiling.max(floor);
246 Self {
247 strategy,
248 limit: AtomicU32::new(initial.clamp(floor, ceiling)),
249 in_flight: AtomicU32::new(0),
250 floor,
251 ceiling,
252 notify: Event::new(),
253 clock,
254 }
255 }
256
257 #[must_use]
260 pub fn with_clock<C2>(self, clock: C2) -> AdaptiveLimiter<S, C2>
261 where
262 C2: Clock + Clone,
263 {
264 AdaptiveLimiter::new(
265 self.strategy,
266 self.floor,
267 self.ceiling,
268 self.limit.load(Ordering::Acquire),
269 clock,
270 )
271 }
272
273 #[must_use]
275 pub fn current_limit(&self) -> u32 {
276 self.limit.load(Ordering::Acquire)
277 }
278
279 #[must_use]
281 pub fn in_flight(&self) -> u32 {
282 self.in_flight.load(Ordering::Acquire)
283 }
284
285 #[must_use]
287 pub fn ceiling(&self) -> u32 {
288 self.ceiling
289 }
290
291 fn try_reserve(&self) -> bool {
293 loop {
294 let in_flight = self.in_flight.load(Ordering::Acquire);
295 if in_flight >= self.limit.load(Ordering::Acquire) {
296 return false;
297 }
298 if self
299 .in_flight
300 .compare_exchange_weak(
301 in_flight,
302 in_flight + 1,
303 Ordering::AcqRel,
304 Ordering::Acquire,
305 )
306 .is_ok()
307 {
308 return true;
309 }
310 }
311 }
312
313 #[must_use]
316 pub fn try_acquire(&self) -> Option<AdaptivePermit<'_, S, C>> {
317 self.try_reserve().then(|| AdaptivePermit::new(self))
318 }
319
320 fn settle(&self, outcome: Outcome) {
322 let in_flight = self.in_flight.load(Ordering::Acquire);
325 let current = self.limit.load(Ordering::Acquire);
326 let proposed = self.strategy.adjust(current, in_flight, outcome);
327 let new = proposed.clamp(self.floor, self.ceiling);
328 self.limit.store(new, Ordering::Release);
329 if new != current {
330 crate::obs::rate_change(current, new);
331 }
332 let _ = self.in_flight.fetch_sub(1, Ordering::AcqRel);
333 let _ = self.notify.notify(usize::MAX);
335 }
336
337 fn rtt_since(&self, started: Monotonic) -> Duration {
339 self.clock.now().saturating_duration_since(started)
340 }
341
342 #[inline]
343 fn now(&self) -> Monotonic {
344 self.clock.now()
345 }
346}
347
348#[cfg(feature = "runtime")]
349#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
350impl<S, C> AdaptiveLimiter<S, C>
351where
352 S: AdaptiveStrategy,
353 C: Clock + Clone,
354{
355 pub async fn acquire(&self) -> AdaptivePermit<'_, S, C> {
361 loop {
362 let listener = self.notify.listen();
365 if self.try_reserve() {
366 return AdaptivePermit::new(self);
367 }
368 listener.await;
369 }
370 }
371}
372
373#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
377pub struct AdaptivePermit<'a, S, C>
378where
379 S: AdaptiveStrategy,
380 C: Clock + Clone,
381{
382 limiter: &'a AdaptiveLimiter<S, C>,
383 started: Monotonic,
384 settled: bool,
385}
386
387impl<'a, S, C> AdaptivePermit<'a, S, C>
388where
389 S: AdaptiveStrategy,
390 C: Clock + Clone,
391{
392 fn new(limiter: &'a AdaptiveLimiter<S, C>) -> Self {
393 Self {
394 started: limiter.now(),
395 limiter,
396 settled: false,
397 }
398 }
399
400 pub fn success(mut self) {
403 let rtt = self.limiter.rtt_since(self.started);
404 self.limiter.settle(Outcome::Success { rtt });
405 self.settled = true;
406 }
407
408 pub fn failure(mut self) {
410 self.limiter.settle(Outcome::Failure);
411 self.settled = true;
412 }
413}
414
415impl<S, C> Drop for AdaptivePermit<'_, S, C>
416where
417 S: AdaptiveStrategy,
418 C: Clock + Clone,
419{
420 fn drop(&mut self) {
421 if !self.settled {
422 self.limiter.settle(Outcome::Failure);
423 }
424 }
425}
426
427#[derive(Debug, Clone, Copy)]
429pub struct AdaptiveLimiterBuilder {
430 floor: u32,
431 ceiling: u32,
432 initial: Option<u32>,
433}
434
435impl Default for AdaptiveLimiterBuilder {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
441impl AdaptiveLimiterBuilder {
442 #[must_use]
444 pub fn new() -> Self {
445 Self {
446 floor: 1,
447 ceiling: 100,
448 initial: None,
449 }
450 }
451
452 #[must_use]
454 pub fn floor(mut self, floor: u32) -> Self {
455 self.floor = floor.max(1);
456 self
457 }
458
459 #[must_use]
462 pub fn ceiling(mut self, ceiling: u32) -> Self {
463 self.ceiling = ceiling;
464 self
465 }
466
467 #[must_use]
469 pub fn initial(mut self, initial: u32) -> Self {
470 self.initial = Some(initial);
471 self
472 }
473
474 #[must_use]
477 pub fn build<S>(self, strategy: S) -> AdaptiveLimiter<S, SystemClock>
478 where
479 S: AdaptiveStrategy,
480 {
481 let initial = self.initial.unwrap_or(self.floor);
482 AdaptiveLimiter::new(
483 strategy,
484 self.floor,
485 self.ceiling,
486 initial,
487 SystemClock::new(),
488 )
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 #![allow(clippy::unwrap_used, clippy::expect_used)]
495
496 use super::{AdaptiveLimiter, AdaptiveStrategy, Aimd, Outcome, Vegas};
497 use clock_lib::ManualClock;
498 use core::time::Duration;
499 use std::sync::Arc;
500
501 fn assert_send_sync<T: Send + Sync>() {}
502
503 #[test]
504 fn test_adaptive_is_send_sync() {
505 assert_send_sync::<AdaptiveLimiter<Aimd>>();
506 assert_send_sync::<AdaptiveLimiter<Vegas>>();
507 }
508
509 #[test]
510 fn test_aimd_adjust_rules() {
511 let aimd = Aimd::new(2, 0.5);
512 assert_eq!(
514 aimd.adjust(
515 10,
516 10,
517 Outcome::Success {
518 rtt: Duration::ZERO
519 }
520 ),
521 12
522 );
523 assert_eq!(
525 aimd.adjust(
526 10,
527 3,
528 Outcome::Success {
529 rtt: Duration::ZERO
530 }
531 ),
532 10
533 );
534 assert_eq!(aimd.adjust(10, 10, Outcome::Failure), 5);
536 }
537
538 #[test]
539 fn test_degradation_drives_limit_to_floor() {
540 let limiter = AdaptiveLimiter::builder()
541 .floor(4)
542 .ceiling(100)
543 .initial(64)
544 .build(Aimd::new(4, 0.5));
545
546 for _ in 0..10 {
548 let permit = limiter.try_acquire().expect("a slot under the limit");
549 permit.failure();
550 }
551 assert_eq!(limiter.current_limit(), 4);
552 }
553
554 #[test]
555 fn test_recovery_drives_limit_up_bounded_by_ceiling() {
556 let limiter = AdaptiveLimiter::builder()
557 .floor(1)
558 .ceiling(8)
559 .initial(1)
560 .build(Aimd::new(1, 0.5));
561
562 for _ in 0..50 {
565 let mut held = Vec::new();
566 while let Some(p) = limiter.try_acquire() {
567 held.push(p);
568 }
569 if let Some(p) = held.pop() {
571 p.success();
572 }
573 for p in held {
574 p.success();
575 }
576 }
577 assert_eq!(limiter.current_limit(), 8, "grows to the ceiling");
578 for _ in 0..20 {
580 let p = limiter.try_acquire().expect("slot");
581 p.success();
582 }
583 assert_eq!(limiter.current_limit(), 8, "never exceeds the ceiling");
584 }
585
586 #[test]
587 fn test_never_admits_more_than_the_limit() {
588 let limiter = AdaptiveLimiter::builder()
589 .floor(3)
590 .ceiling(3)
591 .initial(3)
592 .build(Aimd::default());
593
594 let p1 = limiter.try_acquire().expect("1");
595 let p2 = limiter.try_acquire().expect("2");
596 let p3 = limiter.try_acquire().expect("3");
597 assert_eq!(limiter.in_flight(), 3);
598 assert!(limiter.try_acquire().is_none());
600 drop((p1, p2, p3));
601 }
602
603 #[test]
604 fn test_dropping_permit_counts_as_failure() {
605 let limiter = AdaptiveLimiter::builder()
606 .floor(1)
607 .ceiling(100)
608 .initial(10)
609 .build(Aimd::new(1, 0.5));
610 drop(limiter.try_acquire().expect("slot")); assert_eq!(limiter.current_limit(), 5);
612 assert_eq!(limiter.in_flight(), 0, "the slot is released");
613 }
614
615 #[test]
616 fn test_vegas_grows_on_low_latency_shrinks_on_high() {
617 let clock = Arc::new(ManualClock::new());
618 let limiter = AdaptiveLimiter::builder()
619 .floor(1)
620 .ceiling(100)
621 .initial(20)
622 .build(Vegas::new(3, 6))
623 .with_clock(clock.clone());
624
625 let p = limiter.try_acquire().expect("slot");
627 clock.advance(Duration::from_millis(10));
628 p.success();
629 assert_eq!(limiter.current_limit(), 21);
630
631 let p = limiter.try_acquire().expect("slot");
633 clock.advance(Duration::from_millis(200));
634 p.success();
635 assert!(
636 limiter.current_limit() < 21,
637 "high latency shrinks the limit"
638 );
639 }
640
641 #[cfg(feature = "runtime")]
642 #[tokio::test]
643 async fn test_async_acquire_waits_for_a_freed_slot() {
644 let limiter = Arc::new(
645 AdaptiveLimiter::builder()
646 .floor(1)
647 .ceiling(1)
648 .initial(1)
649 .build(Aimd::default()),
650 );
651
652 let held = limiter.try_acquire().expect("the one slot");
653 assert!(limiter.try_acquire().is_none());
654
655 let l = Arc::clone(&limiter);
656 let waiter = tokio::spawn(async move { l.acquire().await.success() });
657 tokio::task::yield_now().await;
659 held.success();
660 waiter.await.unwrap();
661 }
662}