Skip to main content

better_bucket/
bucket.rs

1//! The single token bucket and the `TokenBucket` trait.
2//!
3//! This is the lock-free core. All mutable state lives in one `AtomicU64` that
4//! packs the current token count and the time of the last refill; `try_acquire`
5//! is a single `compare_exchange_weak` loop with lazy refill computed from the
6//! injected monotonic clock. There is no lock and no allocation on the acquire
7//! path, and the bucket is cache-line aligned so independent buckets never
8//! falsely share. The public surface is identical to the `0.2` foundation
9//! release — only the internals changed.
10//!
11//! # Packing
12//!
13//! The state word is split:
14//! - **upper 32 bits** — tokens in *millitokens* (thousandths of a token), for
15//!   sub-token refill resolution. Capped at [`u32::MAX`] millitokens, so the
16//!   effective capacity ceiling is about 4.29 million tokens.
//! - **lower 32 bits** — the time of the last refill computation, expressed as
//!   milliseconds since the bucket's `created_at` anchor.
19//!
20//! The millisecond field wraps every ~49.7 days. Elapsed time is computed with
21//! `wrapping_sub`, so refill stays correct for any gap shorter than that — i.e.
22//! for any bucket used at least once per ~49.7 days, which is every real
23//! limiter. A bucket left fully idle for longer may under-refill once on its
24//! next use, a safe and self-correcting outcome.
25
26use core::time::Duration;
27
28#[cfg(loom)]
29use loom::sync::atomic::{AtomicU64, Ordering};
30
31use clock_lib::{Clock, Monotonic, SystemClock};
32#[cfg(not(loom))]
33use core::sync::atomic::{AtomicU64, Ordering};
34
35use crate::config::BucketConfig;
36use crate::decision::Decision;
37
/// Millitokens per whole token.
///
/// Token counts are held in thousandths of a token, so whole-token amounts
/// are multiplied by this on the way into the packed state and divided by it
/// on the way out.
const MILLI: u64 = 1_000;

