better_bucket/bucket.rs
1//! The single token bucket and the `TokenBucket` trait.
2//!
3//! This is the lock-free core. All mutable state lives in one `AtomicU64` that
4//! packs the current token count and the time of the last refill; `try_acquire`
5//! is a single `compare_exchange_weak` loop with lazy refill computed from the
6//! injected monotonic clock. There is no lock and no allocation on the acquire
7//! path, and the bucket is cache-line aligned so independent buckets never
8//! falsely share. The public surface is identical to the `0.2` foundation
9//! release — only the internals changed.
10//!
11//! # Packing
12//!
13//! The state word is split:
14//! - **upper 32 bits** — tokens in *millitokens* (thousandths of a token), for
15//! sub-token refill resolution. Capped at [`u32::MAX`] millitokens, so the
16//! effective capacity ceiling is about 4.29 million tokens.
17//! - **lower 32 bits** — milliseconds since the bucket's `created_at` anchor of
18//! the last refill computation.
19//!
20//! The millisecond field wraps every ~49.7 days. Elapsed time is computed with
21//! `wrapping_sub`, so refill stays correct for any gap shorter than that — i.e.
22//! for any bucket used at least once per ~49.7 days, which is every real
23//! limiter. A bucket left fully idle for longer may under-refill once on its
24//! next use, a safe and self-correcting outcome.
25
26use core::time::Duration;
27
28#[cfg(loom)]
29use loom::sync::atomic::{AtomicU64, Ordering};
30
31use clock_lib::{Clock, Monotonic, SystemClock};
32#[cfg(not(loom))]
33use core::sync::atomic::{AtomicU64, Ordering};
34
35use crate::config::BucketConfig;
36use crate::decision::Decision;
37
/// Millitokens per whole token.
///
/// Token counts are stored in thousandths of a token so that refill can accrue
/// fractions of a token between acquires.
const MILLI: u64 = 1_000;

