Skip to main content

throttle_net/
adaptive.rs

1//! Adaptive concurrency limiting: find the right in-flight limit without being
2//! told it.
3//!
4//! A token bucket needs you to know the downstream's capacity up front. An
5//! [`AdaptiveLimiter`] *discovers* it: it caps the number of in-flight requests
6//! at a limit it adjusts from observed outcomes. When requests succeed (and stay
7//! fast) it lets more through; when they fail or slow down it pulls back. The
8//! limit is bounded by a floor and a ceiling, and **never exceeds the ceiling**,
9//! so the adaptation can only ever be more conservative than your hard cap.
10//!
11//! Two strategies ship: [`Aimd`] (additive increase, multiplicative decrease) and
12//! [`Vegas`] (latency-based, after TCP Vegas), and you can supply your own via
//! [`AdaptiveStrategy`]. Outcomes are fed back through an [`AdaptivePermit`] — settle it
14//! with [`success`](AdaptivePermit::success) or [`failure`](AdaptivePermit::failure), or let it
15//! drop (counted as a failure).
16//!
//! Behind the `adaptive` feature. Unlike the rate limiters, the waiting
//! [`acquire`](AdaptiveLimiter::acquire) (which additionally needs the `runtime`
//! feature) waits on a *slot* freeing, not on a timer, so the limiter's clock is
//! only used to measure round-trip time.
20
21use core::sync::atomic::{AtomicU64, Ordering};
22use core::time::Duration;
23
24use clock_lib::{Clock, Monotonic, SystemClock};
25use event_listener::Event;
26
27// `in_flight` and `limit` route through the loom-aware indirection so the
28// concurrency model check can explore their interleavings; `AtomicU64` (Vegas's
29// min-RTT) is not part of the slot-accounting invariant and stays on `core`.
30use crate::sync::AtomicU32;
31
/// What happened to one completed request; the limiter hands this to its
/// strategy to decide how the concurrency limit should move.
///
/// Marked `#[non_exhaustive]` so further signals can be added later.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The request completed successfully. Latency-aware strategies read the
    /// round-trip time; count-based ones only note that it succeeded.
    Success {
        /// How long the request took, measured end to end.
        rtt: Duration,
    },
    /// The request did not succeed — it errored, timed out, or was rejected by
    /// the downstream — which the strategy should treat as a cue to back off.
    Failure,
}
48
49/// How an [`AdaptiveLimiter`] moves its concurrency limit in response to an
50/// [`Outcome`].
51///
52/// The limiter clamps the returned value to `[floor, ceiling]`, so a strategy
53/// need not enforce the bounds itself.
54pub trait AdaptiveStrategy: Send + Sync {
55    /// Returns the proposed new limit given the `current` limit, the `in_flight`
56    /// count at the time of the observation (including the just-finished request),
57    /// and the `outcome`.
58    fn adjust(&self, current: u32, in_flight: u32, outcome: Outcome) -> u32;
59}
60
/// Additive-increase, multiplicative-decrease.
///
/// A success nudges the limit up by a fixed step, but only when the limit is
/// actually saturated; a failure scales it down by a multiplier. This is the
/// classic congestion response: probe upward gently, retreat sharply.
///
/// # Examples
///
/// ```
/// use throttle_net::{Aimd, AdaptiveLimiter};
///
/// // Grow by 1 on a saturated success, halve on failure; limit in [4, 200].
/// let limiter = AdaptiveLimiter::builder()
///     .floor(4)
///     .ceiling(200)
///     .build(Aimd::new(1, 0.5));
/// # let _ = limiter;
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Aimd {
    /// Step added to the limit on a saturated success (at least 1).
    increase: u32,
    /// Factor the limit is multiplied by on failure, clamped into `[0.0, 1.0]`.
    decrease: f64,
}

