// throttle_net/throttle.rs

1//! The Tier-1 throttle: one token bucket with a waiting acquire.
2
3use core::time::Duration;
4
5use better_bucket::{Bucket, Decision as BucketDecision};
6use clock_lib::{Clock, SystemClock};
7
8use crate::decision::Decision;
9use crate::error::ThrottleError;
10use crate::limiter::Limiter;
11
12/// A single outbound throttle backed by a token bucket.
13///
14/// This is the Tier-1 surface: construct one with [`per_second`](Self::per_second)
15/// or [`per_duration`](Self::per_duration), then pace your outbound work with
16/// [`acquire`](Self::acquire). Because throttle-net protects *downstreams*, the
17/// headline operation **waits** for a token rather than rejecting the caller —
18/// you are slowing your own requests, not dropping someone else's. When you would
19/// rather not wait, [`try_acquire`](Self::try_acquire) reports the outcome
20/// immediately.
21///
22/// The bucket refills smoothly and starts full, so a burst up to the capacity is
23/// admitted at once and the sustained rate is the refill rate. Token accounting
24/// is lock-free (a single atomic compare-and-swap per acquire), and time is read
25/// from an injectable [`Clock`] — [`SystemClock`] in production, or a
26/// `ManualClock` in tests via [`with_clock`](Self::with_clock).
27///
28/// # Examples
29///
30/// ```
31/// # async fn run() -> Result<(), throttle_net::ThrottleError> {
32/// use throttle_net::Throttle;
33///
34/// // 100 requests per second, bursting up to 100.
35/// let throttle = Throttle::per_second(100);
36/// throttle.acquire().await?; // returns as soon as a token is free
37/// # Ok(())
38/// # }
39/// ```
40#[derive(Debug)]
41pub struct Throttle<C: Clock = SystemClock> {
42    bucket: Bucket<C>,
43}
44
45impl Throttle<SystemClock> {
46    /// Creates a throttle that admits `rate` units per second, bursting up to
47    /// `rate`, driven by the OS monotonic clock.
48    ///
49    /// A `rate` of `0` yields a throttle that grants nothing; an
50    /// [`acquire`](Self::acquire) on it returns
51    /// [`ThrottleError::CostExceedsCapacity`].
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// use throttle_net::Throttle;
57    ///
58    /// let throttle = Throttle::per_second(50);
59    /// assert_eq!(throttle.capacity(), 50);
60    /// assert!(throttle.try_acquire());
61    /// ```
62    #[must_use]
63    pub fn per_second(rate: u32) -> Self {
64        Self {
65            bucket: Bucket::per_second(rate),
66        }
67    }
68
69    /// Creates a throttle that admits `amount` units every `period`, bursting up
70    /// to `amount`, driven by the OS monotonic clock.
71    ///
72    /// Use this when the natural window is not one second — for example, sixty
73    /// calls per minute, or five per hundred milliseconds.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use std::time::Duration;
79    /// use throttle_net::Throttle;
80    ///
81    /// // 60 requests per minute.
82    /// let throttle = Throttle::per_duration(60, Duration::from_secs(60));
83    /// assert_eq!(throttle.capacity(), 60);
84    /// ```
85    #[must_use]
86    pub fn per_duration(amount: u32, period: Duration) -> Self {
87        Self {
88            bucket: Bucket::per_duration(amount, period),
89        }
90    }
91}
92
93impl<C: Clock> Throttle<C> {
94    /// Replaces the time source, returning a throttle driven by `clock`.
95    ///
96    /// The common use is deterministic testing: inject a
97    /// [`ManualClock`](clock_lib::ManualClock) (shared via an
98    /// [`Arc`](std::sync::Arc)) and drive refills by advancing it, with no real
99    /// sleeping. The bucket is re-anchored to the new clock and starts full.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use std::sync::Arc;
105    /// use std::time::Duration;
106    /// use clock_lib::ManualClock;
107    /// use throttle_net::Throttle;
108    ///
109    /// let clock = Arc::new(ManualClock::new());
110    /// let throttle = Throttle::per_second(2).with_clock(clock.clone());
111    ///
112    /// assert!(throttle.try_acquire());
113    /// assert!(throttle.try_acquire());
114    /// assert!(!throttle.try_acquire()); // drained
115    ///
116    /// clock.advance(Duration::from_secs(1)); // a full period refills it
117    /// assert!(throttle.try_acquire());
118    /// ```
119    #[must_use]
120    pub fn with_clock<C2: Clock>(self, clock: C2) -> Throttle<C2> {
121        Throttle {
122            bucket: self.bucket.with_clock(clock),
123        }
124    }
125
126    /// The maximum number of tokens the throttle can hold (its burst size).
127    #[inline]
128    #[must_use]
129    pub fn capacity(&self) -> u32 {
130        self.bucket.capacity()
131    }
132
133    /// The number of whole tokens available right now.
134    ///
135    /// A point-in-time read for observability and tests, not a reservation.
136    #[inline]
137    #[must_use]
138    pub fn available(&self) -> u32 {
139        self.bucket.available()
140    }
141
142    /// Attempts to take one token without waiting, returning whether it was
143    /// granted.
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// use throttle_net::Throttle;
149    ///
150    /// let throttle = Throttle::per_second(1);
151    /// assert!(throttle.try_acquire());  // the one token
152    /// assert!(!throttle.try_acquire()); // none left this instant
153    /// ```
154    #[inline]
155    #[must_use]
156    pub fn try_acquire(&self) -> bool {
157        self.bucket.try_acquire(1)
158    }
159
160    /// Attempts to take `cost` tokens without waiting, returning whether they
161    /// were granted.
162    ///
163    /// Granting is all-or-nothing: either every token is deducted or none is.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// use throttle_net::Throttle;
169    ///
170    /// let throttle = Throttle::per_second(10);
171    /// assert!(throttle.try_acquire_with_cost(7));
172    /// assert!(!throttle.try_acquire_with_cost(7)); // only 3 left
173    /// ```
174    #[inline]
175    #[must_use]
176    pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
177        self.bucket.try_acquire(cost)
178    }
179
180    /// Reports whether `cost` tokens would be granted now, without taking them.
181    ///
182    /// This is the non-consuming counterpart to [`try_acquire_with_cost`](Self::try_acquire_with_cost),
183    /// used by composite limiters to poll a constituent before committing. The
184    /// [`Decision::Retry`] wait is estimated from the refill rate, so it is a
185    /// close guide rather than an exact promise.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use throttle_net::{Decision, Throttle};
191    ///
192    /// let throttle = Throttle::per_second(4);
193    /// assert_eq!(throttle.peek(3), Decision::Acquired); // would grant, took nothing
194    /// assert!(throttle.try_acquire_with_cost(4));        // still full
195    /// ```
196    #[inline]
197    #[must_use]
198    pub fn peek(&self, cost: u32) -> Decision {
199        let capacity = self.bucket.capacity();
200        if cost > capacity {
201            return Decision::Impossible;
202        }
203        let available = self.bucket.available();
204        if available >= cost {
205            return Decision::Acquired;
206        }
207        let config = self.bucket.config();
208        let refill_amount = config.refill_amount();
209        let period = config.refill_period();
210        if refill_amount == 0 || period.is_zero() {
211            // No refill, and not enough on hand: it will never accrue.
212            return Decision::Impossible;
213        }
214        // `cost <= capacity` and `available < cost`, so the deficit is positive
215        // and bounded by capacity; no underflow.
216        let deficit = cost - available;
217        Decision::Retry {
218            after: estimate_refill_wait(period, deficit, refill_amount),
219        }
220    }
221
222    /// The synchronous, consuming core shared by the trait impl and the waiting
223    /// surface. Deducts `cost` on success.
224    #[inline]
225    fn decide(&self, cost: u32) -> Decision {
226        match self.bucket.acquire(cost) {
227            BucketDecision::Allowed => Decision::Acquired,
228            BucketDecision::Denied { retry_after } if retry_after == Duration::MAX => {
229                Decision::Impossible
230            }
231            BucketDecision::Denied { retry_after } => Decision::Retry { after: retry_after },
232            // `better_bucket::Decision` is `#[non_exhaustive]`. An outcome this
233            // version does not understand is treated as un-grantable rather than
234            // risk over-sending to a downstream.
235            _ => Decision::Impossible,
236        }
237    }
238}
239
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl<C: Clock> Throttle<C> {
    /// Waits for, then takes, a single token.
    ///
    /// This is the flagship outbound call: the caller is slowed down rather
    /// than turned away. It resolves once a token has been deducted. It fails
    /// with [`ThrottleError::CostExceedsCapacity`] only when the throttle's
    /// capacity is zero.
    ///
    /// # Errors
    ///
    /// [`ThrottleError::CostExceedsCapacity`] if the capacity is `0`, since not
    /// even one token could ever be handed out.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(100);
    /// throttle.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Waits for, then takes, `cost` tokens.
    ///
    /// Weighting lets some requests count for more than others, such as a
    /// batch of ten or an LLM call priced by token count. Between attempts the
    /// task sleeps for the bucket's own estimate of the refill time, so it
    /// settles without busy-spinning even when contended.
    ///
    /// # Errors
    ///
    /// [`ThrottleError::CostExceedsCapacity`] if `cost` is larger than the
    /// throttle's capacity. Such a request can never succeed, so it is refused
    /// up front instead of waiting indefinitely.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(1000);
    /// throttle.acquire_with_cost(250).await?; // a heavier request
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            let pause = match self.decide(cost) {
                Decision::Acquired => break Ok(()),
                Decision::Impossible => {
                    break Err(ThrottleError::CostExceedsCapacity {
                        cost,
                        capacity: self.capacity(),
                    });
                }
                Decision::Retry { after } => after,
            };
            // Not yet: sleep off the estimated shortfall, then try again.
            tokio::time::sleep(pause).await;
        }
    }
}
308
/// Estimates the wait until `deficit` tokens accrue at `refill_amount` per
/// `period`, rounded up so the caller never wakes a touch too early.
///
/// Computed in integer nanoseconds (`u128`) to stay deterministic and avoid
/// floating point. A finite estimate larger than `u64::MAX` nanoseconds is
/// clamped to `Duration::from_nanos(u64::MAX)`.
///
/// A `refill_amount` of `0` means the deficit never accrues. Instead of
/// dividing by zero (which panics), the function returns [`Duration::MAX`].
/// That is the same "never" value that `Throttle::decide` maps to
/// `Decision::Impossible`.
#[inline]
fn estimate_refill_wait(period: Duration, deficit: u32, refill_amount: u32) -> Duration {
    if refill_amount == 0 {
        return Duration::MAX;
    }
    // Saturate rather than overflow on absurdly long periods times large deficits.
    let numerator = period.as_nanos().saturating_mul(u128::from(deficit));
    let nanos = numerator.div_ceil(u128::from(refill_amount));
    u64::try_from(nanos).map_or(Duration::from_nanos(u64::MAX), Duration::from_nanos)
}
320
321impl<C: Clock> Limiter for Throttle<C> {
322    #[inline]
323    fn peek(&self, cost: u32) -> Decision {
324        Throttle::peek(self, cost)
325    }
326
327    #[inline]
328    fn acquire_cost(&self, cost: u32) -> Decision {
329        self.decide(cost)
330    }
331
332    #[inline]
333    fn available(&self) -> u32 {
334        self.bucket.available()
335    }
336
337    #[inline]
338    fn capacity(&self) -> u32 {
339        self.bucket.capacity()
340    }
341}
342
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::{estimate_refill_wait, Throttle};
    use crate::decision::Decision;
    use crate::error::ThrottleError;
    use crate::limiter::Limiter;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_public_types_are_send_sync() {
        assert_send_sync::<Throttle>();
        assert_send_sync::<Decision>();
        assert_send_sync::<ThrottleError>();
    }

    #[test]
    fn test_try_acquire_grants_up_to_capacity_then_refuses() {
        let throttle = Throttle::per_second(3);
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(!throttle.try_acquire());
    }

    #[test]
    fn test_try_acquire_with_cost_is_all_or_nothing() {
        let throttle = Throttle::per_second(10);
        assert!(throttle.try_acquire_with_cost(7));
        // Only 3 remain, so a cost of 7 takes nothing.
        assert!(!throttle.try_acquire_with_cost(7));
        assert!(throttle.try_acquire_with_cost(3));
    }

    #[test]
    fn test_refill_after_a_full_period_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let throttle = Throttle::per_second(4).with_clock(clock.clone());

        for _ in 0..4 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());

        clock.advance(Duration::from_secs(1));
        assert!(throttle.try_acquire());
    }

    #[test]
    fn test_acquire_cost_reports_retry_then_impossible() {
        let throttle = Throttle::per_second(2);
        assert_eq!(throttle.acquire_cost(2), Decision::Acquired);
        // Drained: another unit must wait.
        assert!(matches!(throttle.acquire_cost(1), Decision::Retry { .. }));
        // A cost beyond capacity can never be granted.
        assert_eq!(throttle.acquire_cost(3), Decision::Impossible);
    }

    #[test]
    fn test_available_tracks_consumption() {
        let throttle = Throttle::per_second(5);
        assert_eq!(throttle.available(), 5);
        assert!(throttle.try_acquire_with_cost(2));
        assert_eq!(throttle.available(), 3);
    }

    #[test]
    fn test_peek_grants_without_consuming_and_rejects_over_capacity() {
        let throttle = Throttle::per_second(4);
        assert_eq!(throttle.peek(4), Decision::Acquired);
        // Peeking took nothing.
        assert_eq!(throttle.available(), 4);
        assert_eq!(throttle.peek(5), Decision::Impossible);
    }

    #[test]
    fn test_peek_reports_retry_when_drained_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let throttle = Throttle::per_second(4).with_clock(clock);
        assert!(throttle.try_acquire_with_cost(4));
        assert!(matches!(throttle.peek(1), Decision::Retry { .. }));
        // Still drained, not over-drawn: peek never deducts.
        assert_eq!(throttle.available(), 0);
    }

    #[test]
    fn test_estimate_refill_wait_rounds_up() {
        // One token at 3/s is 333_333_333.3… ns; rounding up avoids waking early.
        assert_eq!(
            estimate_refill_wait(Duration::from_secs(1), 1, 3),
            Duration::from_nanos(333_333_334)
        );
        assert_eq!(
            estimate_refill_wait(Duration::from_secs(1), 2, 4),
            Duration::from_millis(500)
        );
    }

    // The waiting API (`acquire`, `acquire_with_cost`) is compiled only with the
    // `tokio` feature, so these tests must be gated the same way or the test
    // module fails to build without it. `cfg` comes first so the `tokio::test`
    // macro path is never resolved when the feature is off.
    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_returns_immediately_when_a_token_is_free() {
        let throttle = Throttle::per_second(1);
        assert!(throttle.acquire().await.is_ok());
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_with_cost_errors_when_cost_exceeds_capacity() {
        let throttle = Throttle::per_second(5);
        let err = throttle.acquire_with_cost(9).await.unwrap_err();
        assert_eq!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 9,
                capacity: 5,
            }
        );
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_waits_for_refill_then_succeeds() {
        // Capacity 1000 refilling at 1 token/ms: after draining, one token
        // returns in about a millisecond, so the waiter completes promptly.
        let throttle = Throttle::per_second(1000);
        for _ in 0..1000 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());
        assert!(throttle.acquire().await.is_ok());
    }
}