dynomite/runtime/throttle.rs
1//! Token-bucket admission control gate.
2//!
3//! [`Throttle`] is a thin tokio-aware wrapper around the
4//! algorithm in the `throttle-core` crate. The core type owns
5//! the atomic bucket state and the [`SystemClock`]-driven refill
6//! arithmetic; this module adds:
7//!
8//! * a tokio-async [`Throttle::acquire`] that uses
9//! [`tokio::time::sleep`] in the wait loop, and
//! * a `throttle_wait_seconds{queue="..."}` Prometheus histogram
//!   that records how long calls to [`Throttle::acquire`] spent
//!   waiting for a refill (calls granted immediately are not
//!   observed).
12//!
13//! The split lets the algorithm be model-checked under loom
14//! (where tokio cannot link) while the production embed keeps
15//! the same async-friendly surface.
16//!
17//! # Examples
18//!
19//! ```
20//! use dynomite::runtime::Throttle;
21//! let t = Throttle::new(8, 4); // burst 8, sustain 4 tokens/sec
22//! assert!(t.try_acquire(8)); // burst the whole bucket
23//! assert!(!t.try_acquire(8)); // empty now, fast-fail
24//! ```
25
26use std::time::{Duration, Instant};
27
28use throttle_core::{SystemClock, Throttle as Inner};
29
30pub use throttle_core::ThrottleError;
31
32use crate::runtime::metrics;
33
34/// Token-bucket admission control gate.
35///
36/// The bucket is initialised full, so the first burst of up to
37/// `capacity` tokens is granted immediately.
38pub struct Throttle {
39 /// Static name used as the `queue` label on
40 /// `throttle_wait_seconds`.
41 name: &'static str,
42 inner: Inner<SystemClock>,
43}
44
45impl Throttle {
46 /// Build a new throttle with the given burst capacity and
47 /// sustained refill rate. The bucket starts full.
48 ///
49 /// The `queue` metric label defaults to `"default"`. Use
50 /// [`Throttle::with_name`] to pick a meaningful label.
51 pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
52 Self::with_name("default", capacity, refill_per_sec)
53 }
54
55 /// Build a new throttle with an explicit metric label.
56 pub fn with_name(name: &'static str, capacity: u64, refill_per_sec: u64) -> Self {
57 // Eagerly touch the histogram so the first acquire does
58 // not pay the registry-lock cost.
59 let _ = metrics::throttle_wait().with_label_values(&[name]);
60 Self {
61 name,
62 inner: Inner::new(capacity, refill_per_sec),
63 }
64 }
65
66 /// Burst capacity (the maximum number of tokens that may be
67 /// acquired in one go).
68 pub fn capacity(&self) -> u64 {
69 self.inner.capacity()
70 }
71
72 /// Sustained refill rate in tokens per second.
73 pub fn refill_per_sec(&self) -> u64 {
74 self.inner.refill_per_sec()
75 }
76
77 /// Best-effort snapshot of the currently available tokens.
78 /// Useful for tests and diagnostics; do not branch on this in
79 /// admission code (use [`Throttle::try_acquire`] instead).
80 pub fn available(&self) -> u64 {
81 self.inner.available()
82 }
83
84 /// Try to take `n` tokens. Returns `true` on success and
85 /// `false` if the bucket does not currently hold `n` tokens.
86 /// Refills the bucket from the elapsed wall-clock interval
87 /// before checking.
88 ///
89 /// Requesting `n > capacity` always returns `false`: the
90 /// bucket can never hold that many tokens, so blocking is
91 /// pointless.
92 pub fn try_acquire(&self, n: u64) -> bool {
93 self.inner.try_acquire(n)
94 }
95
96 /// Acquire `n` tokens, waiting if necessary for the bucket to
97 /// refill. The time spent waiting is recorded on the
98 /// `throttle_wait_seconds` histogram.
99 ///
100 /// # Panics
101 ///
102 /// Panics if `n > capacity`. The bucket can never hold that
103 /// many tokens, so an unconditional wait would be a deadlock.
104 /// Panics if `refill_per_sec` is zero and the initial bucket
105 /// cannot satisfy the request: the throttle has no way to
106 /// ever recover, so a deadlock is the alternative.
107 pub async fn acquire(&self, n: u64) {
108 let capacity = self.inner.capacity();
109 assert!(
110 n <= capacity,
111 "throttle: requested {n} tokens > capacity {capacity}",
112 );
113 if n == 0 {
114 return;
115 }
116 let start = Instant::now();
117 // Fast path: enough tokens already, no histogram cost.
118 if self.inner.try_acquire(n) {
119 return;
120 }
121 let refill = self.inner.refill_per_sec();
122 assert!(
123 refill > 0,
124 "throttle: zero refill rate cannot satisfy acquire({n})",
125 );
126 loop {
127 // Sleep just long enough that the missing fraction of
128 // tokens is expected to have been refilled. The
129 // `try_acquire` after the sleep folds in real elapsed
130 // time, so we never grant more than the contract.
131 let needed = n.saturating_sub(self.inner.available());
132 let needed = needed.max(1);
133 // Compute the wait in integer nanoseconds. Multiplying
134 // by a billion in u128 avoids float precision loss
135 // and tracks the same time domain as the core refill.
136 let want_nanos = u128::from(needed).saturating_mul(1_000_000_000) / u128::from(refill);
137 // Floor at 1ms so a fractional refill never spins
138 // tightly, and ceiling at 1s so a misconfigured
139 // throttle still polls regularly.
140 let want_nanos = want_nanos.clamp(1_000_000, 1_000_000_000);
141 let dur = Duration::from_nanos(u64::try_from(want_nanos).unwrap_or(u64::MAX));
142 tokio::time::sleep(dur).await;
143 if self.inner.try_acquire(n) {
144 let waited = start.elapsed().as_secs_f64();
145 metrics::throttle_wait()
146 .with_label_values(&[self.name])
147 .observe(waited);
148 return;
149 }
150 }
151 }
152}
153
154impl std::fmt::Debug for Throttle {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct("Throttle")
157 .field("name", &self.name)
158 .field("capacity", &self.capacity())
159 .field("refill_per_sec", &self.refill_per_sec())
160 .field("available", &self.available())
161 .finish_non_exhaustive()
162 }
163}