impl Aimd {
    /// Builds an AIMD strategy.
    ///
    /// Each saturated success adds `increase`; a value of 0 is raised to 1.
    /// Each failure multiplies the limit by `decrease`, which is clamped into
    /// `[0.0, 1.0]` and is meant to lie in `(0.0, 1.0]`.
    #[must_use]
    pub fn new(increase: u32, decrease: f64) -> Self {
        let step = increase.max(1);
        let factor = decrease.clamp(0.0, 1.0);
        Self {
            increase: step,
            decrease: factor,
        }
    }
}

impl Default for Aimd {
    /// Grows one step at a time and halves on failure.
    fn default() -> Self {
        Aimd::new(1, 0.5)
    }
}
103
104impl AdaptiveStrategy for Aimd {
105    fn adjust(&self, current: u32, in_flight: u32, outcome: Outcome) -> u32 {
106        match outcome {
107            // Only grow when the limit is saturated; inflating an idle limit
108            // would let a later burst overwhelm the downstream.
109            Outcome::Success { .. } if in_flight >= current => {
110                current.saturating_add(self.increase)
111            }
112            Outcome::Success { .. } => current,
113            Outcome::Failure => {
114                let cut = (f64::from(current) * self.decrease) as u32;
115                cut.max(1)
116            }
117        }
118    }
119}
120
/// Latency-based adaptation, modelled on TCP Vegas.
///
/// Each round-trip time is compared with `min_rtt`, the fastest (no-load)
/// latency seen so far, to estimate how deep a queue has built up at the
/// downstream: `limit * (rtt - min_rtt) / rtt`. A shallow estimated queue means
/// there is headroom and the limit grows; a deep one means a queue is forming
/// and the limit shrinks. Failures halve the limit outright.
///
/// # Examples
///
/// ```
/// use throttle_net::{AdaptiveLimiter, Vegas};
///
/// // Grow while the estimated queue is below 3, shrink above 6.
/// let limiter = AdaptiveLimiter::builder()
///     .floor(2)
///     .ceiling(100)
///     .build(Vegas::new(3, 6));
/// # let _ = limiter;
/// ```
#[derive(Debug)]
pub struct Vegas {
    /// Estimated queue depth below which the limit grows.
    alpha: u32,
    /// Estimated queue depth above which the limit shrinks (never below `alpha`).
    beta: u32,
    /// Best round-trip time seen, in nanoseconds; the no-load latency estimate.
    /// Starts at `u64::MAX` so the first sample always replaces it.
    min_rtt_ns: AtomicU64,
}

impl Vegas {
    /// Builds a Vegas strategy that grows while the estimated queue is below
    /// `alpha` and shrinks while it is above `beta`.
    ///
    /// A `beta` smaller than `alpha` is raised to `alpha`, so the two thresholds
    /// always bound a (possibly empty) band in which the limit holds steady.
    #[must_use]
    pub fn new(alpha: u32, beta: u32) -> Self {
        let beta = if beta < alpha { alpha } else { beta };
        Self {
            alpha,
            beta,
            min_rtt_ns: AtomicU64::new(u64::MAX),
        }
    }
}