/// Fractional bits for the precomputed refill rate. The rate is stored as
/// millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed point so the hot
/// path is a multiply and a shift — no division per acquire.
///
/// The refill computation shifts right by this amount after multiplying; the
/// retry-hint computation shifts left by it before dividing.
const REFILL_FRAC_BITS: u32 = 22;
45
/// Builds the packed state word: the millitoken count occupies the upper 32
/// bits (saturating at `u32::MAX`) and `last_ms` the lower 32 bits.
#[inline]
fn pack(millitokens: u64, last_ms: u32) -> u64 {
    let ceiling = u64::from(u32::MAX);
    // Saturate so an oversized count can never spill out of its 32-bit field.
    let clamped = if millitokens > ceiling {
        ceiling
    } else {
        millitokens
    };
    let high = clamped << 32;
    let low = u64::from(last_ms);
    high | low
}
51
/// Splits a packed state word back into `(millitokens, last_ms)`: the upper
/// 32 bits and the lower 32 bits respectively.
#[inline]
fn unpack(state: u64) -> (u64, u32) {
    let low_mask = u64::from(u32::MAX);
    let millitokens = state >> 32;
    // Masking first makes the narrowing cast lossless.
    let last_ms = (state & low_mask) as u32;
    (millitokens, last_ms)
}
57
58/// Maps a Tier-1 request to a config, collapsing a degenerate request (zero
59/// capacity, amount, or period) to a bucket that grants nothing. That is
60/// well-defined and safe rather than a panic; [`BucketConfig::new`] is the
61/// validated, error-returning path.
62fn tier1_config(capacity: u32, amount: u32, period: Duration, initial: u32) -> BucketConfig {
63 if capacity == 0 || amount == 0 || period.is_zero() {
64 BucketConfig::raw(0, 0, Duration::from_secs(1), 0)
65 } else {
66 BucketConfig::raw(capacity, amount, period, initial)
67 }
68}
69
/// A token bucket: a counter that refills over time and grants tokens on demand.
///
/// A bucket holds up to its capacity in tokens, accrues more at a fixed rate,
/// and hands them out when asked. The hot path is **lock-free** — a single
/// `compare_exchange_weak` on a packed atomic word — and **allocation-free**.
/// Refill is **lazy**: there is no background thread or timer, the token count
/// is brought current from the monotonic clock the instant you call
/// [`acquire`](Self::acquire), [`try_acquire`](Self::try_acquire), or
/// [`available`](Self::available).
///
/// The type parameter `C` is the time source. It defaults to
/// [`SystemClock`](clock_lib::SystemClock) (the OS monotonic clock); inject a
/// [`ManualClock`](clock_lib::ManualClock) with [`with_clock`](Self::with_clock)
/// to drive time by hand in tests. `Bucket` is `Send + Sync` whenever its clock
/// is, which every [`Clock`] implementation guarantees.
///
/// # Limits
///
/// The packed representation caps capacity at about 4.29 million tokens
/// (`u32::MAX` millitokens). Time is tracked as 32-bit milliseconds since
/// construction and wraps every ~49.7 days; the wrap is handled, so an
/// actively-used bucket refills correctly indefinitely (only a bucket idle for
/// longer than that may under-refill once, safely, on its next use).
///
/// # Examples
///
/// The one-line common case:
///
/// ```
/// use better_bucket::Bucket;
///
/// let bucket = Bucket::per_second(100);
/// if bucket.try_acquire(1) {
///     // allowed — do the work
/// }
/// ```
// Aligned to 64 bytes so independent buckets never falsely share a cache line.
#[repr(align(64))]
pub struct Bucket<C: Clock = SystemClock> {
    /// Packed `(millitokens << 32) | last_ms`. The only mutable state, and the
    /// single point of synchronisation. `last_ms` is the time, in milliseconds
    /// since `created_at`, at which the stored token count was last computed.
    state: AtomicU64,
    /// Capacity in millitokens, already clamped to the 32-bit packing ceiling.
    /// Refill saturates at this value.
    capacity_millitokens: u64,
    /// Refill rate as millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed
    /// point. Derived once from `refill_amount` and the period in nanoseconds, so
    /// the hot path needs only a multiply and a shift and sub-millisecond periods
    /// stay accurate. Zero means no refill.
    refill_per_ms_q: u128,
    /// The monotonic anchor that `last_ms` is measured from: the clock's
    /// reading taken when the bucket was built.
    created_at: Monotonic,
    /// The original configuration, kept for [`config`](Self::config).
    config: BucketConfig,
    /// The injected time source.
    clock: C,
}
125
126/// Constructs a bucket from a finished config and a clock, anchoring the refill
127/// clock at the supplied clock's current reading and filling to `initial`.
128fn build<C: Clock>(config: BucketConfig, clock: C) -> Bucket<C> {
129 let created_at = clock.now();
130 let capacity_millitokens = u64::from(config.capacity())
131 .saturating_mul(MILLI)
132 .min(u64::from(u32::MAX));
133 // Millitokens per millisecond = refill_amount * 1000 / period_ms
134 // = refill_amount * 1e9 / period_nanos,
135 // kept in Q(REFILL_FRAC_BITS) fixed point. Computing from nanoseconds keeps
136 // sub-millisecond periods accurate; the one division happens here, not on the
137 // acquire path. `0` disables refill.
138 let refill_per_ms_q = if config.refill_amount() == 0 || config.refill_period().is_zero() {
139 0
140 } else {
141 let period_nanos = u64::try_from(config.refill_period().as_nanos())
142 .unwrap_or(u64::MAX)
143 .max(1);
144 let numerator = (u128::from(config.refill_amount()) * 1_000_000_000) << REFILL_FRAC_BITS;
145 numerator / u128::from(period_nanos)
146 };
147 let initial_millitokens = (u64::from(config.initial()) * MILLI).min(capacity_millitokens);
148 Bucket {
149 state: AtomicU64::new(pack(initial_millitokens, 0)),
150 capacity_millitokens,
151 refill_per_ms_q,
152 created_at,
153 config,
154 clock,
155 }
156}
157
158impl Bucket<SystemClock> {
159 /// Creates a bucket of capacity `rate` that refills `rate` tokens per
160 /// second, starting full, driven by the OS monotonic clock.
161 ///
162 /// This is the headline Tier-1 constructor. A `rate` of `0` yields a bucket
163 /// that grants nothing (capacity `0`); use [`BucketConfig::new`] when you
164 /// want zero rejected as an error.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// use better_bucket::Bucket;
170 ///
171 /// let bucket = Bucket::per_second(50);
172 /// assert_eq!(bucket.capacity(), 50);
173 /// assert!(bucket.try_acquire(1));
174 /// ```
175 #[must_use]
176 pub fn per_second(rate: u32) -> Self {
177 Self::from_config(tier1_config(rate, rate, Duration::from_secs(1), rate))
178 }
179
180 /// Creates a bucket of capacity `amount` that refills `amount` tokens every
181 /// `period`, starting full, driven by the OS monotonic clock.
182 ///
183 /// Use this when the natural rate is not per-second — e.g. 5 tokens per 100
184 /// milliseconds, or 1000 per minute. An `amount` of `0` or a zero `period`
185 /// yields a bucket that grants nothing.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use better_bucket::Bucket;
191 /// use std::time::Duration;
192 ///
193 /// // 5 tokens every 100ms.
194 /// let bucket = Bucket::per_duration(5, Duration::from_millis(100));
195 /// assert_eq!(bucket.capacity(), 5);
196 /// ```
197 #[must_use]
198 pub fn per_duration(amount: u32, period: Duration) -> Self {
199 Self::from_config(tier1_config(amount, amount, period, amount))
200 }
201
202 /// Creates a bucket from a validated [`BucketConfig`], driven by the OS
203 /// monotonic clock.
204 ///
205 /// Use this when you need full control over capacity, rate, and initial
206 /// fill independently (e.g. a large burst ceiling with a slow refill, or a
207 /// bucket that starts empty).
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use better_bucket::{Bucket, BucketConfig};
213 /// use std::time::Duration;
214 ///
215 /// // 500-token burst, 100/sec refill, starting empty.
216 /// let config = BucketConfig::new(500, 100, Duration::from_secs(1), 0)?;
217 /// let bucket = Bucket::from_config(config);
218 /// assert_eq!(bucket.available(), 0);
219 /// # Ok::<(), better_bucket::BucketError>(())
220 /// ```
221 #[must_use]
222 pub fn from_config(config: BucketConfig) -> Self {
223 build(config, SystemClock::new())
224 }
225}
226
227impl<C: Clock> Bucket<C> {
228 /// Replaces the bucket's time source, resetting it to its initial fill
229 /// anchored at the new clock's current reading.
230 ///
231 /// This is the clock-injection seam. The intended use is immediately after
232 /// construction — chiefly in tests, where injecting a
233 /// [`ManualClock`](clock_lib::ManualClock) makes refill behaviour
234 /// deterministic with no `sleep`.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// use better_bucket::Bucket;
240 /// use clock_lib::ManualClock;
241 /// use std::sync::Arc;
242 /// use std::time::Duration;
243 ///
244 /// let clock = Arc::new(ManualClock::new());
245 /// let bucket = Bucket::per_second(10).with_clock(Arc::clone(&clock));
246 ///
247 /// assert!(bucket.try_acquire(10)); // drain it
248 /// assert!(!bucket.try_acquire(1)); // empty
249 ///
250 /// clock.advance(Duration::from_secs(1)); // no real sleep
251 /// assert_eq!(bucket.available(), 10); // fully refilled
252 /// ```
253 #[must_use]
254 pub fn with_clock<C2: Clock>(self, clock: C2) -> Bucket<C2> {
255 build(self.config, clock)
256 }
257
258 /// Attempts to take `n` tokens, returning the full [`Decision`].
259 ///
260 /// Brings the bucket current (lazy refill) and, if at least `n` tokens are
261 /// available, deducts them and returns [`Decision::Allowed`]. Otherwise the
262 /// bucket is left untouched and [`Decision::Denied`] carries the minimum
263 /// wait until the request would succeed. Requesting `0` always succeeds;
264 /// requesting more than the capacity can never succeed (the denial's
265 /// `retry_after` is [`Duration::MAX`]).
266 ///
267 /// This never blocks and never allocates.
268 ///
269 /// # Examples
270 ///
271 /// ```
272 /// use better_bucket::{Bucket, Decision};
273 ///
274 /// let bucket = Bucket::per_second(5);
275 /// assert_eq!(bucket.acquire(3), Decision::Allowed);
276 /// assert_eq!(bucket.available(), 2);
277 /// ```
278 pub fn acquire(&self, n: u32) -> Decision {
279 self.acquire_inner(n)
280 }
281
282 /// Attempts to take `n` tokens, returning whether it succeeded.
283 ///
284 /// The one-line convenience over [`acquire`](Self::acquire): equivalent to
285 /// `self.acquire(n).is_allowed()`, for the common case where you only need
286 /// allow/deny and not the retry hint.
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use better_bucket::Bucket;
292 ///
293 /// let bucket = Bucket::per_second(1);
294 /// assert!(bucket.try_acquire(1));
295 /// assert!(!bucket.try_acquire(1)); // drained
296 /// ```
297 #[must_use]
298 pub fn try_acquire(&self, n: u32) -> bool {
299 self.acquire_inner(n).is_allowed()
300 }
301
302 /// Returns how many whole tokens are available right now, after lazy refill.
303 ///
304 /// This is a momentary snapshot; under concurrent acquires it can be stale
305 /// the instant it returns. Treat it as advisory.
306 ///
307 /// # Examples
308 ///
309 /// ```
310 /// use better_bucket::Bucket;
311 ///
312 /// let bucket = Bucket::per_second(10);
313 /// assert_eq!(bucket.available(), 10);
314 /// assert!(bucket.try_acquire(4));
315 /// assert_eq!(bucket.available(), 6);
316 /// ```
317 #[must_use]
318 pub fn available(&self) -> u32 {
319 let now_ms = self.now_ms();
320 let (tokens_mt, last_ms) = unpack(self.state.load(Ordering::Relaxed));
321 let refilled = self.refilled(tokens_mt, last_ms, now_ms);
322 u32::try_from(refilled / MILLI).unwrap_or(u32::MAX)
323 }
324
325 /// Returns the bucket's capacity (its burst ceiling), in whole tokens.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use better_bucket::Bucket;
331 ///
332 /// assert_eq!(Bucket::per_second(64).capacity(), 64);
333 /// ```
334 #[must_use]
335 pub const fn capacity(&self) -> u32 {
336 (self.capacity_millitokens / MILLI) as u32
337 }
338
339 /// Returns the configuration this bucket was built from.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use better_bucket::Bucket;
345 /// use std::time::Duration;
346 ///
347 /// let bucket = Bucket::per_second(10);
348 /// assert_eq!(bucket.config().refill_period(), Duration::from_secs(1));
349 /// ```
350 #[must_use]
351 pub const fn config(&self) -> BucketConfig {
352 self.config
353 }
354
355 /// Refills the bucket to full and marks it current as of now.
356 ///
357 /// Use it to discard accumulated debt and grant a fresh burst — for example
358 /// at the start of a new billing window, or to clear a backlog after a
359 /// dependency recovers. Long uptime needs no special handling: the
360 /// millisecond counter wraps safely, so an actively-used bucket never stalls.
361 ///
362 /// # Examples
363 ///
364 /// ```
365 /// use better_bucket::Bucket;
366 ///
367 /// let bucket = Bucket::per_second(4);
368 /// assert!(bucket.try_acquire(4));
369 /// assert_eq!(bucket.available(), 0);
370 /// bucket.reset();
371 /// assert_eq!(bucket.available(), 4);
372 /// ```
373 pub fn reset(&self) {
374 let now_ms = self.now_ms();
375 self.state
376 .store(pack(self.capacity_millitokens, now_ms), Ordering::Relaxed);
377 }
378
379 /// Milliseconds since `created_at`, truncated into the 32-bit time field.
380 ///
381 /// The field wraps every ~49.7 days. [`refilled`](Self::refilled) computes
382 /// the elapsed interval with `wrapping_sub`, so the result is correct for
383 /// any gap shorter than that — i.e. for any bucket used at least once per
384 /// ~49.7 days, which covers every real limiter. A bucket left fully idle for
385 /// longer may under-refill once on its next use (safe and self-correcting).
386 #[inline]
387 fn now_ms(&self) -> u32 {
388 let elapsed = self.clock.now().saturating_duration_since(self.created_at);
389 (elapsed.as_millis() & u128::from(u32::MAX)) as u32
390 }
391
392 /// The millitoken count after refilling over `last_ms → now_ms`, capped at
393 /// capacity. Saturating throughout: a huge elapsed gap fills to capacity, it
394 /// can never wrap or overflow. When no whole millisecond has elapsed since
395 /// the last refill — the common case under bursty load — it returns
396 /// immediately, doing no arithmetic at all.
397 #[inline]
398 fn refilled(&self, tokens_mt: u64, last_ms: u32, now_ms: u32) -> u64 {
399 if self.refill_per_ms_q == 0 {
400 return tokens_mt;
401 }
402 // `wrapping_sub` is correct because the 32-bit ms field wraps: the true
403 // elapsed interval is recovered for any gap shorter than ~49.7 days.
404 let elapsed_ms = now_ms.wrapping_sub(last_ms);
405 if elapsed_ms == 0 {
406 return tokens_mt;
407 }
408 let added = u128::from(elapsed_ms).saturating_mul(self.refill_per_ms_q) >> REFILL_FRAC_BITS;
409 let added_mt = u64::try_from(added).unwrap_or(u64::MAX);
410 tokens_mt
411 .saturating_add(added_mt)
412 .min(self.capacity_millitokens)
413 }
414
415 /// The minimum time for `deficit_mt` millitokens to accrue, rounded up.
416 /// [`Duration::MAX`] if the bucket never refills.
417 fn time_for(&self, deficit_mt: u64) -> Duration {
418 if self.refill_per_ms_q == 0 {
419 return Duration::MAX;
420 }
421 // ms = ceil(deficit_mt / rate_per_ms) = ceil((deficit_mt << FRAC) / q).
422 let numerator =
423 (u128::from(deficit_mt) << REFILL_FRAC_BITS).saturating_add(self.refill_per_ms_q - 1);
424 let millis = numerator / self.refill_per_ms_q;
425 Duration::from_millis(u64::try_from(millis).unwrap_or(u64::MAX))
426 }
427
428 #[inline]
429 fn acquire_inner(&self, n: u32) -> Decision {
430 if n == 0 {
431 // Zero tokens are always available, even from an empty bucket.
432 return Decision::Allowed;
433 }
434 let need_mt = u64::from(n) * MILLI;
435 if need_mt > self.capacity_millitokens {
436 // More than the bucket can ever hold: it can never be granted.
437 return Decision::Denied {
438 retry_after: Duration::MAX,
439 };
440 }
441
442 let now_ms = self.now_ms();
443 loop {
444 let current = self.state.load(Ordering::Relaxed);
445 let (tokens_mt, last_ms) = unpack(current);
446 let refilled = self.refilled(tokens_mt, last_ms, now_ms);
447 if refilled < need_mt {
448 // Denied: report the wait for the shortfall to accrue. No write,
449 // so a denied request never contends with a granting one.
450 return Decision::Denied {
451 retry_after: self.time_for(need_mt - refilled),
452 };
453 }
454 let next = pack(refilled - need_mt, now_ms);
455 // Relaxed is sufficient: the only shared state is this word, and the
456 // CAS gives the read-modify-write atomicity the no-over-grant
457 // contract depends on. A spurious or lost race retries.
458 if self
459 .state
460 .compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
461 .is_ok()
462 {
463 return Decision::Allowed;
464 }
465 }
466 }
467}
468
469impl<C: Clock> core::fmt::Debug for Bucket<C> {
470 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
471 f.debug_struct("Bucket")
472 .field("capacity", &self.capacity())
473 .field("available", &self.available())
474 .field("config", &self.config)
475 .finish()
476 }
477}
478
/// The token-bucket surface a consumer depends on.
///
/// `TokenBucket` is the abstraction `rate-net` (and any other consumer) codes
/// against, so it can hold a bucket without naming its concrete clock type. It
/// mirrors the inherent methods of [`Bucket`]; see those for the detailed
/// contract of each.
///
/// Every method takes `&self` and none is generic, so the trait can be used
/// as `&dyn TokenBucket`.
pub trait TokenBucket {
    /// Attempts to take `n` tokens, returning the full [`Decision`].
    ///
    /// See [`Bucket::acquire`] for the contract.
    fn acquire(&self, n: u32) -> Decision;

