Skip to main content

throttle_core/
lib.rs

1//! Tokio-free token-bucket admission control with loom-checkable
2//! atomics.
3//!
4//! [`Throttle`] is a fixed-capacity bucket that refills at a
5//! configured rate. Callers ask for `n` tokens with
6//! [`Throttle::try_acquire`] (non-blocking) or
7//! [`Throttle::acquire_blocking`] (sync, sleeps the calling
8//! thread until the bucket has enough). The bucket caps the
9//! long-run rate at `refill_per_sec` tokens per second while
10//! still allowing short bursts up to `capacity` tokens.
11//!
12//! This crate intentionally has no async runtime dependency. The
13//! `dynomite` crate wraps [`Throttle`] in a tokio-aware adapter
14//! that uses `tokio::time::sleep` instead of
15//! [`std::thread::sleep`] in the wait loop. The two layers share
16//! the same algorithm and the same invariants.
17//!
18//! # Loom support
19//!
20//! Under `RUSTFLAGS='--cfg loom'` the atomics and mutex used by
21//! [`Throttle`] are sourced from the [`loom`] crate's
22//! shadow-`std` modules. This lets a model checker explore every
23//! legal interleaving of the CAS loop in
24//! [`Throttle::try_acquire`] and verify that no interleaving
25//! over-grants tokens. The clock is abstracted behind the
26//! [`Clock`] trait so loom tests can drive a deterministic
27//! [`ManualClock`] rather than wall-clock time.
28//!
29//! # Examples
30//!
31//! ```no_run
32//! use throttle_core::Throttle;
33//! let t: Throttle = Throttle::new(8, 4); // burst 8, sustain 4 tokens/sec
34//! assert!(t.try_acquire(8));   // burst the whole bucket
35//! assert!(!t.try_acquire(8));  // empty now, fast-fail
36//! ```
37//!
38//! The example uses `no_run` because the actual atomics under
39//! `RUSTFLAGS='--cfg loom'` require a `loom::model()` context.
40//! Behavioural correctness is exercised by the in-crate unit
41//! tests and the integration tests under `tests/`.
42
43#![forbid(unsafe_code)]
44
45use std::time::{Duration, Instant};
46
47#[cfg(loom)]
48use loom::sync::atomic::{AtomicU64, Ordering};
49#[cfg(loom)]
50use loom::sync::Mutex;
51
52#[cfg(not(loom))]
53use std::sync::atomic::{AtomicU64, Ordering};
54#[cfg(not(loom))]
55use std::sync::Mutex;
56
/// Source of monotonic timestamps for the throttle.
///
/// [`SystemClock`] is the production implementation and simply
/// forwards to [`Instant::now`]. Unit tests and loom models plug
/// in a [`ManualClock`] instead, so the refill arithmetic runs
/// against timestamps the test controls.
pub trait Clock: Send + Sync {
    /// Reads the current instant.
    ///
    /// Implementations must be monotonic: a later call may never
    /// report an instant earlier than one already returned.
    fn now(&self) -> Instant;
}
68
69/// Clock implementation backed by [`Instant::now`].
70#[derive(Debug, Default, Clone, Copy)]
71pub struct SystemClock;
72
73impl Clock for SystemClock {
74    fn now(&self) -> Instant {
75        Instant::now()
76    }
77}
78
79// Blanket impls so callers can share a clock through `Arc` or
80// pass a borrowed reference into [`Throttle::with_clock`]
81// without wrapping it in a newtype. We provide separate impls
82// for the two `Arc` flavours (`std`'s and `loom`'s) so loom
83// tests can build `Arc<ManualClock>` from `loom::sync::Arc`
84// while the default build keeps using `std::sync::Arc`.
85impl<C: Clock + ?Sized> Clock for &C {
86    fn now(&self) -> Instant {
87        (**self).now()
88    }
89}
90
91impl<C: Clock + ?Sized> Clock for std::sync::Arc<C> {
92    fn now(&self) -> Instant {
93        (**self).now()
94    }
95}
96
97#[cfg(loom)]
98impl<C: Clock + ?Sized> Clock for loom::sync::Arc<C> {
99    fn now(&self) -> Instant {
100        (**self).now()
101    }
102}
103
/// Hand-driven clock for unit tests and loom models.
///
/// A reading is the [`Instant`] captured when the clock was built
/// plus an offset that only grows through
/// [`ManualClock::advance`]. The offset is kept as nanoseconds in
/// an [`AtomicU64`], so one clock can be shared between threads
/// (or loom-modelled threads) as-is.
#[derive(Debug)]
pub struct ManualClock {
    base: Instant,
    offset_nanos: AtomicU64,
}