impl Default for Vegas {
    /// Grows below an estimated queue of 3 and shrinks above 6.
    fn default() -> Self {
        Vegas::new(3, 6)
    }
}
168
169impl AdaptiveStrategy for Vegas {
170    fn adjust(&self, current: u32, _in_flight: u32, outcome: Outcome) -> u32 {
171        let rtt = match outcome {
172            Outcome::Failure => return (current / 2).max(1),
173            Outcome::Success { rtt } => rtt,
174        };
175        let rtt_ns = u64::try_from(rtt.as_nanos()).unwrap_or(u64::MAX).max(1);
176        // Track the best (no-load) latency seen.
177        let min_ns = self
178            .min_rtt_ns
179            .fetch_min(rtt_ns, Ordering::AcqRel)
180            .min(rtt_ns);
181
182        // Estimated queue depth = current * (rtt - min_rtt) / rtt.
183        let queue = u64::from(current).saturating_mul(rtt_ns.saturating_sub(min_ns)) / rtt_ns;
184        if queue < u64::from(self.alpha) {
185            current.saturating_add(1)
186        } else if queue > u64::from(self.beta) {
187            current.saturating_sub(1)
188        } else {
189            current
190        }
191    }
192}
193
194/// A concurrency limiter whose in-flight limit adapts to observed outcomes.
195///
196/// Build one with [`AdaptiveLimiter::builder`]. Behind the `adaptive` feature.
197///
198/// # Examples
199///
200/// ```
201/// # async fn run() {
202/// use throttle_net::{Aimd, AdaptiveLimiter};
203///
204/// let limiter = AdaptiveLimiter::builder()
205///     .floor(2)
206///     .ceiling(50)
207///     .initial(10)
208///     .build(Aimd::default());
209///
210/// if let Some(permit) = limiter.try_acquire() {
211///     // ... call the downstream, then report how it went ...
212///     let ok = true;
213///     if ok { permit.success() } else { permit.failure() }
214/// }
215/// # }
216/// ```
217pub struct AdaptiveLimiter<S, C = SystemClock>
218where
219    C: Clock,
220{
221    strategy: S,
222    limit: AtomicU32,
223    in_flight: AtomicU32,
224    floor: u32,
225    ceiling: u32,
226    notify: Event,
227    clock: C,
228}
229
230impl AdaptiveLimiter<core::convert::Infallible> {
231    /// Starts building an adaptive limiter.
232    #[must_use]
233    pub fn builder() -> AdaptiveLimiterBuilder {
234        AdaptiveLimiterBuilder::new()
235    }
236}
237
238impl<S, C> AdaptiveLimiter<S, C>
239where
240    S: AdaptiveStrategy,
241    C: Clock + Clone,
242{
243    fn new(strategy: S, floor: u32, ceiling: u32, initial: u32, clock: C) -> Self {
244        let floor = floor.max(1);
245        let ceiling = ceiling.max(floor);
246        Self {
247            strategy,
248            limit: AtomicU32::new(initial.clamp(floor, ceiling)),
249            in_flight: AtomicU32::new(0),
250            floor,
251            ceiling,
252            notify: Event::new(),
253            clock,
254        }
255    }
256
257    /// Replaces the time source (used to measure round-trip time), for
258    /// deterministic tests. Resets the limiter.
259    #[must_use]
260    pub fn with_clock<C2>(self, clock: C2) -> AdaptiveLimiter<S, C2>
261    where
262        C2: Clock + Clone,
263    {
264        AdaptiveLimiter::new(
265            self.strategy,
266            self.floor,
267            self.ceiling,
268            self.limit.load(Ordering::Acquire),
269            clock,
270        )
271    }
272
273    /// The current concurrency limit.
274    #[must_use]
275    pub fn current_limit(&self) -> u32 {
276        self.limit.load(Ordering::Acquire)
277    }
278
279    /// The number of requests currently in flight.
280    #[must_use]
281    pub fn in_flight(&self) -> u32 {
282        self.in_flight.load(Ordering::Acquire)
283    }
284
285    /// The hard ceiling the adapting limit can never exceed.
286    #[must_use]
287    pub fn ceiling(&self) -> u32 {
288        self.ceiling
289    }
290
291    /// Attempts to reserve a slot without waiting.
292    fn try_reserve(&self) -> bool {
293        loop {
294            let in_flight = self.in_flight.load(Ordering::Acquire);
295            if in_flight >= self.limit.load(Ordering::Acquire) {
296                return false;
297            }
298            if self
299                .in_flight
300                .compare_exchange_weak(
301                    in_flight,
302                    in_flight + 1,
303                    Ordering::AcqRel,
304                    Ordering::Acquire,
305                )
306                .is_ok()
307            {
308                return true;
309            }
310        }
311    }
312
313    /// Attempts to admit a request without waiting, returning a [`AdaptivePermit`] when a
314    /// slot is free.
315    #[must_use]
316    pub fn try_acquire(&self) -> Option<AdaptivePermit<'_, S, C>> {
317        self.try_reserve().then(|| AdaptivePermit::new(self))
318    }
319
320    /// Applies an [`Outcome`] to the limit and releases the slot.
321    fn settle(&self, outcome: Outcome) {
322        // `in_flight` still counts this request, so the strategy sees whether the
323        // limit was saturated.
324        let in_flight = self.in_flight.load(Ordering::Acquire);
325        let current = self.limit.load(Ordering::Acquire);
326        let proposed = self.strategy.adjust(current, in_flight, outcome);
327        let new = proposed.clamp(self.floor, self.ceiling);
328        self.limit.store(new, Ordering::Release);
329        if new != current {
330            crate::obs::rate_change(current, new);
331        }
332        let _ = self.in_flight.fetch_sub(1, Ordering::AcqRel);
333        // A slot freed (and the limit may have grown): wake a waiter.
334        let _ = self.notify.notify(usize::MAX);
335    }
336
337    /// Round-trip time since `started`, per this limiter's clock.
338    fn rtt_since(&self, started: Monotonic) -> Duration {
339        self.clock.now().saturating_duration_since(started)
340    }
341
342    #[inline]
343    fn now(&self) -> Monotonic {
344        self.clock.now()
345    }
346}
347
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<S, C> AdaptiveLimiter<S, C>
where
    S: AdaptiveStrategy,
    C: Clock + Clone,
{
    /// Admits a request, waiting for a slot if none is free.
    ///
    /// The wait is driven by slots being released (or the limit growing), not
    /// by a timer as in the rate limiters. Returns an [`AdaptivePermit`] to be
    /// settled with the request's outcome.
    pub async fn acquire(&self) -> AdaptivePermit<'_, S, C> {
        loop {
            // Register interest *before* checking for a slot: `listen`
            // registers immediately, so a release that lands between the check
            // and the await is still delivered to this listener.
            let wakeup = self.notify.listen();
            if !self.try_reserve() {
                wakeup.await;
                continue;
            }
            return AdaptivePermit::new(self);
        }
    }
}
372
373/// A reserved concurrency slot. Settle it with [`success`](Self::success) or
374/// [`failure`](Self::failure) after the request completes; dropping it unsettled
375/// records a **failure** and frees the slot.
376#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
377pub struct AdaptivePermit<'a, S, C>
378where
379    S: AdaptiveStrategy,
380    C: Clock + Clone,
381{
382    limiter: &'a AdaptiveLimiter<S, C>,
383    started: Monotonic,
384    settled: bool,
385}
386
387impl<'a, S, C> AdaptivePermit<'a, S, C>
388where
389    S: AdaptiveStrategy,
390    C: Clock + Clone,
391{
392    fn new(limiter: &'a AdaptiveLimiter<S, C>) -> Self {
393        Self {
394            started: limiter.now(),
395            limiter,
396            settled: false,
397        }
398    }
399
400    /// Records a successful request; its round-trip time is measured from when the
401    /// permit was acquired.
402    pub fn success(mut self) {
403        let rtt = self.limiter.rtt_since(self.started);
404        self.limiter.settle(Outcome::Success { rtt });
405        self.settled = true;
406    }
407
408    /// Records a failed request.
409    pub fn failure(mut self) {
410        self.limiter.settle(Outcome::Failure);
411        self.settled = true;
412    }
413}
414
415impl<S, C> Drop for AdaptivePermit<'_, S, C>
416where
417    S: AdaptiveStrategy,
418    C: Clock + Clone,
419{
420    fn drop(&mut self) {
421        if !self.settled {
422            self.limiter.settle(Outcome::Failure);
423        }
424    }
425}
426
/// Collects the bounds for an [`AdaptiveLimiter`] before it is built.
#[derive(Debug, Clone, Copy)]
pub struct AdaptiveLimiterBuilder {
    /// Lowest limit allowed (kept at least 1 by the setter).
    floor: u32,
    /// Highest limit allowed; the hard cap.
    ceiling: u32,
    /// Starting limit, or `None` to start at the floor.
    initial: Option<u32>,
}
434
435impl Default for AdaptiveLimiterBuilder {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
441impl AdaptiveLimiterBuilder {
442    /// Creates a builder with a floor of 1 and a ceiling of 100.
443    #[must_use]
444    pub fn new() -> Self {
445        Self {
446            floor: 1,
447            ceiling: 100,
448            initial: None,
449        }
450    }
451
452    /// Sets the minimum concurrency limit (clamped to at least 1).
453    #[must_use]
454    pub fn floor(mut self, floor: u32) -> Self {
455        self.floor = floor.max(1);
456        self
457    }
458
459    /// Sets the maximum concurrency limit — the hard ceiling the adapting limit
460    /// never exceeds.
461    #[must_use]
462    pub fn ceiling(mut self, ceiling: u32) -> Self {
463        self.ceiling = ceiling;
464        self
465    }
466
467    /// Sets the starting limit. Defaults to the floor (probe up from cautious).
468    #[must_use]
469    pub fn initial(mut self, initial: u32) -> Self {
470        self.initial = Some(initial);
471        self
472    }
473
474    /// Builds the limiter with the given adaptation `strategy`, driven by the
475    /// system clock.
476    #[must_use]
477    pub fn build<S>(self, strategy: S) -> AdaptiveLimiter<S, SystemClock>
478    where
479        S: AdaptiveStrategy,
480    {
481        let initial = self.initial.unwrap_or(self.floor);
482        AdaptiveLimiter::new(
483            strategy,
484            self.floor,
485            self.ceiling,
486            initial,
487            SystemClock::new(),
488        )
489    }
490}
491
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::{AdaptiveLimiter, AdaptiveStrategy, Aimd, Outcome, Vegas};
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    fn assert_send_sync<T: Send + Sync>() {}

    /// A success whose round-trip time count-based strategies ignore.
    fn ok() -> Outcome {
        Outcome::Success {
            rtt: Duration::ZERO,
        }
    }

    #[test]
    fn test_adaptive_is_send_sync() {
        assert_send_sync::<AdaptiveLimiter<Aimd>>();
        assert_send_sync::<AdaptiveLimiter<Vegas>>();
    }

    #[test]
    fn test_aimd_adjust_rules() {
        let aimd = Aimd::new(2, 0.5);
        // Saturated success grows by the increase.
        assert_eq!(aimd.adjust(10, 10, ok()), 12);
        // Unsaturated success holds.
        assert_eq!(aimd.adjust(10, 3, ok()), 10);
        // Failure halves, but never proposes less than 1.
        assert_eq!(aimd.adjust(10, 10, Outcome::Failure), 5);
        assert_eq!(aimd.adjust(1, 1, Outcome::Failure), 1);
    }

    #[test]
    fn test_vegas_failure_halves() {
        let vegas = Vegas::default();
        assert_eq!(vegas.adjust(10, 10, Outcome::Failure), 5);
        assert_eq!(vegas.adjust(1, 1, Outcome::Failure), 1);
    }

    #[test]
    fn test_builder_normalises_bounds() {
        // A ceiling below the floor is raised to the floor, and the default
        // starting limit is the floor.
        let limiter = AdaptiveLimiter::builder()
            .floor(10)
            .ceiling(5)
            .build(Aimd::default());
        assert_eq!(limiter.ceiling(), 10);
        assert_eq!(limiter.current_limit(), 10);

        // A zero floor becomes 1.
        let limiter = AdaptiveLimiter::builder()
            .floor(0)
            .ceiling(0)
            .build(Aimd::default());
        assert_eq!(limiter.ceiling(), 1);
        assert_eq!(limiter.current_limit(), 1);

        // An initial limit above the ceiling is clamped down to it.
        let limiter = AdaptiveLimiter::builder()
            .floor(1)
            .ceiling(5)
            .initial(1000)
            .build(Aimd::default());
        assert_eq!(limiter.current_limit(), 5);
    }

    #[test]
    fn test_degradation_drives_limit_to_floor() {
        let limiter = AdaptiveLimiter::builder()
            .floor(4)
            .ceiling(100)
            .initial(64)
            .build(Aimd::new(4, 0.5));

        // A run of failures collapses the limit to the floor, never below.
        for _ in 0..10 {
            let permit = limiter.try_acquire().expect("a slot under the limit");
            permit.failure();
        }
        assert_eq!(limiter.current_limit(), 4);
    }

    #[test]
    fn test_recovery_drives_limit_up_bounded_by_ceiling() {
        let limiter = AdaptiveLimiter::builder()
            .floor(1)
            .ceiling(8)
            .initial(1)
            .build(Aimd::new(1, 0.5));

        // Fill every slot so the limit is saturated, then settle them all as
        // successes: the first settle sees `in_flight == limit` and grows the
        // limit one step; the rest see spare room and hold it.
        for _ in 0..50 {
            let held: Vec<_> = std::iter::from_fn(|| limiter.try_acquire()).collect();
            for p in held {
                p.success();
            }
        }
        assert_eq!(limiter.current_limit(), 8, "grows to the ceiling");

        // At the ceiling a saturated success still proposes growth; the limiter
        // must clamp it back instead of exceeding the ceiling.
        for _ in 0..20 {
            let held: Vec<_> = std::iter::from_fn(|| limiter.try_acquire()).collect();
            assert_eq!(held.len(), 8, "every slot up to the ceiling is admitted");
            for p in held {
                p.success();
            }
        }
        assert_eq!(limiter.current_limit(), 8, "never exceeds the ceiling");
    }

    #[test]
    fn test_never_admits_more_than_the_limit() {
        let limiter = AdaptiveLimiter::builder()
            .floor(3)
            .ceiling(3)
            .initial(3)
            .build(Aimd::default());

        let p1 = limiter.try_acquire().expect("1");
        let p2 = limiter.try_acquire().expect("2");
        let p3 = limiter.try_acquire().expect("3");
        assert_eq!(limiter.in_flight(), 3);
        // The limit (and ceiling) is 3, so a fourth is refused.
        assert!(limiter.try_acquire().is_none());
        drop((p1, p2, p3));
    }

    #[test]
    fn test_dropping_permit_counts_as_failure() {
        let limiter = AdaptiveLimiter::builder()
            .floor(1)
            .ceiling(100)
            .initial(10)
            .build(Aimd::new(1, 0.5));
        drop(limiter.try_acquire().expect("slot")); // unsettled -> failure
        assert_eq!(limiter.current_limit(), 5);
        assert_eq!(limiter.in_flight(), 0, "the slot is released");
    }

    #[test]
    fn test_vegas_grows_on_low_latency_shrinks_on_high() {
        let clock = Arc::new(ManualClock::new());
        let limiter = AdaptiveLimiter::builder()
            .floor(1)
            .ceiling(100)
            .initial(20)
            .build(Vegas::new(3, 6))
            .with_clock(clock.clone());

        // First success establishes the min RTT at 10ms (queue estimate 0 -> grow).
        let p = limiter.try_acquire().expect("slot");
        clock.advance(Duration::from_millis(10));
        p.success();
        assert_eq!(limiter.current_limit(), 21);

        // A much slower request (200ms) implies a deep queue -> shrink.
        let p = limiter.try_acquire().expect("slot");
        clock.advance(Duration::from_millis(200));
        p.success();
        assert!(
            limiter.current_limit() < 21,
            "high latency shrinks the limit"
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_async_acquire_waits_for_a_freed_slot() {
        let limiter = Arc::new(
            AdaptiveLimiter::builder()
                .floor(1)
                .ceiling(1)
                .initial(1)
                .build(Aimd::default()),
        );

        let held = limiter.try_acquire().expect("the one slot");
        assert!(limiter.try_acquire().is_none());

        let l = Arc::clone(&limiter);
        let waiter = tokio::spawn(async move { l.acquire().await.success() });
        // Give the waiter a moment to park, then free the slot.
        tokio::task::yield_now().await;
        held.success();
        waiter.await.unwrap();
    }
}