/// Fractional bits for the precomputed refill rate. The rate is stored as
/// millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed point so the hot
/// path is a multiply and a shift — no division per acquire.
///
/// Headroom: the widest numerator `build` can form is `u32::MAX * 10^9`
/// shifted left by this many bits (about `1.8e25`), and multiplying the
/// resulting rate by a 32-bit elapsed-millisecond count stays below `7.8e34`,
/// both far under `u128::MAX` (about `3.4e38`).
const REFILL_FRAC_BITS: u32 = 22;
45
/// Builds the packed state word from a token count and a refill timestamp.
///
/// `millitokens` occupies the upper 32 bits and is clamped to `u32::MAX`
/// first, so an oversized count saturates the token field instead of
/// spilling out of the word. `last_ms` occupies the lower 32 bits unchanged.
#[inline]
fn pack(millitokens: u64, last_ms: u32) -> u64 {
    const TOKEN_FIELD_MAX: u64 = u32::MAX as u64;

    let token_field = if millitokens > TOKEN_FIELD_MAX {
        TOKEN_FIELD_MAX
    } else {
        millitokens
    };
    let time_field = u64::from(last_ms);

    (token_field << 32) | time_field
}
51
/// Splits the packed state word back into `(millitokens, last_ms)`.
///
/// The upper 32 bits are the millitoken count; the lower 32 bits are the
/// millisecond timestamp of the last refill.
#[inline]
fn unpack(state: u64) -> (u64, u32) {
    const LOW_HALF: u64 = 0xFFFF_FFFF;

    let millitokens = state >> 32;
    // The mask leaves only the low 32 bits, so this cast cannot lose data.
    let last_ms = (state & LOW_HALF) as u32;

    (millitokens, last_ms)
}
57
58/// Maps a Tier-1 request to a config, collapsing a degenerate request (zero
59/// capacity, amount, or period) to a bucket that grants nothing. That is
60/// well-defined and safe rather than a panic; [`BucketConfig::new`] is the
61/// validated, error-returning path.
62fn tier1_config(capacity: u32, amount: u32, period: Duration, initial: u32) -> BucketConfig {
63    if capacity == 0 || amount == 0 || period.is_zero() {
64        BucketConfig::raw(0, 0, Duration::from_secs(1), 0)
65    } else {
66        BucketConfig::raw(capacity, amount, period, initial)
67    }
68}
69
/// A token bucket: a counter that refills over time and grants tokens on demand.
///
/// A bucket holds up to its capacity in tokens, accrues more at a fixed rate,
/// and hands them out when asked. The hot path is **lock-free** — a single
/// `compare_exchange_weak` on a packed atomic word — and **allocation-free**.
/// Refill is **lazy**: there is no background thread or timer, the token count
/// is brought current from the monotonic clock the instant you call
/// [`acquire`](Self::acquire), [`try_acquire`](Self::try_acquire), or
/// [`available`](Self::available).
///
/// The type parameter `C` is the time source. It defaults to
/// [`SystemClock`](clock_lib::SystemClock) (the OS monotonic clock); inject a
/// [`ManualClock`](clock_lib::ManualClock) with [`with_clock`](Self::with_clock)
/// to drive time by hand in tests. `Bucket` is `Send + Sync` whenever its clock
/// is, which every [`Clock`] implementation guarantees.
///
/// # Limits
///
/// The packed representation caps capacity at about 4.29 million tokens
/// (`u32::MAX` millitokens). Time is tracked as 32-bit milliseconds since
/// construction and wraps every ~49.7 days; the wrap is handled, so an
/// actively-used bucket refills correctly indefinitely (only a bucket idle for
/// longer than that may under-refill once, safely, on its next use).
///
/// # Examples
///
/// The one-line common case:
///
/// ```
/// use better_bucket::Bucket;
///
/// let bucket = Bucket::per_second(100);
/// if bucket.try_acquire(1) {
///     // allowed — do the work
/// }
/// ```
// Aligned to 64 bytes so that two buckets never start inside the same
// 64-byte line (the module docs call this cache-line alignment).
#[repr(align(64))]
pub struct Bucket<C: Clock = SystemClock> {
    /// Packed `(millitokens << 32) | last_ms`. The only mutable state, and the
    /// single point of synchronisation. Read and written only through
    /// `pack` / `unpack`.
    state: AtomicU64,
    /// Capacity in millitokens, already clamped to the 32-bit packing ceiling.
    capacity_millitokens: u64,
    /// Refill rate as millitokens-per-millisecond in `Q(REFILL_FRAC_BITS)` fixed
    /// point. Derived once from `refill_amount` and the period in nanoseconds, so
    /// the hot path needs only a multiply and a shift and sub-millisecond periods
    /// stay accurate. Zero means no refill.
    refill_per_ms_q: u128,
    /// The monotonic anchor that `last_ms` is measured from; taken from the
    /// clock when the bucket is built.
    created_at: Monotonic,
    /// The original configuration, kept for [`config`](Self::config) and for
    /// rebuilding the bucket in [`with_clock`](Self::with_clock).
    config: BucketConfig,
    /// The injected time source.
    clock: C,
}
125
126/// Constructs a bucket from a finished config and a clock, anchoring the refill
127/// clock at the supplied clock's current reading and filling to `initial`.
128fn build<C: Clock>(config: BucketConfig, clock: C) -> Bucket<C> {
129    let created_at = clock.now();
130    let capacity_millitokens = u64::from(config.capacity())
131        .saturating_mul(MILLI)
132        .min(u64::from(u32::MAX));
133    // Millitokens per millisecond = refill_amount * 1000 / period_ms
134    //                             = refill_amount * 1e9 / period_nanos,
135    // kept in Q(REFILL_FRAC_BITS) fixed point. Computing from nanoseconds keeps
136    // sub-millisecond periods accurate; the one division happens here, not on the
137    // acquire path. `0` disables refill.
138    let refill_per_ms_q = if config.refill_amount() == 0 || config.refill_period().is_zero() {
139        0
140    } else {
141        let period_nanos = u64::try_from(config.refill_period().as_nanos())
142            .unwrap_or(u64::MAX)
143            .max(1);
144        let numerator = (u128::from(config.refill_amount()) * 1_000_000_000) << REFILL_FRAC_BITS;
145        numerator / u128::from(period_nanos)
146    };
147    let initial_millitokens = (u64::from(config.initial()) * MILLI).min(capacity_millitokens);
148    Bucket {
149        state: AtomicU64::new(pack(initial_millitokens, 0)),
150        capacity_millitokens,
151        refill_per_ms_q,
152        created_at,
153        config,
154        clock,
155    }
156}
157
158impl Bucket<SystemClock> {
159    /// Creates a bucket of capacity `rate` that refills `rate` tokens per
160    /// second, starting full, driven by the OS monotonic clock.
161    ///
162    /// This is the headline Tier-1 constructor. A `rate` of `0` yields a bucket
163    /// that grants nothing (capacity `0`); use [`BucketConfig::new`] when you
164    /// want zero rejected as an error.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use better_bucket::Bucket;
170    ///
171    /// let bucket = Bucket::per_second(50);
172    /// assert_eq!(bucket.capacity(), 50);
173    /// assert!(bucket.try_acquire(1));
174    /// ```
175    #[must_use]
176    pub fn per_second(rate: u32) -> Self {
177        Self::from_config(tier1_config(rate, rate, Duration::from_secs(1), rate))
178    }
179
180    /// Creates a bucket of capacity `amount` that refills `amount` tokens every
181    /// `period`, starting full, driven by the OS monotonic clock.
182    ///
183    /// Use this when the natural rate is not per-second — e.g. 5 tokens per 100
184    /// milliseconds, or 1000 per minute. An `amount` of `0` or a zero `period`
185    /// yields a bucket that grants nothing.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use better_bucket::Bucket;
191    /// use std::time::Duration;
192    ///
193    /// // 5 tokens every 100ms.
194    /// let bucket = Bucket::per_duration(5, Duration::from_millis(100));
195    /// assert_eq!(bucket.capacity(), 5);
196    /// ```
197    #[must_use]
198    pub fn per_duration(amount: u32, period: Duration) -> Self {
199        Self::from_config(tier1_config(amount, amount, period, amount))
200    }
201
202    /// Creates a bucket from a validated [`BucketConfig`], driven by the OS
203    /// monotonic clock.
204    ///
205    /// Use this when you need full control over capacity, rate, and initial
206    /// fill independently (e.g. a large burst ceiling with a slow refill, or a
207    /// bucket that starts empty).
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use better_bucket::{Bucket, BucketConfig};
213    /// use std::time::Duration;
214    ///
215    /// // 500-token burst, 100/sec refill, starting empty.
216    /// let config = BucketConfig::new(500, 100, Duration::from_secs(1), 0)?;
217    /// let bucket = Bucket::from_config(config);
218    /// assert_eq!(bucket.available(), 0);
219    /// # Ok::<(), better_bucket::BucketError>(())
220    /// ```
221    #[must_use]
222    pub fn from_config(config: BucketConfig) -> Self {
223        build(config, SystemClock::new())
224    }
225}
226
227impl<C: Clock> Bucket<C> {
228    /// Replaces the bucket's time source, resetting it to its initial fill
229    /// anchored at the new clock's current reading.
230    ///
231    /// This is the clock-injection seam. The intended use is immediately after
232    /// construction — chiefly in tests, where injecting a
233    /// [`ManualClock`](clock_lib::ManualClock) makes refill behaviour
234    /// deterministic with no `sleep`.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// use better_bucket::Bucket;
240    /// use clock_lib::ManualClock;
241    /// use std::sync::Arc;
242    /// use std::time::Duration;
243    ///
244    /// let clock = Arc::new(ManualClock::new());
245    /// let bucket = Bucket::per_second(10).with_clock(Arc::clone(&clock));
246    ///
247    /// assert!(bucket.try_acquire(10)); // drain it
248    /// assert!(!bucket.try_acquire(1)); // empty
249    ///
250    /// clock.advance(Duration::from_secs(1)); // no real sleep
251    /// assert_eq!(bucket.available(), 10);  // fully refilled
252    /// ```
253    #[must_use]
254    pub fn with_clock<C2: Clock>(self, clock: C2) -> Bucket<C2> {
255        build(self.config, clock)
256    }
257
258    /// Attempts to take `n` tokens, returning the full [`Decision`].
259    ///
260    /// Brings the bucket current (lazy refill) and, if at least `n` tokens are
261    /// available, deducts them and returns [`Decision::Allowed`]. Otherwise the
262    /// bucket is left untouched and [`Decision::Denied`] carries the minimum
263    /// wait until the request would succeed. Requesting `0` always succeeds;
264    /// requesting more than the capacity can never succeed (the denial's
265    /// `retry_after` is [`Duration::MAX`]).
266    ///
267    /// This never blocks and never allocates.
268    ///
269    /// # Examples
270    ///
271    /// ```
272    /// use better_bucket::{Bucket, Decision};
273    ///
274    /// let bucket = Bucket::per_second(5);
275    /// assert_eq!(bucket.acquire(3), Decision::Allowed);
276    /// assert_eq!(bucket.available(), 2);
277    /// ```
278    #[inline]
279    pub fn acquire(&self, n: u32) -> Decision {
280        self.acquire_inner(n)
281    }
282
283    /// Attempts to take `n` tokens, returning whether it succeeded.
284    ///
285    /// The one-line convenience over [`acquire`](Self::acquire): equivalent to
286    /// `self.acquire(n).is_allowed()`, for the common case where you only need
287    /// allow/deny and not the retry hint.
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// use better_bucket::Bucket;
293    ///
294    /// let bucket = Bucket::per_second(1);
295    /// assert!(bucket.try_acquire(1));
296    /// assert!(!bucket.try_acquire(1)); // drained
297    /// ```
298    #[inline]
299    #[must_use]
300    pub fn try_acquire(&self, n: u32) -> bool {
301        self.acquire_inner(n).is_allowed()
302    }
303
304    /// Returns how many whole tokens are available right now, after lazy refill.
305    ///
306    /// This is a momentary snapshot; under concurrent acquires it can be stale
307    /// the instant it returns. Treat it as advisory.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use better_bucket::Bucket;
313    ///
314    /// let bucket = Bucket::per_second(10);
315    /// assert_eq!(bucket.available(), 10);
316    /// assert!(bucket.try_acquire(4));
317    /// assert_eq!(bucket.available(), 6);
318    /// ```
319    #[inline]
320    #[must_use]
321    pub fn available(&self) -> u32 {
322        let now_ms = self.now_ms();
323        let (tokens_mt, last_ms) = unpack(self.state.load(Ordering::Relaxed));
324        let refilled = self.refilled(tokens_mt, last_ms, now_ms);
325        u32::try_from(refilled / MILLI).unwrap_or(u32::MAX)
326    }
327
328    /// Returns the bucket's capacity (its burst ceiling), in whole tokens.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use better_bucket::Bucket;
334    ///
335    /// assert_eq!(Bucket::per_second(64).capacity(), 64);
336    /// ```
337    #[must_use]
338    pub const fn capacity(&self) -> u32 {
339        (self.capacity_millitokens / MILLI) as u32
340    }
341
342    /// Returns the configuration this bucket was built from.
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use better_bucket::Bucket;
348    /// use std::time::Duration;
349    ///
350    /// let bucket = Bucket::per_second(10);
351    /// assert_eq!(bucket.config().refill_period(), Duration::from_secs(1));
352    /// ```
353    #[must_use]
354    pub const fn config(&self) -> BucketConfig {
355        self.config
356    }
357
358    /// Refills the bucket to full and marks it current as of now.
359    ///
360    /// Use it to discard accumulated debt and grant a fresh burst — for example
361    /// at the start of a new billing window, or to clear a backlog after a
362    /// dependency recovers. Long uptime needs no special handling: the
363    /// millisecond counter wraps safely, so an actively-used bucket never stalls.
364    ///
365    /// # Examples
366    ///
367    /// ```
368    /// use better_bucket::Bucket;
369    ///
370    /// let bucket = Bucket::per_second(4);
371    /// assert!(bucket.try_acquire(4));
372    /// assert_eq!(bucket.available(), 0);
373    /// bucket.reset();
374    /// assert_eq!(bucket.available(), 4);
375    /// ```
376    pub fn reset(&self) {
377        let now_ms = self.now_ms();
378        self.state
379            .store(pack(self.capacity_millitokens, now_ms), Ordering::Relaxed);
380    }
381
382    /// Milliseconds since `created_at`, truncated into the 32-bit time field.
383    ///
384    /// The field wraps every ~49.7 days. [`refilled`](Self::refilled) computes
385    /// the elapsed interval with `wrapping_sub`, so the result is correct for
386    /// any gap shorter than that — i.e. for any bucket used at least once per
387    /// ~49.7 days, which covers every real limiter. A bucket left fully idle for
388    /// longer may under-refill once on its next use (safe and self-correcting).
389    #[inline]
390    fn now_ms(&self) -> u32 {
391        let elapsed = self.clock.now().saturating_duration_since(self.created_at);
392        (elapsed.as_millis() & u128::from(u32::MAX)) as u32
393    }
394
395    /// The millitoken count after refilling over `last_ms → now_ms`, capped at
396    /// capacity. Saturating throughout: a huge elapsed gap fills to capacity, it
397    /// can never wrap or overflow. When no whole millisecond has elapsed since
398    /// the last refill — the common case under bursty load — it returns
399    /// immediately, doing no arithmetic at all.
400    #[inline]
401    fn refilled(&self, tokens_mt: u64, last_ms: u32, now_ms: u32) -> u64 {
402        if self.refill_per_ms_q == 0 {
403            return tokens_mt;
404        }
405        // `wrapping_sub` is correct because the 32-bit ms field wraps: the true
406        // elapsed interval is recovered for any gap shorter than ~49.7 days.
407        let elapsed_ms = now_ms.wrapping_sub(last_ms);
408        if elapsed_ms == 0 {
409            return tokens_mt;
410        }
411        let added = u128::from(elapsed_ms).saturating_mul(self.refill_per_ms_q) >> REFILL_FRAC_BITS;
412        let added_mt = u64::try_from(added).unwrap_or(u64::MAX);
413        tokens_mt
414            .saturating_add(added_mt)
415            .min(self.capacity_millitokens)
416    }
417
418    /// The minimum time for `deficit_mt` millitokens to accrue, rounded up.
419    /// [`Duration::MAX`] if the bucket never refills.
420    fn time_for(&self, deficit_mt: u64) -> Duration {
421        if self.refill_per_ms_q == 0 {
422            return Duration::MAX;
423        }
424        // ms = ceil(deficit_mt / rate_per_ms) = ceil((deficit_mt << FRAC) / q).
425        let numerator =
426            (u128::from(deficit_mt) << REFILL_FRAC_BITS).saturating_add(self.refill_per_ms_q - 1);
427        let millis = numerator / self.refill_per_ms_q;
428        Duration::from_millis(u64::try_from(millis).unwrap_or(u64::MAX))
429    }
430
431    #[inline]
432    fn acquire_inner(&self, n: u32) -> Decision {
433        if n == 0 {
434            // Zero tokens are always available, even from an empty bucket.
435            return Decision::Allowed;
436        }
437        let need_mt = u64::from(n) * MILLI;
438        if need_mt > self.capacity_millitokens {
439            // More than the bucket can ever hold: it can never be granted.
440            return Decision::Denied {
441                retry_after: Duration::MAX,
442            };
443        }
444
445        let now_ms = self.now_ms();
446        loop {
447            let current = self.state.load(Ordering::Relaxed);
448            let (tokens_mt, last_ms) = unpack(current);
449            let refilled = self.refilled(tokens_mt, last_ms, now_ms);
450            if refilled < need_mt {
451                // Denied: report the wait for the shortfall to accrue. No write,
452                // so a denied request never contends with a granting one.
453                return Decision::Denied {
454                    retry_after: self.time_for(need_mt - refilled),
455                };
456            }
457            let next = pack(refilled - need_mt, now_ms);
458            // Relaxed is sufficient: the only shared state is this word, and the
459            // CAS gives the read-modify-write atomicity the no-over-grant
460            // contract depends on. A spurious or lost race retries.
461            match self.state.compare_exchange_weak(
462                current,
463                next,
464                Ordering::Relaxed,
465                Ordering::Relaxed,
466            ) {
467                Ok(_) => return Decision::Allowed,
468                // Lost the race (or a spurious weak failure). Hint the CPU that
469                // we are in a retry spin before recomputing — this eases the
470                // cache-line contention and yields to a sibling SMT thread, and
471                // costs nothing on the uncontended path, which never gets here.
472                Err(_) => core::hint::spin_loop(),
473            }
474        }
475    }
476}
477
478impl<C: Clock> core::fmt::Debug for Bucket<C> {
479    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
480        f.debug_struct("Bucket")
481            .field("capacity", &self.capacity())
482            .field("available", &self.available())
483            .field("config", &self.config)
484            .finish()
485    }
486}
487
/// The token-bucket surface a consumer depends on.
///
/// `TokenBucket` is the abstraction `rate-net` (and any other consumer) codes
/// against, so it can hold a bucket without naming its concrete clock type. It
/// mirrors the inherent methods of [`Bucket`]; see those for the detailed
/// contract of each.
///
/// Every method takes `&self` and none is generic, so the trait can be used
/// as `&dyn TokenBucket`.
pub trait TokenBucket {
    /// Attempts to take `n` tokens, returning the full [`Decision`].
    fn acquire(&self, n: u32) -> Decision;