    /// Attempts to take `n` tokens, returning whether it succeeded.
    ///
    /// See [`Bucket::try_acquire`] for the contract.
    #[must_use]
    fn try_acquire(&self, n: u32) -> bool;

    /// Returns the whole tokens available right now, after lazy refill.
    ///
    /// See [`Bucket::available`] for the contract.
    #[must_use]
    fn available(&self) -> u32;

    /// Returns the bucket's capacity (its burst ceiling).
    ///
    /// See [`Bucket::capacity`] for the contract.
    #[must_use]
    fn capacity(&self) -> u32;
}
501
502impl<C: Clock> TokenBucket for Bucket<C> {
503 fn acquire(&self, n: u32) -> Decision {
504 self.acquire_inner(n)
505 }
506
507 fn try_acquire(&self, n: u32) -> bool {
508 self.acquire_inner(n).is_allowed()
509 }
510
511 fn available(&self) -> u32 {
512 Bucket::available(self)
513 }
514
515 fn capacity(&self) -> u32 {
516 self.capacity()
517 }
518}
519
// Unit tests for the single bucket. Every test except the zero-rate one drives
// time through a `ManualClock`, so refill is exercised without sleeping.
// Compiled out under `loom`.
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::{Bucket, TokenBucket};
    use crate::decision::Decision;
    use clock_lib::{ManualClock, SystemClock};
    use core::time::Duration;
    use std::sync::Arc;
    use std::thread;

    /// A bucket driven by a `ManualClock` the test controls, so refill is
    /// deterministic with no real time passing.
    ///
    /// Returns the shared clock handle together with a `per_second(rate)`
    /// bucket bound to it.
    fn manual_bucket(rate: u32) -> (Arc<ManualClock>, Bucket<Arc<ManualClock>>) {
        let clock = Arc::new(ManualClock::new());
        let bucket = Bucket::per_second(rate).with_clock(Arc::clone(&clock));
        (clock, bucket)
    }

    // Compile-time check: the bucket is `Send + Sync` for both clock types.
    #[test]
    fn test_bucket_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Bucket<SystemClock>>();
        assert_send_sync::<Bucket<Arc<ManualClock>>>();
    }

    #[test]
    fn test_starts_full() {
        let (_clock, bucket) = manual_bucket(10);
        assert_eq!(bucket.available(), 10);
        assert_eq!(bucket.capacity(), 10);
    }

    #[test]
    fn test_acquire_deducts_tokens() {
        let (_clock, bucket) = manual_bucket(10);
        assert_eq!(bucket.acquire(3), Decision::Allowed);
        assert_eq!(bucket.available(), 7);
    }

    #[test]
    fn test_exact_empty_then_denied() {
        let (_clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10)); // exact-empty
        assert_eq!(bucket.available(), 0);
        assert!(!bucket.try_acquire(1));
    }

    #[test]
    fn test_acquire_zero_always_allowed() {
        let (_clock, bucket) = manual_bucket(1);
        assert!(bucket.try_acquire(1)); // drain
        assert!(!bucket.try_acquire(1));
        assert!(bucket.try_acquire(0)); // still allowed when empty
    }

    // A request larger than the capacity is denied with an unbounded wait.
    #[test]
    fn test_request_above_capacity_never_grantable() {
        let (_clock, bucket) = manual_bucket(5);
        assert_eq!(
            bucket.acquire(6),
            Decision::Denied {
                retry_after: Duration::MAX
            }
        );
    }

    #[test]
    fn test_full_refill_after_one_period() {
        let (clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10));
        assert!(!bucket.try_acquire(1));
        clock.advance(Duration::from_secs(1));
        assert_eq!(bucket.available(), 10);
        assert!(bucket.try_acquire(10));
    }

    #[test]
    fn test_partial_refill_is_proportional() {
        let (clock, bucket) = manual_bucket(100);
        assert!(bucket.try_acquire(100));
        clock.advance(Duration::from_millis(250)); // a quarter second
        assert_eq!(bucket.available(), 25);
    }

    #[test]
    fn test_refill_saturates_at_capacity() {
        let (clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10));
        clock.advance(Duration::from_secs(100)); // would be 1000 tokens
        assert_eq!(bucket.available(), 10); // clamped to capacity
    }

    #[test]
    fn test_refill_after_long_idle_saturates_without_overflow() {
        // A multi-year idle gap must fill to capacity, never wrap or overflow.
        let (clock, bucket) = manual_bucket(1_000);
        assert!(bucket.try_acquire(1_000));
        clock.advance(Duration::from_secs(60 * 60 * 24 * 365 * 5));
        assert_eq!(bucket.available(), 1_000);
        // And the bucket is still usable afterwards.
        assert!(bucket.try_acquire(1_000));
    }

    #[test]
    fn test_denied_reports_retry_after() {
        let (_clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10)); // empty
        // Five tokens at 10/sec accrue in 500ms.
        assert_eq!(
            bucket.acquire(5),
            Decision::Denied {
                retry_after: Duration::from_millis(500)
            }
        );
    }

    #[test]
    fn test_per_duration_uses_custom_period() {
        let clock = Arc::new(ManualClock::new());
        let bucket =
            Bucket::per_duration(5, Duration::from_millis(100)).with_clock(Arc::clone(&clock));
        assert!(bucket.try_acquire(5));
        clock.advance(Duration::from_millis(100));
        assert_eq!(bucket.available(), 5);
    }

    #[test]
    fn test_sub_millisecond_period_still_refills() {
        // 5 tokens per 200µs ⇒ 25 tokens/ms. The millisecond tick is coarse but
        // the rate is computed from nanoseconds, so a full ms refills fully.
        let clock = Arc::new(ManualClock::new());
        let bucket =
            Bucket::per_duration(5, Duration::from_micros(200)).with_clock(Arc::clone(&clock));
        assert!(bucket.try_acquire(5));
        clock.advance(Duration::from_millis(1));
        assert_eq!(bucket.available(), 5); // capped at capacity
    }

    // The only test here that runs on the default `SystemClock`.
    #[test]
    fn test_zero_rate_is_deny_all() {
        let bucket = Bucket::per_second(0);
        assert_eq!(bucket.capacity(), 0);
        assert_eq!(bucket.available(), 0);
        assert!(!bucket.try_acquire(1));
        assert!(bucket.try_acquire(0));
    }

    #[test]
    fn test_reset_refills_to_capacity() {
        let (_clock, bucket) = manual_bucket(5);
        assert!(bucket.try_acquire(5));
        assert_eq!(bucket.available(), 0);
        bucket.reset();
        assert_eq!(bucket.available(), 5);
    }

    // The trait must stay usable as `&dyn TokenBucket`.
    #[test]
    fn test_trait_object_safe_surface() {
        let (_clock, bucket) = manual_bucket(4);
        let as_trait: &dyn TokenBucket = &bucket;
        assert_eq!(as_trait.capacity(), 4);
        assert!(as_trait.try_acquire(4));
        assert!(!as_trait.try_acquire(1));
    }

    #[test]
    fn test_concurrent_acquire_never_over_grants() {
        // 100 tokens, no refill (clock never advances). Eight threads each
        // demand 30 — total demand 240, available 100. Under a correct CAS,
        // exactly 100 succeed: no over-grant (≤ 100) and no lost token (= 100).
        let clock = Arc::new(ManualClock::new());
        let bucket = Arc::new(Bucket::per_second(100).with_clock(clock));
        let threads = 8;
        let demand = 30u32;

        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let bucket = Arc::clone(&bucket);
                thread::spawn(move || {
                    let mut taken = 0u32;
                    for _ in 0..demand {
                        if bucket.try_acquire(1) {
                            taken += 1;
                        }
                    }
                    taken
                })
            })
            .collect();

        let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(total, 100, "CAS bucket must grant exactly capacity");
        assert_eq!(bucket.available(), 0);
    }

    // `pack` followed by `unpack` returns both fields unchanged, including at
    // the 32-bit extremes.
    #[test]
    fn test_pack_unpack_round_trip() {
        for &mt in &[0_u64, 1, 1_000, 50_000, u64::from(u32::MAX)] {
            for &ms in &[0_u32, 1, 1_000, u32::MAX] {
                let (got_mt, got_ms) = super::unpack(super::pack(mt, ms));
                assert_eq!(got_mt, mt.min(u64::from(u32::MAX)));
                assert_eq!(got_ms, ms);
            }
        }
    }
}