throttle_core/lib.rs
1//! Tokio-free token-bucket admission control with loom-checkable
2//! atomics.
3//!
4//! [`Throttle`] is a fixed-capacity bucket that refills at a
5//! configured rate. Callers ask for `n` tokens with
6//! [`Throttle::try_acquire`] (non-blocking) or
7//! [`Throttle::acquire_blocking`] (sync, sleeps the calling
8//! thread until the bucket has enough). The bucket caps the
9//! long-run rate at `refill_per_sec` tokens per second while
10//! still allowing short bursts up to `capacity` tokens.
11//!
12//! This crate intentionally has no async runtime dependency. The
13//! `dynomite` crate wraps [`Throttle`] in a tokio-aware adapter
14//! that uses `tokio::time::sleep` instead of
15//! [`std::thread::sleep`] in the wait loop. The two layers share
16//! the same algorithm and the same invariants.
17//!
18//! # Loom support
19//!
20//! Under `RUSTFLAGS='--cfg loom'` the atomics and mutex used by
21//! [`Throttle`] are sourced from the [`loom`] crate's
22//! shadow-`std` modules. This lets a model checker explore every
23//! legal interleaving of the CAS loop in
24//! [`Throttle::try_acquire`] and verify that no interleaving
25//! over-grants tokens. The clock is abstracted behind the
26//! [`Clock`] trait so loom tests can drive a deterministic
27//! [`ManualClock`] rather than wall-clock time.
28//!
29//! # Examples
30//!
31//! ```no_run
32//! use throttle_core::Throttle;
33//! let t: Throttle = Throttle::new(8, 4); // burst 8, sustain 4 tokens/sec
34//! assert!(t.try_acquire(8)); // burst the whole bucket
35//! assert!(!t.try_acquire(8)); // empty now, fast-fail
36//! ```
37//!
38//! The example uses `no_run` because the actual atomics under
39//! `RUSTFLAGS='--cfg loom'` require a `loom::model()` context.
40//! Behavioural correctness is exercised by the in-crate unit
41//! tests and the integration tests under `tests/`.
42
43#![forbid(unsafe_code)]
44
45use std::time::{Duration, Instant};
46
47#[cfg(loom)]
48use loom::sync::atomic::{AtomicU64, Ordering};
49#[cfg(loom)]
50use loom::sync::Mutex;
51
52#[cfg(not(loom))]
53use std::sync::atomic::{AtomicU64, Ordering};
54#[cfg(not(loom))]
55use std::sync::Mutex;
56
/// Source of monotonic timestamps used by the refill computation.
///
/// [`SystemClock`] is the production implementation and simply
/// forwards to [`Instant::now`]. Tests and loom models substitute
/// a [`ManualClock`] so elapsed time is fully under their control.
///
/// Implementors must be `Send + Sync` because a clock is read
/// through a shared reference from whichever thread calls into
/// the throttle.
pub trait Clock: Send + Sync {
    /// Reads the current instant.
    ///
    /// The value must be monotonic: a later call may return the
    /// same instant as an earlier one, but never an older one.
    fn now(&self) -> Instant;
}
68
69/// Clock implementation backed by [`Instant::now`].
70#[derive(Debug, Default, Clone, Copy)]
71pub struct SystemClock;
72
73impl Clock for SystemClock {
74 fn now(&self) -> Instant {
75 Instant::now()
76 }
77}
78
79// Blanket impls so callers can share a clock through `Arc` or
80// pass a borrowed reference into [`Throttle::with_clock`]
81// without wrapping it in a newtype. We provide separate impls
82// for the two `Arc` flavours (`std`'s and `loom`'s) so loom
83// tests can build `Arc<ManualClock>` from `loom::sync::Arc`
84// while the default build keeps using `std::sync::Arc`.
85impl<C: Clock + ?Sized> Clock for &C {
86 fn now(&self) -> Instant {
87 (**self).now()
88 }
89}
90
91impl<C: Clock + ?Sized> Clock for std::sync::Arc<C> {
92 fn now(&self) -> Instant {
93 (**self).now()
94 }
95}
96
97#[cfg(loom)]
98impl<C: Clock + ?Sized> Clock for loom::sync::Arc<C> {
99 fn now(&self) -> Instant {
100 (**self).now()
101 }
102}
103
/// A hand-cranked [`Clock`] for unit tests and loom models.
///
/// The reading is `base` plus an offset that only grows when a
/// caller invokes [`ManualClock::advance`]; it never follows wall
/// time on its own. The offset is kept as a nanosecond count in an
/// [`AtomicU64`], which makes the clock `Send + Sync` and lets
/// threads (or loom-modelled threads) share it without any
/// additional wrapper.
#[derive(Debug)]
pub struct ManualClock {
    /// Anchor instant the offset is added to.
    base: Instant,
    /// Nanoseconds accumulated so far through `advance`.
    offset_nanos: AtomicU64,
}
117
118impl ManualClock {
119 /// Creates a clock anchored at the calling-thread's current
120 /// [`Instant`] with zero offset.
121 pub fn new() -> Self {
122 Self {
123 base: Instant::now(),
124 offset_nanos: AtomicU64::new(0),
125 }
126 }
127
128 /// Advances the clock by `delta`. Saturates at
129 /// [`u64::MAX`] nanoseconds (~584 years), which is plenty for
130 /// any realistic test horizon.
131 pub fn advance(&self, delta: Duration) {
132 let add = u64::try_from(delta.as_nanos()).unwrap_or(u64::MAX);
133 // Saturating add: a wraparound here would silently rewind
134 // the clock and break the monotonicity contract.
135 let mut cur = self.offset_nanos.load(Ordering::Acquire);
136 loop {
137 let next = cur.saturating_add(add);
138 match self.offset_nanos.compare_exchange_weak(
139 cur,
140 next,
141 Ordering::AcqRel,
142 Ordering::Acquire,
143 ) {
144 Ok(_) => return,
145 Err(actual) => cur = actual,
146 }
147 }
148 }
149}
150
151impl Default for ManualClock {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157impl Clock for ManualClock {
158 fn now(&self) -> Instant {
159 self.base + Duration::from_nanos(self.offset_nanos.load(Ordering::Acquire))
160 }
161}
162
163/// Errors returned by [`Throttle::acquire_blocking`].
164#[derive(Debug, thiserror::Error, PartialEq, Eq)]
165pub enum ThrottleError {
166 /// The caller asked for more tokens than the bucket can ever
167 /// hold. Waiting would deadlock; the request must be rejected
168 /// at the source.
169 #[error("requested {requested} tokens exceeds capacity {capacity}")]
170 RequestExceedsCapacity {
171 /// Tokens the caller asked for.
172 requested: u64,
173 /// Bucket capacity.
174 capacity: u64,
175 },
176 /// The bucket has a zero refill rate and the initial capacity
177 /// is insufficient for the request. There is no way to ever
178 /// satisfy the request, so blocking is pointless.
179 #[error(
180 "zero refill rate cannot satisfy acquire of {requested} tokens \
181 (only {available} available)"
182 )]
183 ZeroRefillExhausted {
184 /// Tokens the caller asked for.
185 requested: u64,
186 /// Tokens currently in the bucket.
187 available: u64,
188 },
189}
190
191/// Token-bucket admission control gate.
192///
193/// The bucket is initialised full, so the first burst of up to
194/// `capacity` tokens is granted immediately. Tokens accrue at
195/// `refill_per_sec` per second up to `capacity`, computed lazily
196/// against the [`Clock`] on each [`Throttle::try_acquire`] call.
197///
198/// `Throttle` is generic over the clock so loom and unit tests
199/// can inject a [`ManualClock`]. Production code uses the
200/// [`SystemClock`] default.
201pub struct Throttle<C: Clock = SystemClock> {
202 capacity: u64,
203 refill_per_sec: u64,
204 available: AtomicU64,
205 last_refill: Mutex<Instant>,
206 clock: C,
207}
208
209impl Throttle<SystemClock> {
210 /// Builds a throttle backed by the [`SystemClock`].
211 ///
212 /// The bucket starts full; the first acquire of up to
213 /// `capacity` tokens succeeds without waiting.
214 pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
215 Self::with_clock(capacity, refill_per_sec, SystemClock)
216 }
217}
218
219impl<C: Clock> Throttle<C> {
220 /// Builds a throttle that consults `clock` for refill timing.
221 ///
222 /// The bucket starts full and the last-refill timestamp is
223 /// captured from `clock` at construction time. Subsequent
224 /// `clock.now()` values must be monotonic relative to that
225 /// initial reading.
226 pub fn with_clock(capacity: u64, refill_per_sec: u64, clock: C) -> Self {
227 let now = clock.now();
228 Self {
229 capacity,
230 refill_per_sec,
231 available: AtomicU64::new(capacity),
232 last_refill: Mutex::new(now),
233 clock,
234 }
235 }
236
237 /// Burst capacity (the maximum number of tokens that may be
238 /// acquired in one go).
239 pub fn capacity(&self) -> u64 {
240 self.capacity
241 }
242
243 /// Sustained refill rate in tokens per second.
244 pub fn refill_per_sec(&self) -> u64 {
245 self.refill_per_sec
246 }
247
248 /// Best-effort snapshot of the currently available tokens.
249 ///
250 /// Useful for tests and diagnostics; do not branch on this
251 /// in admission code (use [`Throttle::try_acquire`] instead).
252 pub fn available(&self) -> u64 {
253 self.available.load(Ordering::Acquire)
254 }
255
256 /// Tries to take `n` tokens.
257 ///
258 /// Returns `true` on success and `false` if the bucket does
259 /// not currently hold `n` tokens. The bucket is refilled
260 /// from the clock-elapsed interval before the check.
261 ///
262 /// Requesting `n > capacity` always returns `false`: the
263 /// bucket can never hold that many tokens, so blocking would
264 /// be pointless.
265 pub fn try_acquire(&self, n: u64) -> bool {
266 if n > self.capacity {
267 return false;
268 }
269 self.refill();
270 if n == 0 {
271 return true;
272 }
273 self.consume(n)
274 }
275
276 /// Acquires `n` tokens, sleeping the calling thread if the
277 /// bucket is empty.
278 ///
279 /// This is the synchronous counterpart of `dynomite`'s
280 /// async `Throttle::acquire`. The wait loop sleeps for the
281 /// time required to refill the missing tokens at
282 /// `refill_per_sec`, clamped to the range 1 ms .. 1 s so a
283 /// fractional refill never spins tightly and a misconfigured
284 /// throttle still polls regularly.
285 ///
286 /// # Errors
287 ///
288 /// * [`ThrottleError::RequestExceedsCapacity`] if `n` is
289 /// larger than the bucket's capacity.
290 /// * [`ThrottleError::ZeroRefillExhausted`] if
291 /// `refill_per_sec == 0` and the initial bucket cannot
292 /// satisfy the request.
293 pub fn acquire_blocking(&self, n: u64) -> Result<(), ThrottleError> {
294 if n > self.capacity {
295 return Err(ThrottleError::RequestExceedsCapacity {
296 requested: n,
297 capacity: self.capacity,
298 });
299 }
300 if n == 0 {
301 return Ok(());
302 }
303 if self.try_acquire(n) {
304 return Ok(());
305 }
306 if self.refill_per_sec == 0 {
307 return Err(ThrottleError::ZeroRefillExhausted {
308 requested: n,
309 available: self.available(),
310 });
311 }
312 loop {
313 let needed = n.saturating_sub(self.available.load(Ordering::Acquire));
314 let needed = needed.max(1);
315 // Compute the wait in integer nanoseconds. Multiplying
316 // by a billion in u128 avoids float precision loss
317 // and tracks the same time domain as `refill`.
318 let want_nanos =
319 u128::from(needed).saturating_mul(1_000_000_000) / u128::from(self.refill_per_sec);
320 let want_nanos = want_nanos.clamp(1_000_000, 1_000_000_000);
321 let dur = Duration::from_nanos(u64::try_from(want_nanos).unwrap_or(u64::MAX));
322 std::thread::sleep(dur);
323 if self.try_acquire(n) {
324 return Ok(());
325 }
326 }
327 }
328
329 /// Adds tokens earned since the last refill instant. Does not
330 /// exceed `capacity`. Holds the `last_refill` mutex while
331 /// computing the increment so concurrent refillers add up to
332 /// the same total.
333 fn refill(&self) {
334 if self.refill_per_sec == 0 {
335 return;
336 }
337 let now = self.clock.now();
338 let mut last = self
339 .last_refill
340 .lock()
341 .expect("invariant: throttle last_refill mutex must not be poisoned");
342 let elapsed = now.duration_since(*last);
343 // Convert elapsed time to whole tokens. Using nanoseconds
344 // and integer math avoids the precision drift floats would
345 // cause across many short refills.
346 let elapsed_nanos: u128 = elapsed.as_nanos();
347 let rate = u128::from(self.refill_per_sec);
348 let new_tokens_u128 = elapsed_nanos.saturating_mul(rate) / 1_000_000_000_u128;
349 if new_tokens_u128 == 0 {
350 return;
351 }
352 // Advance `last_refill` only by the integer-token slice
353 // we are crediting; the fractional tail rolls into the
354 // next refill.
355 let new_tokens = u64::try_from(new_tokens_u128).unwrap_or(u64::MAX);
356 let credited_nanos = (u128::from(new_tokens) * 1_000_000_000_u128) / rate;
357 *last += Duration::from_nanos(u64::try_from(credited_nanos).unwrap_or(u64::MAX));
358 // Saturating add into `available`, capped at `capacity`.
359 let mut cur = self.available.load(Ordering::Acquire);
360 loop {
361 let target = cur.saturating_add(new_tokens).min(self.capacity);
362 if target == cur {
363 break;
364 }
365 match self.available.compare_exchange_weak(
366 cur,
367 target,
368 Ordering::AcqRel,
369 Ordering::Acquire,
370 ) {
371 Ok(_) => break,
372 Err(actual) => cur = actual,
373 }
374 }
375 }
376
377 /// Atomically subtracts `n` tokens from `available` if at
378 /// least `n` are currently held. Returns whether the
379 /// subtraction succeeded.
380 fn consume(&self, n: u64) -> bool {
381 let mut cur = self.available.load(Ordering::Acquire);
382 loop {
383 if cur < n {
384 return false;
385 }
386 match self.available.compare_exchange_weak(
387 cur,
388 cur - n,
389 Ordering::AcqRel,
390 Ordering::Acquire,
391 ) {
392 Ok(_) => return true,
393 Err(actual) => cur = actual,
394 }
395 }
396 }
397}
398
399impl<C: Clock> std::fmt::Debug for Throttle<C> {
400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 f.debug_struct("Throttle")
402 .field("capacity", &self.capacity)
403 .field("refill_per_sec", &self.refill_per_sec)
404 .field("available", &self.available())
405 .finish_non_exhaustive()
406 }
407}
408
// Unit tests for the non-loom build. Deterministic cases drive a
// `ManualClock`; only `acquire_blocking_waits_for_refill` touches
// wall-clock time.
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use hegel::generators as gs;
    use hegel::TestCase;
    use std::sync::Arc;

    #[test]
    fn new_starts_full() {
        let t: Throttle = Throttle::new(8, 1);
        assert_eq!(t.capacity(), 8);
        assert_eq!(t.refill_per_sec(), 1);
        assert_eq!(t.available(), 8);
    }

    #[test]
    fn try_acquire_zero_is_always_true_even_when_empty() {
        let t: Throttle = Throttle::new(2, 0);
        assert!(t.try_acquire(2));
        assert_eq!(t.available(), 0);
        assert!(t.try_acquire(0));
    }

    #[test]
    fn try_acquire_above_capacity_fails_fast() {
        let t: Throttle = Throttle::new(4, 100);
        assert!(!t.try_acquire(5));
        // The bucket is unchanged.
        assert_eq!(t.available(), 4);
    }

    #[test]
    fn manual_clock_drives_refill() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(10, 100, Arc::clone(&clock));
        assert!(t.try_acquire(10));
        assert_eq!(t.available(), 0);
        // 100 ms at 100 tokens/s yields exactly 10 tokens.
        clock.advance(Duration::from_millis(100));
        // try_acquire(0) triggers refill but consumes nothing.
        assert!(t.try_acquire(0));
        assert_eq!(t.available(), 10);
    }

    #[test]
    fn manual_clock_caps_at_capacity() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(5, 1, Arc::clone(&clock));
        // Drain.
        assert!(t.try_acquire(5));
        // Advance an hour at 1 token/s == 3600 tokens; should
        // saturate at capacity 5.
        clock.advance(Duration::from_secs(3600));
        assert!(t.try_acquire(0));
        assert_eq!(t.available(), 5);
    }

    #[test]
    fn manual_clock_zero_refill_does_not_replenish() {
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(3, 0, Arc::clone(&clock));
        assert!(t.try_acquire(3));
        clock.advance(Duration::from_secs(60));
        assert!(!t.try_acquire(1));
    }

    #[test]
    fn acquire_blocking_above_capacity_returns_typed_error() {
        let t: Throttle = Throttle::new(2, 1);
        let err = t.acquire_blocking(5).unwrap_err();
        assert_eq!(
            err,
            ThrottleError::RequestExceedsCapacity {
                requested: 5,
                capacity: 2,
            }
        );
    }

    #[test]
    fn acquire_blocking_zero_refill_with_empty_bucket_returns_error() {
        let t: Throttle = Throttle::new(1, 0);
        assert!(t.try_acquire(1));
        let err = t.acquire_blocking(1).unwrap_err();
        // `assert_eq!` (rather than `matches!`) prints both values
        // on failure and matches the sibling error-path test.
        assert_eq!(
            err,
            ThrottleError::ZeroRefillExhausted {
                requested: 1,
                available: 0,
            }
        );
    }

    #[test]
    fn acquire_blocking_zero_request_is_noop() {
        let t: Throttle = Throttle::new(1, 0);
        // Even with refill=0 and capacity=1, an acquire of zero
        // succeeds without blocking.
        t.acquire_blocking(0).unwrap();
        assert_eq!(t.available(), 1);
    }

    #[test]
    fn acquire_blocking_waits_for_refill() {
        let t = Throttle::new(2, 200); // refill 200/s -> 5ms/token
        assert!(t.try_acquire(2));
        let start = Instant::now();
        t.acquire_blocking(2).unwrap();
        let elapsed = start.elapsed();
        // Two tokens take ~10 ms to come back; the lower bound is
        // set to half of that to leave room for timer slack.
        assert!(
            elapsed >= Duration::from_millis(5),
            "acquire returned in {elapsed:?}, expected at least 5ms"
        );
        assert!(
            elapsed < Duration::from_secs(2),
            "acquire took unexpectedly long: {elapsed:?}"
        );
    }

    #[hegel::test(test_cases = 64)]
    fn manual_clock_envelope_holds(tc: TestCase) {
        let capacity = tc.draw(gs::integers::<u64>().min_value(1).max_value(64));
        let refill = tc.draw(gs::integers::<u64>().min_value(1).max_value(1_000));
        let req_sizes: Vec<u64> = (0..16)
            .map(|_| tc.draw(gs::integers::<u64>().min_value(0).max_value(capacity)))
            .collect();
        // The ManualClock is never advanced here, so no refill can
        // happen and the total granted is bounded by the initial
        // capacity, whatever the drawn refill rate is.
        let clock = Arc::new(ManualClock::new());
        let t = Throttle::with_clock(capacity, refill, Arc::clone(&clock));
        let mut granted: u128 = 0;
        for n in &req_sizes {
            if t.try_acquire(*n) {
                granted += u128::from(*n);
            }
        }
        // No clock advance => envelope is exactly capacity.
        assert!(
            granted <= u128::from(capacity),
            "granted {granted} > capacity {capacity} with frozen clock"
        );
    }
}