    /// Attempts to take `n` tokens, returning whether it succeeded.
    #[must_use]
    fn try_acquire(&self, n: u32) -> bool;

    /// Returns the whole tokens available right now, after lazy refill.
    #[must_use]
    fn available(&self) -> u32;

    /// Returns the bucket's capacity (its burst ceiling), in whole tokens.
    #[must_use]
    fn capacity(&self) -> u32;
}
510
511impl<C: Clock> TokenBucket for Bucket<C> {
512    #[inline]
513    fn acquire(&self, n: u32) -> Decision {
514        self.acquire_inner(n)
515    }
516
517    #[inline]
518    fn try_acquire(&self, n: u32) -> bool {
519        self.acquire_inner(n).is_allowed()
520    }
521
522    #[inline]
523    fn available(&self) -> u32 {
524        Bucket::available(self)
525    }
526
527    #[inline]
528    fn capacity(&self) -> u32 {
529        self.capacity()
530    }
531}
532
// Unit tests for the bucket. Time is driven through a `ManualClock` wherever
// refill matters, so nothing here sleeps. Compiled out under `cfg(loom)`.
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::{Bucket, TokenBucket};
    use crate::decision::Decision;
    use clock_lib::{ManualClock, SystemClock};
    use core::time::Duration;
    use std::sync::Arc;
    use std::thread;

    /// A bucket driven by a `ManualClock` the test controls, so refill is
    /// deterministic with no real time passing. Returns the clock handle
    /// alongside the bucket so the test can advance it.
    fn manual_bucket(rate: u32) -> (Arc<ManualClock>, Bucket<Arc<ManualClock>>) {
        let clock = Arc::new(ManualClock::new());
        let bucket = Bucket::per_second(rate).with_clock(Arc::clone(&clock));
        (clock, bucket)
    }

    // Compile-time check only: the body never runs anything at runtime.
    #[test]
    fn test_bucket_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Bucket<SystemClock>>();
        assert_send_sync::<Bucket<Arc<ManualClock>>>();
    }

    #[test]
    fn test_starts_full() {
        let (_clock, bucket) = manual_bucket(10);
        assert_eq!(bucket.available(), 10);
        assert_eq!(bucket.capacity(), 10);
    }

    #[test]
    fn test_acquire_deducts_tokens() {
        let (_clock, bucket) = manual_bucket(10);
        assert_eq!(bucket.acquire(3), Decision::Allowed);
        assert_eq!(bucket.available(), 7);
    }

    #[test]
    fn test_exact_empty_then_denied() {
        let (_clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10)); // exact-empty
        assert_eq!(bucket.available(), 0);
        assert!(!bucket.try_acquire(1));
    }

    #[test]
    fn test_acquire_zero_always_allowed() {
        let (_clock, bucket) = manual_bucket(1);
        assert!(bucket.try_acquire(1)); // drain
        assert!(!bucket.try_acquire(1));
        assert!(bucket.try_acquire(0)); // still allowed when empty
    }

    #[test]
    fn test_request_above_capacity_never_grantable() {
        let (_clock, bucket) = manual_bucket(5);
        assert_eq!(
            bucket.acquire(6),
            Decision::Denied {
                retry_after: Duration::MAX
            }
        );
    }

    #[test]
    fn test_full_refill_after_one_period() {
        let (clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10));
        assert!(!bucket.try_acquire(1));
        clock.advance(Duration::from_secs(1));
        assert_eq!(bucket.available(), 10);
        assert!(bucket.try_acquire(10));
    }

    #[test]
    fn test_partial_refill_is_proportional() {
        let (clock, bucket) = manual_bucket(100);
        assert!(bucket.try_acquire(100));
        clock.advance(Duration::from_millis(250)); // a quarter second
        assert_eq!(bucket.available(), 25);
    }

    #[test]
    fn test_refill_saturates_at_capacity() {
        let (clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10));
        clock.advance(Duration::from_secs(100)); // would be 1000 tokens
        assert_eq!(bucket.available(), 10); // clamped to capacity
    }

    #[test]
    fn test_refill_after_long_idle_saturates_without_overflow() {
        // A multi-year idle gap must fill to capacity, never wrap or overflow.
        let (clock, bucket) = manual_bucket(1_000);
        assert!(bucket.try_acquire(1_000));
        clock.advance(Duration::from_secs(60 * 60 * 24 * 365 * 5));
        assert_eq!(bucket.available(), 1_000);
        // And the bucket is still usable afterwards.
        assert!(bucket.try_acquire(1_000));
    }

    #[test]
    fn test_denied_reports_retry_after() {
        let (_clock, bucket) = manual_bucket(10);
        assert!(bucket.try_acquire(10)); // empty
        // Five tokens at 10/sec accrue in 500ms.
        assert_eq!(
            bucket.acquire(5),
            Decision::Denied {
                retry_after: Duration::from_millis(500)
            }
        );
    }

    #[test]
    fn test_per_duration_uses_custom_period() {
        let clock = Arc::new(ManualClock::new());
        let bucket =
            Bucket::per_duration(5, Duration::from_millis(100)).with_clock(Arc::clone(&clock));
        assert!(bucket.try_acquire(5));
        clock.advance(Duration::from_millis(100));
        assert_eq!(bucket.available(), 5);
    }

    #[test]
    fn test_sub_millisecond_period_still_refills() {
        // 5 tokens per 200µs ⇒ 25 tokens/ms. The millisecond tick is coarse but
        // the rate is computed from nanoseconds, so a full ms refills fully.
        let clock = Arc::new(ManualClock::new());
        let bucket =
            Bucket::per_duration(5, Duration::from_micros(200)).with_clock(Arc::clone(&clock));
        assert!(bucket.try_acquire(5));
        clock.advance(Duration::from_millis(1));
        assert_eq!(bucket.available(), 5); // capped at capacity
    }

    // Uses the real `SystemClock`: a zero-capacity bucket never refills, so the
    // outcome does not depend on elapsed time.
    #[test]
    fn test_zero_rate_is_deny_all() {
        let bucket = Bucket::per_second(0);
        assert_eq!(bucket.capacity(), 0);
        assert_eq!(bucket.available(), 0);
        assert!(!bucket.try_acquire(1));
        assert!(bucket.try_acquire(0));
    }

    #[test]
    fn test_reset_refills_to_capacity() {
        let (_clock, bucket) = manual_bucket(5);
        assert!(bucket.try_acquire(5));
        assert_eq!(bucket.available(), 0);
        bucket.reset();
        assert_eq!(bucket.available(), 5);
    }

    // Goes through `&dyn TokenBucket`, so this also fails to compile if the
    // trait ever stops being usable as a trait object.
    #[test]
    fn test_trait_object_safe_surface() {
        let (_clock, bucket) = manual_bucket(4);
        let as_trait: &dyn TokenBucket = &bucket;
        assert_eq!(as_trait.capacity(), 4);
        assert!(as_trait.try_acquire(4));
        assert!(!as_trait.try_acquire(1));
    }

    #[test]
    fn test_concurrent_acquire_never_over_grants() {
        // 100 tokens, no refill (clock never advances). Eight threads each
        // demand 30 — total demand 240, available 100. Under a correct CAS,
        // exactly 100 succeed: no over-grant (≤ 100) and no lost token (= 100).
        let clock = Arc::new(ManualClock::new());
        let bucket = Arc::new(Bucket::per_second(100).with_clock(clock));
        let threads = 8;
        let demand = 30u32;

        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let bucket = Arc::clone(&bucket);
                thread::spawn(move || {
                    let mut taken = 0u32;
                    for _ in 0..demand {
                        if bucket.try_acquire(1) {
                            taken += 1;
                        }
                    }
                    taken
                })
            })
            .collect();

        let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(total, 100, "CAS bucket must grant exactly capacity");
        assert_eq!(bucket.available(), 0);
    }

    #[test]
    fn test_high_contention_conserves_every_token() {
        // The accuracy certification: 16 threads hammer one no-refill bucket,
        // each taking 3 tokens at a time. Capacity is not a multiple of the take
        // size, so the last partial request cannot fit. With no refill, every
        // token must be accounted for — either granted or still available — with
        // none lost, duplicated, or handed out beyond capacity.
        const CAPACITY: u32 = 6_001;
        const THREADS: u32 = 16;
        const TAKE: u32 = 3;

        let clock = Arc::new(ManualClock::new()); // never advanced => no refill
        let bucket = Arc::new(Bucket::per_second(CAPACITY).with_clock(clock));

        let handles: Vec<_> = (0..THREADS)
            .map(|_| {
                let bucket = Arc::clone(&bucket);
                thread::spawn(move || {
                    let mut taken = 0u32;
                    for _ in 0..CAPACITY {
                        if bucket.try_acquire(TAKE) {
                            taken += TAKE;
                        }
                    }
                    taken
                })
            })
            .collect();

        let granted: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // No over-grant: never more than capacity (and a multiple of the take).
        assert!(granted <= CAPACITY, "over-grant: {granted} > {CAPACITY}");
        assert_eq!(granted % TAKE, 0, "a partial take was granted");
        // Exact conservation: granted + still-available == the initial capacity.
        // Any lost, duplicated, or corrupted token breaks this equality.
        assert_eq!(
            bucket.available(),
            CAPACITY - granted,
            "tokens were lost or corrupted under contention"
        );
    }

    #[test]
    fn test_pack_unpack_round_trip() {
        // Every sampled token value fits the 32-bit field, so the `min` below
        // is a no-op here; it mirrors the clamp `pack` applies.
        for &mt in &[0_u64, 1, 1_000, 50_000, u64::from(u32::MAX)] {
            for &ms in &[0_u32, 1, 1_000, u32::MAX] {
                let (got_mt, got_ms) = super::unpack(super::pack(mt, ms));
                assert_eq!(got_mt, mt.min(u64::from(u32::MAX)));
                assert_eq!(got_ms, ms);
            }
        }
    }
}
782}