Skip to main content

better_bucket/
bucket.rs

1//! The single token bucket and the `TokenBucket` trait.
2//!
3//! This is the lock-free core. All mutable state lives in one `AtomicU64` that
4//! packs the current token count and the time of the last refill; `try_acquire`
5//! is a single `compare_exchange_weak` loop with lazy refill computed from the
6//! injected monotonic clock. There is no lock and no allocation on the acquire
7//! path, and the bucket is cache-line aligned so independent buckets never
8//! falsely share. The public surface is identical to the `0.2` foundation
9//! release — only the internals changed.
10//!
11//! # Packing
12//!
13//! The state word is split:
14//! - **upper 32 bits** — tokens in *millitokens* (thousandths of a token), for
15//!   sub-token refill resolution. Capped at [`u32::MAX`] millitokens, so the
16//!   effective capacity ceiling is about 4.29 million tokens.
17//! - **lower 32 bits** — milliseconds since the bucket's `created_at` anchor of
18//!   the last refill computation.
19//!
20//! The millisecond field wraps every ~49.7 days. Elapsed time is computed with
21//! `wrapping_sub`, so refill stays correct for any gap shorter than that — i.e.
22//! for any bucket used at least once per ~49.7 days, which is every real
23//! limiter. A bucket left fully idle for longer may under-refill once on its
24//! next use, a safe and self-correcting outcome.
25
26use core::time::Duration;
27
28#[cfg(loom)]
29use loom::sync::atomic::{AtomicU64, Ordering};
30
31use clock_lib::{Clock, Monotonic, SystemClock};
32#[cfg(not(loom))]
33use core::sync::atomic::{AtomicU64, Ordering};
34
35use crate::config::BucketConfig;
36use crate::decision::Decision;
37
38/// Millitokens per whole token.
39const MILLI: u64 = 1_000;
40
41/// Fractional bits for the precomputed refill rate. The rate is stored as
42/// millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed point so the hot
43/// path is a multiply and a shift — no division per acquire.
44const REFILL_FRAC_BITS: u32 = 22;
45
46/// Packs `millitokens` (clamped to 32 bits) and `last_ms` into one word.
47#[inline]
48fn pack(millitokens: u64, last_ms: u32) -> u64 {
49    (millitokens.min(u64::from(u32::MAX)) << 32) | u64::from(last_ms)
50}
51
52/// Unpacks the state word into `(millitokens, last_ms)`.
53#[inline]
54fn unpack(state: u64) -> (u64, u32) {
55    (state >> 32, (state & u64::from(u32::MAX)) as u32)
56}
57
58/// Maps a Tier-1 request to a config, collapsing a degenerate request (zero
59/// capacity, amount, or period) to a bucket that grants nothing. That is
60/// well-defined and safe rather than a panic; [`BucketConfig::new`] is the
61/// validated, error-returning path.
62fn tier1_config(capacity: u32, amount: u32, period: Duration, initial: u32) -> BucketConfig {
63    if capacity == 0 || amount == 0 || period.is_zero() {
64        BucketConfig::raw(0, 0, Duration::from_secs(1), 0)
65    } else {
66        BucketConfig::raw(capacity, amount, period, initial)
67    }
68}
69
70/// A token bucket: a counter that refills over time and grants tokens on demand.
71///
72/// A bucket holds up to its capacity in tokens, accrues more at a fixed rate,
73/// and hands them out when asked. The hot path is **lock-free** — a single
74/// `compare_exchange_weak` on a packed atomic word — and **allocation-free**.
75/// Refill is **lazy**: there is no background thread or timer, the token count
76/// is brought current from the monotonic clock the instant you call
77/// [`acquire`](Self::acquire), [`try_acquire`](Self::try_acquire), or
78/// [`available`](Self::available).
79///
80/// The type parameter `C` is the time source. It defaults to
81/// [`SystemClock`](clock_lib::SystemClock) (the OS monotonic clock); inject a
82/// [`ManualClock`](clock_lib::ManualClock) with [`with_clock`](Self::with_clock)
83/// to drive time by hand in tests. `Bucket` is `Send + Sync` whenever its clock
84/// is, which every [`Clock`] implementation guarantees.
85///
86/// # Limits
87///
88/// The packed representation caps capacity at about 4.29 million tokens
89/// (`u32::MAX` millitokens). Time is tracked as 32-bit milliseconds since
90/// construction and wraps every ~49.7 days; the wrap is handled, so an
91/// actively-used bucket refills correctly indefinitely (only a bucket idle for
92/// longer than that may under-refill once, safely, on its next use).
93///
94/// # Examples
95///
96/// The one-line common case:
97///
98/// ```
99/// use better_bucket::Bucket;
100///
101/// let bucket = Bucket::per_second(100);
102/// if bucket.try_acquire(1) {
103///     // allowed — do the work
104/// }
105/// ```
106#[repr(align(64))]
107pub struct Bucket<C: Clock = SystemClock> {
108    /// Packed `(millitokens << 32) | last_ms`. The only mutable state, and the
109    /// single point of synchronisation.
110    state: AtomicU64,
111    /// Capacity in millitokens, already clamped to the 32-bit packing ceiling.
112    capacity_millitokens: u64,
113    /// Refill rate as millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed
114    /// point. Derived once from `refill_amount` and the period in nanoseconds, so
115    /// the hot path needs only a multiply and a shift and sub-millisecond periods
116    /// stay accurate. Zero means no refill.
117    refill_per_ms_q: u128,
118    /// The monotonic anchor that `last_ms` is measured from.
119    created_at: Monotonic,
120    /// The original configuration, kept for [`config`](Self::config).
121    config: BucketConfig,
122    /// The injected time source.
123    clock: C,
124}
125
126/// Constructs a bucket from a finished config and a clock, anchoring the refill
127/// clock at the supplied clock's current reading and filling to `initial`.
128fn build<C: Clock>(config: BucketConfig, clock: C) -> Bucket<C> {
129    let created_at = clock.now();
130    let capacity_millitokens = u64::from(config.capacity())
131        .saturating_mul(MILLI)
132        .min(u64::from(u32::MAX));
133    // Millitokens per millisecond = refill_amount * 1000 / period_ms
134    //                             = refill_amount * 1e9 / period_nanos,
135    // kept in Q(REFILL_FRAC_BITS) fixed point. Computing from nanoseconds keeps
136    // sub-millisecond periods accurate; the one division happens here, not on the
137    // acquire path. `0` disables refill.
138    let refill_per_ms_q = if config.refill_amount() == 0 || config.refill_period().is_zero() {
139        0
140    } else {
141        let period_nanos = u64::try_from(config.refill_period().as_nanos())
142            .unwrap_or(u64::MAX)
143            .max(1);
144        let numerator = (u128::from(config.refill_amount()) * 1_000_000_000) << REFILL_FRAC_BITS;
145        numerator / u128::from(period_nanos)
146    };
147    let initial_millitokens = (u64::from(config.initial()) * MILLI).min(capacity_millitokens);
148    Bucket {
149        state: AtomicU64::new(pack(initial_millitokens, 0)),
150        capacity_millitokens,
151        refill_per_ms_q,
152        created_at,
153        config,
154        clock,
155    }
156}
157
158impl Bucket<SystemClock> {
159    /// Creates a bucket of capacity `rate` that refills `rate` tokens per
160    /// second, starting full, driven by the OS monotonic clock.
161    ///
162    /// This is the headline Tier-1 constructor. A `rate` of `0` yields a bucket
163    /// that grants nothing (capacity `0`); use [`BucketConfig::new`] when you
164    /// want zero rejected as an error.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use better_bucket::Bucket;
170    ///
171    /// let bucket = Bucket::per_second(50);
172    /// assert_eq!(bucket.capacity(), 50);
173    /// assert!(bucket.try_acquire(1));
174    /// ```
175    #[must_use]
176    pub fn per_second(rate: u32) -> Self {
177        Self::from_config(tier1_config(rate, rate, Duration::from_secs(1), rate))
178    }
179
180    /// Creates a bucket of capacity `amount` that refills `amount` tokens every
181    /// `period`, starting full, driven by the OS monotonic clock.
182    ///
183    /// Use this when the natural rate is not per-second — e.g. 5 tokens per 100
184    /// milliseconds, or 1000 per minute. An `amount` of `0` or a zero `period`
185    /// yields a bucket that grants nothing.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use better_bucket::Bucket;
191    /// use std::time::Duration;
192    ///
193    /// // 5 tokens every 100ms.
194    /// let bucket = Bucket::per_duration(5, Duration::from_millis(100));
195    /// assert_eq!(bucket.capacity(), 5);
196    /// ```
197    #[must_use]
198    pub fn per_duration(amount: u32, period: Duration) -> Self {
199        Self::from_config(tier1_config(amount, amount, period, amount))
200    }
201
202    /// Creates a bucket from a validated [`BucketConfig`], driven by the OS
203    /// monotonic clock.
204    ///
205    /// Use this when you need full control over capacity, rate, and initial
206    /// fill independently (e.g. a large burst ceiling with a slow refill, or a
207    /// bucket that starts empty).
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use better_bucket::{Bucket, BucketConfig};
213    /// use std::time::Duration;
214    ///
215    /// // 500-token burst, 100/sec refill, starting empty.
216    /// let config = BucketConfig::new(500, 100, Duration::from_secs(1), 0)?;
217    /// let bucket = Bucket::from_config(config);
218    /// assert_eq!(bucket.available(), 0);
219    /// # Ok::<(), better_bucket::BucketError>(())
220    /// ```
221    #[must_use]
222    pub fn from_config(config: BucketConfig) -> Self {
223        build(config, SystemClock::new())
224    }
225}
226
227impl<C: Clock> Bucket<C> {
228    /// Replaces the bucket's time source, resetting it to its initial fill
229    /// anchored at the new clock's current reading.
230    ///
231    /// This is the clock-injection seam. The intended use is immediately after
232    /// construction — chiefly in tests, where injecting a
233    /// [`ManualClock`](clock_lib::ManualClock) makes refill behaviour
234    /// deterministic with no `sleep`.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// use better_bucket::Bucket;
240    /// use clock_lib::ManualClock;
241    /// use std::sync::Arc;
242    /// use std::time::Duration;
243    ///
244    /// let clock = Arc::new(ManualClock::new());
245    /// let bucket = Bucket::per_second(10).with_clock(Arc::clone(&clock));
246    ///
247    /// assert!(bucket.try_acquire(10)); // drain it
248    /// assert!(!bucket.try_acquire(1)); // empty
249    ///
250    /// clock.advance(Duration::from_secs(1)); // no real sleep
251    /// assert_eq!(bucket.available(), 10);  // fully refilled
252    /// ```
253    #[must_use]
254    pub fn with_clock<C2: Clock>(self, clock: C2) -> Bucket<C2> {
255        build(self.config, clock)
256    }
257
258    /// Attempts to take `n` tokens, returning the full [`Decision`].
259    ///
260    /// Brings the bucket current (lazy refill) and, if at least `n` tokens are
261    /// available, deducts them and returns [`Decision::Allowed`]. Otherwise the
262    /// bucket is left untouched and [`Decision::Denied`] carries the minimum
263    /// wait until the request would succeed. Requesting `0` always succeeds;
264    /// requesting more than the capacity can never succeed (the denial's
265    /// `retry_after` is [`Duration::MAX`]).
266    ///
267    /// This never blocks and never allocates.
268    ///
269    /// # Examples
270    ///
271    /// ```
272    /// use better_bucket::{Bucket, Decision};
273    ///
274    /// let bucket = Bucket::per_second(5);
275    /// assert_eq!(bucket.acquire(3), Decision::Allowed);
276    /// assert_eq!(bucket.available(), 2);
277    /// ```
278    pub fn acquire(&self, n: u32) -> Decision {
279        self.acquire_inner(n)
280    }
281
282    /// Attempts to take `n` tokens, returning whether it succeeded.
283    ///
284    /// The one-line convenience over [`acquire`](Self::acquire): equivalent to
285    /// `self.acquire(n).is_allowed()`, for the common case where you only need
286    /// allow/deny and not the retry hint.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use better_bucket::Bucket;
292    ///
293    /// let bucket = Bucket::per_second(1);
294    /// assert!(bucket.try_acquire(1));
295    /// assert!(!bucket.try_acquire(1)); // drained
296    /// ```
297    #[must_use]
298    pub fn try_acquire(&self, n: u32) -> bool {
299        self.acquire_inner(n).is_allowed()
300    }
301
302    /// Returns how many whole tokens are available right now, after lazy refill.
303    ///
304    /// This is a momentary snapshot; under concurrent acquires it can be stale
305    /// the instant it returns. Treat it as advisory.
306    ///
307    /// # Examples
308    ///
309    /// ```
310    /// use better_bucket::Bucket;
311    ///
312    /// let bucket = Bucket::per_second(10);
313    /// assert_eq!(bucket.available(), 10);
314    /// assert!(bucket.try_acquire(4));
315    /// assert_eq!(bucket.available(), 6);
316    /// ```
317    #[must_use]
318    pub fn available(&self) -> u32 {
319        let now_ms = self.now_ms();
320        let (tokens_mt, last_ms) = unpack(self.state.load(Ordering::Relaxed));
321        let refilled = self.refilled(tokens_mt, last_ms, now_ms);
322        u32::try_from(refilled / MILLI).unwrap_or(u32::MAX)
323    }
324
325    /// Returns the bucket's capacity (its burst ceiling), in whole tokens.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use better_bucket::Bucket;
331    ///
332    /// assert_eq!(Bucket::per_second(64).capacity(), 64);
333    /// ```
334    #[must_use]
335    pub const fn capacity(&self) -> u32 {
336        (self.capacity_millitokens / MILLI) as u32
337    }
338
339    /// Returns the configuration this bucket was built from.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use better_bucket::Bucket;
345    /// use std::time::Duration;
346    ///
347    /// let bucket = Bucket::per_second(10);
348    /// assert_eq!(bucket.config().refill_period(), Duration::from_secs(1));
349    /// ```
350    #[must_use]
351    pub const fn config(&self) -> BucketConfig {
352        self.config
353    }
354
355    /// Refills the bucket to full and marks it current as of now.
356    ///
357    /// Use it to discard accumulated debt and grant a fresh burst — for example
358    /// at the start of a new billing window, or to clear a backlog after a
359    /// dependency recovers. Long uptime needs no special handling: the
360    /// millisecond counter wraps safely, so an actively-used bucket never stalls.
361    ///
362    /// # Examples
363    ///
364    /// ```
365    /// use better_bucket::Bucket;
366    ///
367    /// let bucket = Bucket::per_second(4);
368    /// assert!(bucket.try_acquire(4));
369    /// assert_eq!(bucket.available(), 0);
370    /// bucket.reset();
371    /// assert_eq!(bucket.available(), 4);
372    /// ```
373    pub fn reset(&self) {
374        let now_ms = self.now_ms();
375        self.state
376            .store(pack(self.capacity_millitokens, now_ms), Ordering::Relaxed);
377    }
378
379    /// Milliseconds since `created_at`, truncated into the 32-bit time field.
380    ///
381    /// The field wraps every ~49.7 days. [`refilled`](Self::refilled) computes
382    /// the elapsed interval with `wrapping_sub`, so the result is correct for
383    /// any gap shorter than that — i.e. for any bucket used at least once per
384    /// ~49.7 days, which covers every real limiter. A bucket left fully idle for
385    /// longer may under-refill once on its next use (safe and self-correcting).
386    #[inline]
387    fn now_ms(&self) -> u32 {
388        let elapsed = self.clock.now().saturating_duration_since(self.created_at);
389        (elapsed.as_millis() & u128::from(u32::MAX)) as u32
390    }
391
392    /// The millitoken count after refilling over `last_ms → now_ms`, capped at
393    /// capacity. Saturating throughout: a huge elapsed gap fills to capacity, it
394    /// can never wrap or overflow. When no whole millisecond has elapsed since
395    /// the last refill — the common case under bursty load — it returns
396    /// immediately, doing no arithmetic at all.
397    #[inline]
398    fn refilled(&self, tokens_mt: u64, last_ms: u32, now_ms: u32) -> u64 {
399        if self.refill_per_ms_q == 0 {
400            return tokens_mt;
401        }
402        // `wrapping_sub` is correct because the 32-bit ms field wraps: the true
403        // elapsed interval is recovered for any gap shorter than ~49.7 days.
404        let elapsed_ms = now_ms.wrapping_sub(last_ms);
405        if elapsed_ms == 0 {
406            return tokens_mt;
407        }
408        let added = u128::from(elapsed_ms).saturating_mul(self.refill_per_ms_q) >> REFILL_FRAC_BITS;
409        let added_mt = u64::try_from(added).unwrap_or(u64::MAX);
410        tokens_mt
411            .saturating_add(added_mt)
412            .min(self.capacity_millitokens)
413    }
414
415    /// The minimum time for `deficit_mt` millitokens to accrue, rounded up.
416    /// [`Duration::MAX`] if the bucket never refills.
417    fn time_for(&self, deficit_mt: u64) -> Duration {
418        if self.refill_per_ms_q == 0 {
419            return Duration::MAX;
420        }
421        // ms = ceil(deficit_mt / rate_per_ms) = ceil((deficit_mt << FRAC) / q).
422        let numerator =
423            (u128::from(deficit_mt) << REFILL_FRAC_BITS).saturating_add(self.refill_per_ms_q - 1);
424        let millis = numerator / self.refill_per_ms_q;
425        Duration::from_millis(u64::try_from(millis).unwrap_or(u64::MAX))
426    }
427
428    #[inline]
429    fn acquire_inner(&self, n: u32) -> Decision {
430        if n == 0 {
431            // Zero tokens are always available, even from an empty bucket.
432            return Decision::Allowed;
433        }
434        let need_mt = u64::from(n) * MILLI;
435        if need_mt > self.capacity_millitokens {
436            // More than the bucket can ever hold: it can never be granted.
437            return Decision::Denied {
438                retry_after: Duration::MAX,
439            };
440        }
441
442        let now_ms = self.now_ms();
443        loop {
444            let current = self.state.load(Ordering::Relaxed);
445            let (tokens_mt, last_ms) = unpack(current);
446            let refilled = self.refilled(tokens_mt, last_ms, now_ms);
447            if refilled < need_mt {
448                // Denied: report the wait for the shortfall to accrue. No write,
449                // so a denied request never contends with a granting one.
450                return Decision::Denied {
451                    retry_after: self.time_for(need_mt - refilled),
452                };
453            }
454            let next = pack(refilled - need_mt, now_ms);
455            // Relaxed is sufficient: the only shared state is this word, and the
456            // CAS gives the read-modify-write atomicity the no-over-grant
457            // contract depends on. A spurious or lost race retries.
458            if self
459                .state
460                .compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
461                .is_ok()
462            {
463                return Decision::Allowed;
464            }
465        }
466    }
467}
468
469impl<C: Clock> core::fmt::Debug for Bucket<C> {
470    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
471        f.debug_struct("Bucket")
472            .field("capacity", &self.capacity())
473            .field("available", &self.available())
474            .field("config", &self.config)
475            .finish()
476    }
477}
478
479/// The token-bucket surface a consumer depends on.
480///
481/// `TokenBucket` is the abstraction `rate-net` (and any other consumer) codes
482/// against, so it can hold a bucket without naming its concrete clock type. It
483/// mirrors the inherent methods of [`Bucket`]; see those for the detailed
484/// contract of each.
485pub trait TokenBucket {
486    /// Attempts to take `n` tokens, returning the full [`Decision`].
487    fn acquire(&self, n: u32) -> Decision;
488
489    /// Attempts to take `n` tokens, returning whether it succeeded.
490    #[must_use]
491    fn try_acquire(&self, n: u32) -> bool;
492
493    /// Returns the whole tokens available right now, after lazy refill.
494    #[must_use]
495    fn available(&self) -> u32;
496
497    /// Returns the bucket's capacity (its burst ceiling).
498    #[must_use]
499    fn capacity(&self) -> u32;
500}
501
502impl<C: Clock> TokenBucket for Bucket<C> {
503    fn acquire(&self, n: u32) -> Decision {
504        self.acquire_inner(n)
505    }
506
507    fn try_acquire(&self, n: u32) -> bool {
508        self.acquire_inner(n).is_allowed()
509    }
510
511    fn available(&self) -> u32 {
512        Bucket::available(self)
513    }
514
515    fn capacity(&self) -> u32 {
516        self.capacity()
517    }
518}
519
520#[cfg(all(test, not(loom)))]
521mod tests {
522    #![allow(clippy::unwrap_used)]
523
524    use super::{Bucket, TokenBucket};
525    use crate::decision::Decision;
526    use clock_lib::{ManualClock, SystemClock};
527    use core::time::Duration;
528    use std::sync::Arc;
529    use std::thread;
530
531    /// A bucket driven by a `ManualClock` the test controls, so refill is
532    /// deterministic with no real time passing.
533    fn manual_bucket(rate: u32) -> (Arc<ManualClock>, Bucket<Arc<ManualClock>>) {
534        let clock = Arc::new(ManualClock::new());
535        let bucket = Bucket::per_second(rate).with_clock(Arc::clone(&clock));
536        (clock, bucket)
537    }
538
539    #[test]
540    fn test_bucket_is_send_and_sync() {
541        fn assert_send_sync<T: Send + Sync>() {}
542        assert_send_sync::<Bucket<SystemClock>>();
543        assert_send_sync::<Bucket<Arc<ManualClock>>>();
544    }
545
546    #[test]
547    fn test_starts_full() {
548        let (_clock, bucket) = manual_bucket(10);
549        assert_eq!(bucket.available(), 10);
550        assert_eq!(bucket.capacity(), 10);
551    }
552
553    #[test]
554    fn test_acquire_deducts_tokens() {
555        let (_clock, bucket) = manual_bucket(10);
556        assert_eq!(bucket.acquire(3), Decision::Allowed);
557        assert_eq!(bucket.available(), 7);
558    }
559
560    #[test]
561    fn test_exact_empty_then_denied() {
562        let (_clock, bucket) = manual_bucket(10);
563        assert!(bucket.try_acquire(10)); // exact-empty
564        assert_eq!(bucket.available(), 0);
565        assert!(!bucket.try_acquire(1));
566    }
567
568    #[test]
569    fn test_acquire_zero_always_allowed() {
570        let (_clock, bucket) = manual_bucket(1);
571        assert!(bucket.try_acquire(1)); // drain
572        assert!(!bucket.try_acquire(1));
573        assert!(bucket.try_acquire(0)); // still allowed when empty
574    }
575
576    #[test]
577    fn test_request_above_capacity_never_grantable() {
578        let (_clock, bucket) = manual_bucket(5);
579        assert_eq!(
580            bucket.acquire(6),
581            Decision::Denied {
582                retry_after: Duration::MAX
583            }
584        );
585    }
586
587    #[test]
588    fn test_full_refill_after_one_period() {
589        let (clock, bucket) = manual_bucket(10);
590        assert!(bucket.try_acquire(10));
591        assert!(!bucket.try_acquire(1));
592        clock.advance(Duration::from_secs(1));
593        assert_eq!(bucket.available(), 10);
594        assert!(bucket.try_acquire(10));
595    }
596
597    #[test]
598    fn test_partial_refill_is_proportional() {
599        let (clock, bucket) = manual_bucket(100);
600        assert!(bucket.try_acquire(100));
601        clock.advance(Duration::from_millis(250)); // a quarter second
602        assert_eq!(bucket.available(), 25);
603    }
604
605    #[test]
606    fn test_refill_saturates_at_capacity() {
607        let (clock, bucket) = manual_bucket(10);
608        assert!(bucket.try_acquire(10));
609        clock.advance(Duration::from_secs(100)); // would be 1000 tokens
610        assert_eq!(bucket.available(), 10); // clamped to capacity
611    }
612
613    #[test]
614    fn test_refill_after_long_idle_saturates_without_overflow() {
615        // A multi-year idle gap must fill to capacity, never wrap or overflow.
616        let (clock, bucket) = manual_bucket(1_000);
617        assert!(bucket.try_acquire(1_000));
618        clock.advance(Duration::from_secs(60 * 60 * 24 * 365 * 5));
619        assert_eq!(bucket.available(), 1_000);
620        // And the bucket is still usable afterwards.
621        assert!(bucket.try_acquire(1_000));
622    }
623
624    #[test]
625    fn test_denied_reports_retry_after() {
626        let (_clock, bucket) = manual_bucket(10);
627        assert!(bucket.try_acquire(10)); // empty
628        // Five tokens at 10/sec accrue in 500ms.
629        assert_eq!(
630            bucket.acquire(5),
631            Decision::Denied {
632                retry_after: Duration::from_millis(500)
633            }
634        );
635    }
636
637    #[test]
638    fn test_per_duration_uses_custom_period() {
639        let clock = Arc::new(ManualClock::new());
640        let bucket =
641            Bucket::per_duration(5, Duration::from_millis(100)).with_clock(Arc::clone(&clock));
642        assert!(bucket.try_acquire(5));
643        clock.advance(Duration::from_millis(100));
644        assert_eq!(bucket.available(), 5);
645    }
646
647    #[test]
648    fn test_sub_millisecond_period_still_refills() {
649        // 5 tokens per 200µs ⇒ 25 tokens/ms. The millisecond tick is coarse but
650        // the rate is computed from nanoseconds, so a full ms refills fully.
651        let clock = Arc::new(ManualClock::new());
652        let bucket =
653            Bucket::per_duration(5, Duration::from_micros(200)).with_clock(Arc::clone(&clock));
654        assert!(bucket.try_acquire(5));
655        clock.advance(Duration::from_millis(1));
656        assert_eq!(bucket.available(), 5); // capped at capacity
657    }
658
659    #[test]
660    fn test_zero_rate_is_deny_all() {
661        let bucket = Bucket::per_second(0);
662        assert_eq!(bucket.capacity(), 0);
663        assert_eq!(bucket.available(), 0);
664        assert!(!bucket.try_acquire(1));
665        assert!(bucket.try_acquire(0));
666    }
667
668    #[test]
669    fn test_reset_refills_to_capacity() {
670        let (_clock, bucket) = manual_bucket(5);
671        assert!(bucket.try_acquire(5));
672        assert_eq!(bucket.available(), 0);
673        bucket.reset();
674        assert_eq!(bucket.available(), 5);
675    }
676
677    #[test]
678    fn test_trait_object_safe_surface() {
679        let (_clock, bucket) = manual_bucket(4);
680        let as_trait: &dyn TokenBucket = &bucket;
681        assert_eq!(as_trait.capacity(), 4);
682        assert!(as_trait.try_acquire(4));
683        assert!(!as_trait.try_acquire(1));
684    }
685
686    #[test]
687    fn test_concurrent_acquire_never_over_grants() {
688        // 100 tokens, no refill (clock never advances). Eight threads each
689        // demand 30 — total demand 240, available 100. Under a correct CAS,
690        // exactly 100 succeed: no over-grant (≤ 100) and no lost token (= 100).
691        let clock = Arc::new(ManualClock::new());
692        let bucket = Arc::new(Bucket::per_second(100).with_clock(clock));
693        let threads = 8;
694        let demand = 30u32;
695
696        let handles: Vec<_> = (0..threads)
697            .map(|_| {
698                let bucket = Arc::clone(&bucket);
699                thread::spawn(move || {
700                    let mut taken = 0u32;
701                    for _ in 0..demand {
702                        if bucket.try_acquire(1) {
703                            taken += 1;
704                        }
705                    }
706                    taken
707                })
708            })
709            .collect();
710
711        let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
712        assert_eq!(total, 100, "CAS bucket must grant exactly capacity");
713        assert_eq!(bucket.available(), 0);
714    }
715
716    #[test]
717    fn test_pack_unpack_round_trip() {
718        for &mt in &[0_u64, 1, 1_000, 50_000, u64::from(u32::MAX)] {
719            for &ms in &[0_u32, 1, 1_000, u32::MAX] {
720                let (got_mt, got_ms) = super::unpack(super::pack(mt, ms));
721                assert_eq!(got_mt, mt.min(u64::from(u32::MAX)));
722                assert_eq!(got_ms, ms);
723            }
724        }
725    }
726}