Skip to main content

dynomite/runtime/
throttle.rs

//! Token-bucket admission control gate.
//!
//! [`Throttle`] is a thin tokio-aware wrapper around the
//! algorithm in the `throttle-core` crate. The core type owns
//! the atomic bucket state and the [`SystemClock`]-driven refill
//! arithmetic; this module adds:
//!
//! * a tokio-async [`Throttle::acquire`] that uses
//!   [`tokio::time::sleep`] in the wait loop, and
//! * a `throttle_wait_seconds{queue="..."}` Prometheus histogram
//!   that records the time spent inside [`Throttle::acquire`]
//!   whenever a call had to wait for a refill (calls satisfied
//!   immediately are not observed).
//!
//! The split lets the algorithm be model-checked under loom
//! (where tokio cannot link) while the production embed keeps
//! the same async-friendly surface.
//!
//! # Examples
//!
//! ```
//! use dynomite::runtime::Throttle;
//! let t = Throttle::new(8, 4); // burst 8, sustain 4 tokens/sec
//! assert!(t.try_acquire(8));   // burst the whole bucket
//! assert!(!t.try_acquire(8));  // empty now, fast-fail
//! ```
25
26use std::time::{Duration, Instant};
27
28use throttle_core::{SystemClock, Throttle as Inner};
29
30pub use throttle_core::ThrottleError;
31
32use crate::runtime::metrics;
33
34/// Token-bucket admission control gate.
35///
36/// The bucket is initialised full, so the first burst of up to
37/// `capacity` tokens is granted immediately.
38pub struct Throttle {
39    /// Static name used as the `queue` label on
40    /// `throttle_wait_seconds`.
41    name: &'static str,
42    inner: Inner<SystemClock>,
43}
44
45impl Throttle {
46    /// Build a new throttle with the given burst capacity and
47    /// sustained refill rate. The bucket starts full.
48    ///
49    /// The `queue` metric label defaults to `"default"`. Use
50    /// [`Throttle::with_name`] to pick a meaningful label.
51    pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
52        Self::with_name("default", capacity, refill_per_sec)
53    }
54
55    /// Build a new throttle with an explicit metric label.
56    pub fn with_name(name: &'static str, capacity: u64, refill_per_sec: u64) -> Self {
57        // Eagerly touch the histogram so the first acquire does
58        // not pay the registry-lock cost.
59        let _ = metrics::throttle_wait().with_label_values(&[name]);
60        Self {
61            name,
62            inner: Inner::new(capacity, refill_per_sec),
63        }
64    }
65
66    /// Burst capacity (the maximum number of tokens that may be
67    /// acquired in one go).
68    pub fn capacity(&self) -> u64 {
69        self.inner.capacity()
70    }
71
72    /// Sustained refill rate in tokens per second.
73    pub fn refill_per_sec(&self) -> u64 {
74        self.inner.refill_per_sec()
75    }
76
77    /// Best-effort snapshot of the currently available tokens.
78    /// Useful for tests and diagnostics; do not branch on this in
79    /// admission code (use [`Throttle::try_acquire`] instead).
80    pub fn available(&self) -> u64 {
81        self.inner.available()
82    }
83
84    /// Try to take `n` tokens. Returns `true` on success and
85    /// `false` if the bucket does not currently hold `n` tokens.
86    /// Refills the bucket from the elapsed wall-clock interval
87    /// before checking.
88    ///
89    /// Requesting `n > capacity` always returns `false`: the
90    /// bucket can never hold that many tokens, so blocking is
91    /// pointless.
92    pub fn try_acquire(&self, n: u64) -> bool {
93        self.inner.try_acquire(n)
94    }
95
96    /// Acquire `n` tokens, waiting if necessary for the bucket to
97    /// refill. The time spent waiting is recorded on the
98    /// `throttle_wait_seconds` histogram.
99    ///
100    /// # Panics
101    ///
102    /// Panics if `n > capacity`. The bucket can never hold that
103    /// many tokens, so an unconditional wait would be a deadlock.
104    /// Panics if `refill_per_sec` is zero and the initial bucket
105    /// cannot satisfy the request: the throttle has no way to
106    /// ever recover, so a deadlock is the alternative.
107    pub async fn acquire(&self, n: u64) {
108        let capacity = self.inner.capacity();
109        assert!(
110            n <= capacity,
111            "throttle: requested {n} tokens > capacity {capacity}",
112        );
113        if n == 0 {
114            return;
115        }
116        let start = Instant::now();
117        // Fast path: enough tokens already, no histogram cost.
118        if self.inner.try_acquire(n) {
119            return;
120        }
121        let refill = self.inner.refill_per_sec();
122        assert!(
123            refill > 0,
124            "throttle: zero refill rate cannot satisfy acquire({n})",
125        );
126        loop {
127            // Sleep just long enough that the missing fraction of
128            // tokens is expected to have been refilled. The
129            // `try_acquire` after the sleep folds in real elapsed
130            // time, so we never grant more than the contract.
131            let needed = n.saturating_sub(self.inner.available());
132            let needed = needed.max(1);
133            // Compute the wait in integer nanoseconds. Multiplying
134            // by a billion in u128 avoids float precision loss
135            // and tracks the same time domain as the core refill.
136            let want_nanos = u128::from(needed).saturating_mul(1_000_000_000) / u128::from(refill);
137            // Floor at 1ms so a fractional refill never spins
138            // tightly, and ceiling at 1s so a misconfigured
139            // throttle still polls regularly.
140            let want_nanos = want_nanos.clamp(1_000_000, 1_000_000_000);
141            let dur = Duration::from_nanos(u64::try_from(want_nanos).unwrap_or(u64::MAX));
142            tokio::time::sleep(dur).await;
143            if self.inner.try_acquire(n) {
144                let waited = start.elapsed().as_secs_f64();
145                metrics::throttle_wait()
146                    .with_label_values(&[self.name])
147                    .observe(waited);
148                return;
149            }
150        }
151    }
152}
153
154impl std::fmt::Debug for Throttle {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("Throttle")
157            .field("name", &self.name)
158            .field("capacity", &self.capacity())
159            .field("refill_per_sec", &self.refill_per_sec())
160            .field("available", &self.available())
161            .finish_non_exhaustive()
162    }
163}