impl ManualClock {
    /// Builds a clock anchored at the current [`Instant`] with a
    /// zero offset.
    pub fn new() -> Self {
        let base = Instant::now();
        let offset_nanos = AtomicU64::new(0);
        Self { base, offset_nanos }
    }

    /// Moves the clock forward by `delta`.
    ///
    /// Both the conversion of `delta` to nanoseconds and the
    /// accumulated offset saturate at [`u64::MAX`] nanoseconds
    /// (~584 years), far beyond any realistic test horizon.
    pub fn advance(&self, delta: Duration) {
        let step = u64::try_from(delta.as_nanos()).unwrap_or(u64::MAX);
        // The addition saturates on purpose: wrapping around would
        // silently rewind the clock and violate monotonicity.
        let mut seen = self.offset_nanos.load(Ordering::Acquire);
        while let Err(actual) = self.offset_nanos.compare_exchange_weak(
            seen,
            seen.saturating_add(step),
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // Lost the race (or failed spuriously): retry from the
            // value that is actually stored.
            seen = actual;
        }
    }
}

impl Default for ManualClock {
    /// Same as [`ManualClock::new`].
    fn default() -> Self {
        ManualClock::new()
    }
}
156
157impl Clock for ManualClock {
158    fn now(&self) -> Instant {
159        self.base + Duration::from_nanos(self.offset_nanos.load(Ordering::Acquire))
160    }
161}
162
/// Errors returned by [`Throttle::acquire_blocking`].
///
/// Both variants describe requests that waiting can never satisfy,
/// so they are reported immediately instead of blocking the caller.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ThrottleError {
    /// The caller asked for more tokens than the bucket can ever
    /// hold. Waiting would deadlock; the request must be rejected
    /// at the source.
    #[error("requested {requested} tokens exceeds capacity {capacity}")]
    RequestExceedsCapacity {
        /// Tokens the caller asked for.
        requested: u64,
        /// Bucket capacity.
        capacity: u64,
    },
    /// The bucket has a zero refill rate and does not currently
    /// hold enough tokens for the request. Nothing will ever be
    /// added to the bucket, so blocking is pointless.
    #[error(
        "zero refill rate cannot satisfy acquire of {requested} tokens \
         (only {available} available)"
    )]
    ZeroRefillExhausted {
        /// Tokens the caller asked for.
        requested: u64,
        /// Tokens in the bucket when the error was built.
        available: u64,
    },
}
190
/// Token-bucket admission control gate.
///
/// The bucket is initialised full, so the first burst of up to
/// `capacity` tokens is granted immediately. Tokens accrue at
/// `refill_per_sec` per second up to `capacity`, computed lazily
/// against the [`Clock`] on each [`Throttle::try_acquire`] call.
///
/// `Throttle` is generic over the clock so loom and unit tests
/// can inject a [`ManualClock`]. Production code uses the
/// [`SystemClock`] default.
pub struct Throttle<C: Clock = SystemClock> {
    /// Maximum number of tokens the bucket can hold (burst size).
    capacity: u64,
    /// Tokens credited per second; zero disables refilling.
    refill_per_sec: u64,
    /// Tokens currently in the bucket, updated through CAS loops.
    available: AtomicU64,
    /// Instant up to which elapsed time has already been turned
    /// into tokens.
    last_refill: Mutex<Instant>,
    /// Time source consulted on every refill.
    clock: C,
}
208
209impl Throttle<SystemClock> {
210    /// Builds a throttle backed by the [`SystemClock`].
211    ///
212    /// The bucket starts full; the first acquire of up to
213    /// `capacity` tokens succeeds without waiting.
214    pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
215        Self::with_clock(capacity, refill_per_sec, SystemClock)
216    }
217}
218
219impl<C: Clock> Throttle<C> {
220    /// Builds a throttle that consults `clock` for refill timing.
221    ///
222    /// The bucket starts full and the last-refill timestamp is
223    /// captured from `clock` at construction time. Subsequent
224    /// `clock.now()` values must be monotonic relative to that
225    /// initial reading.
226    pub fn with_clock(capacity: u64, refill_per_sec: u64, clock: C) -> Self {
227        let now = clock.now();
228        Self {
229            capacity,
230            refill_per_sec,
231            available: AtomicU64::new(capacity),
232            last_refill: Mutex::new(now),
233            clock,
234        }
235    }
236
237    /// Burst capacity (the maximum number of tokens that may be
238    /// acquired in one go).
239    pub fn capacity(&self) -> u64 {
240        self.capacity
241    }
242
243    /// Sustained refill rate in tokens per second.
244    pub fn refill_per_sec(&self) -> u64 {
245        self.refill_per_sec
246    }
247
248    /// Best-effort snapshot of the currently available tokens.
249    ///
250    /// Useful for tests and diagnostics; do not branch on this
251    /// in admission code (use [`Throttle::try_acquire`] instead).
252    pub fn available(&self) -> u64 {
253        self.available.load(Ordering::Acquire)
254    }
255
256    /// Tries to take `n` tokens.
257    ///
258    /// Returns `true` on success and `false` if the bucket does
259    /// not currently hold `n` tokens. The bucket is refilled
260    /// from the clock-elapsed interval before the check.
261    ///
262    /// Requesting `n > capacity` always returns `false`: the
263    /// bucket can never hold that many tokens, so blocking would
264    /// be pointless.
265    pub fn try_acquire(&self, n: u64) -> bool {
266        if n > self.capacity {
267            return false;
268        }
269        self.refill();
270        if n == 0 {
271            return true;
272        }
273        self.consume(n)
274    }
275
276    /// Acquires `n` tokens, sleeping the calling thread if the
277    /// bucket is empty.
278    ///
279    /// This is the synchronous counterpart of `dynomite`'s
280    /// async `Throttle::acquire`. The wait loop sleeps for the
281    /// time required to refill the missing tokens at
282    /// `refill_per_sec`, clamped to the range 1 ms .. 1 s so a
283    /// fractional refill never spins tightly and a misconfigured
284    /// throttle still polls regularly.
285    ///
286    /// # Errors
287    ///
288    /// * [`ThrottleError::RequestExceedsCapacity`] if `n` is
289    ///   larger than the bucket's capacity.
290    /// * [`ThrottleError::ZeroRefillExhausted`] if
291    ///   `refill_per_sec == 0` and the initial bucket cannot
292    ///   satisfy the request.
293    pub fn acquire_blocking(&self, n: u64) -> Result<(), ThrottleError> {
294        if n > self.capacity {
295            return Err(ThrottleError::RequestExceedsCapacity {
296                requested: n,
297                capacity: self.capacity,
298            });
299        }
300        if n == 0 {
301            return Ok(());
302        }
303        if self.try_acquire(n) {
304            return Ok(());
305        }
306        if self.refill_per_sec == 0 {
307            return Err(ThrottleError::ZeroRefillExhausted {
308                requested: n,
309                available: self.available(),
310            });
311        }
312        loop {
313            let needed = n.saturating_sub(self.available.load(Ordering::Acquire));
314            let needed = needed.max(1);
315            // Compute the wait in integer nanoseconds. Multiplying
316            // by a billion in u128 avoids float precision loss
317            // and tracks the same time domain as `refill`.
318            let want_nanos =
319                u128::from(needed).saturating_mul(1_000_000_000) / u128::from(self.refill_per_sec);
320            let want_nanos = want_nanos.clamp(1_000_000, 1_000_000_000);
321            let dur = Duration::from_nanos(u64::try_from(want_nanos).unwrap_or(u64::MAX));
322            std::thread::sleep(dur);
323            if self.try_acquire(n) {
324                return Ok(());
325            }
326        }
327    }
328
329    /// Adds tokens earned since the last refill instant. Does not
330    /// exceed `capacity`. Holds the `last_refill` mutex while
331    /// computing the increment so concurrent refillers add up to
332    /// the same total.
333    fn refill(&self) {
334        if self.refill_per_sec == 0 {
335            return;
336        }
337        let now = self.clock.now();
338        let mut last = self
339            .last_refill
340            .lock()
341            .expect("invariant: throttle last_refill mutex must not be poisoned");
342        let elapsed = now.duration_since(*last);
343        // Convert elapsed time to whole tokens. Using nanoseconds
344        // and integer math avoids the precision drift floats would
345        // cause across many short refills.
346        let elapsed_nanos: u128 = elapsed.as_nanos();
347        let rate = u128::from(self.refill_per_sec);
348        let new_tokens_u128 = elapsed_nanos.saturating_mul(rate) / 1_000_000_000_u128;
349        if new_tokens_u128 == 0 {
350            return;
351        }
352        // Advance `last_refill` only by the integer-token slice
353        // we are crediting; the fractional tail rolls into the
354        // next refill.
355        let new_tokens = u64::try_from(new_tokens_u128).unwrap_or(u64::MAX);
356        let credited_nanos = (u128::from(new_tokens) * 1_000_000_000_u128) / rate;
357        *last += Duration::from_nanos(u64::try_from(credited_nanos).unwrap_or(u64::MAX));
358        // Saturating add into `available`, capped at `capacity`.
359        let mut cur = self.available.load(Ordering::Acquire);
360        loop {
361            let target = cur.saturating_add(new_tokens).min(self.capacity);
362            if target == cur {
363                break;
364            }
365            match self.available.compare_exchange_weak(
366                cur,
367                target,
368                Ordering::AcqRel,
369                Ordering::Acquire,
370            ) {
371                Ok(_) => break,
372                Err(actual) => cur = actual,
373            }
374        }
375    }
376
377    /// Atomically subtracts `n` tokens from `available` if at
378    /// least `n` are currently held. Returns whether the
379    /// subtraction succeeded.
380    fn consume(&self, n: u64) -> bool {
381        let mut cur = self.available.load(Ordering::Acquire);
382        loop {
383            if cur < n {
384                return false;
385            }
386            match self.available.compare_exchange_weak(
387                cur,
388                cur - n,
389                Ordering::AcqRel,
390                Ordering::Acquire,
391            ) {
392                Ok(_) => return true,
393                Err(actual) => cur = actual,
394            }
395        }
396    }
397}
398
399impl<C: Clock> std::fmt::Debug for Throttle<C> {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        f.debug_struct("Throttle")
402            .field("capacity", &self.capacity)
403            .field("refill_per_sec", &self.refill_per_sec)
404            .field("available", &self.available())
405            .finish_non_exhaustive()
406    }
407}
408
// Unit tests for the non-loom build. Clock-dependent behaviour is
// driven through `ManualClock`; only
// `acquire_blocking_waits_for_refill` sleeps on real time.
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use hegel::generators as gs;
    use hegel::TestCase;
    use std::sync::Arc;

    #[test]
    fn new_starts_full() {
        let t: Throttle = Throttle::new(8, 1);
        assert_eq!(t.capacity(), 8);
        assert_eq!(t.refill_per_sec(), 1);
        assert_eq!(t.available(), 8);
    }

    #[test]
    fn try_acquire_zero_is_always_true_even_when_empty() {
        let t: Throttle = Throttle::new(2, 0);
        assert!(t.try_acquire(2));
        assert_eq!(t.available(), 0);
        assert!(t.try_acquire(0));
    }

    #[test]
    fn try_acquire_above_capacity_fails_fast() {
        let t: Throttle = Throttle::new(4, 100);
        assert!(!t.try_acquire(5));
        // The bucket is unchanged.
        assert_eq!(t.available(), 4);
    }

    #[test]
    fn manual_clock_drives_refill() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(10, 100, Arc::clone(&clock));
        assert!(t.try_acquire(10));
        assert_eq!(t.available(), 0);
        // 100 ms at 100 tokens/s yields exactly 10 tokens.
        clock.advance(Duration::from_millis(100));
        // try_acquire(0) triggers refill but consumes nothing.
        assert!(t.try_acquire(0));
        assert_eq!(t.available(), 10);
    }

    #[test]
    fn manual_clock_caps_at_capacity() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(5, 1, Arc::clone(&clock));
        // Drain.
        assert!(t.try_acquire(5));
        // Advance an hour at 1 token/s == 3600 tokens; should
        // saturate at capacity 5.
        clock.advance(Duration::from_secs(3600));
        assert!(t.try_acquire(0));
        assert_eq!(t.available(), 5);
    }

    #[test]
    fn manual_clock_zero_refill_does_not_replenish() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(3, 0, Arc::clone(&clock));
        assert!(t.try_acquire(3));
        clock.advance(Duration::from_secs(60));
        assert!(!t.try_acquire(1));
    }

    #[test]
    fn acquire_blocking_above_capacity_returns_typed_error() {
        let t: Throttle = Throttle::new(2, 1);
        let err = t.acquire_blocking(5).unwrap_err();
        assert_eq!(
            err,
            ThrottleError::RequestExceedsCapacity {
                requested: 5,
                capacity: 2,
            }
        );
    }

    #[test]
    fn acquire_blocking_zero_refill_with_empty_bucket_returns_error() {
        let t: Throttle = Throttle::new(1, 0);
        assert!(t.try_acquire(1));
        let err = t.acquire_blocking(1).unwrap_err();
        assert!(matches!(
            err,
            ThrottleError::ZeroRefillExhausted {
                requested: 1,
                available: 0,
            }
        ));
    }

    #[test]
    fn acquire_blocking_zero_request_is_noop() {
        let t: Throttle = Throttle::new(1, 0);
        // Even with refill=0 and capacity=1, an acquire of zero
        // succeeds without blocking.
        t.acquire_blocking(0).unwrap();
        assert_eq!(t.available(), 1);
    }

    #[test]
    fn acquire_blocking_waits_for_refill() {
        let t = Throttle::new(2, 200); // refill 200/s -> 5ms/token
        assert!(t.try_acquire(2));
        let start = Instant::now();
        t.acquire_blocking(2).unwrap();
        let elapsed = start.elapsed();
        // Refilling both tokens takes ~10 ms, but one of them may
        // already have accrued before the wait starts on a slow
        // machine, so only a single token's worth of waiting (5 ms)
        // is asserted.
        assert!(
            elapsed >= Duration::from_millis(5),
            "acquire returned in {elapsed:?}, expected at least 5ms"
        );
        assert!(
            elapsed < Duration::from_secs(2),
            "acquire took unexpectedly long: {elapsed:?}"
        );
    }

    #[hegel::test(test_cases = 64)]
    fn manual_clock_envelope_holds(tc: TestCase) {
        let capacity = tc.draw(gs::integers::<u64>().min_value(1).max_value(64));
        let refill = tc.draw(gs::integers::<u64>().min_value(1).max_value(1_000));
        let req_sizes: Vec<u64> = (0..16)
            .map(|_| tc.draw(gs::integers::<u64>().min_value(0).max_value(capacity)))
            .collect();
        // Use a ManualClock that never advances: no refill can
        // happen, so the total granted is bounded by the initial
        // capacity regardless of the drawn refill rate.
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(capacity, refill, Arc::clone(&clock));
        let mut granted: u128 = 0;
        for n in &req_sizes {
            if t.try_acquire(*n) {
                granted += u128::from(*n);
            }
        }
        // No clock advance => envelope is exactly capacity.
        assert!(
            granted <= u128::from(capacity),
            "granted {granted} > capacity {capacity} with frozen clock"
        );
    }
}