Skip to main content

throttle_net/
throttle.rs

1//! The Tier-1 throttle: one token bucket with a waiting acquire.
2
3use core::time::Duration;
4
5use better_bucket::{Bucket, Decision as BucketDecision};
6use clock_lib::{Clock, SystemClock};
7
8use crate::decision::Decision;
9#[cfg(feature = "runtime")]
10use crate::error::ThrottleError;
11use crate::limiter::Limiter;
12
13/// A single outbound throttle backed by a token bucket.
14///
15/// This is the Tier-1 surface: construct one with [`per_second`](Self::per_second)
16/// or [`per_duration`](Self::per_duration), then pace your outbound work with
17/// [`acquire`](Self::acquire). Because throttle-net protects *downstreams*, the
18/// headline operation **waits** for a token rather than rejecting the caller —
19/// you are slowing your own requests, not dropping someone else's. When you would
20/// rather not wait, [`try_acquire`](Self::try_acquire) reports the outcome
21/// immediately.
22///
23/// The bucket refills smoothly and starts full, so a burst up to the capacity is
24/// admitted at once and the sustained rate is the refill rate. Token accounting
25/// is lock-free (a single atomic compare-and-swap per acquire), and time is read
26/// from an injectable [`Clock`] — [`SystemClock`] in production, or a
27/// `ManualClock` in tests via [`with_clock`](Self::with_clock).
28///
29/// # Examples
30///
31/// ```
32/// # async fn run() -> Result<(), throttle_net::ThrottleError> {
33/// use throttle_net::Throttle;
34///
35/// // 100 requests per second, bursting up to 100.
36/// let throttle = Throttle::per_second(100);
37/// throttle.acquire().await?; // returns as soon as a token is free
38/// # Ok(())
39/// # }
40/// ```
41#[derive(Debug)]
42pub struct Throttle<C: Clock = SystemClock> {
43    bucket: Bucket<C>,
44}
45
46impl Throttle<SystemClock> {
47    /// Creates a throttle that admits `rate` units per second, bursting up to
48    /// `rate`, driven by the OS monotonic clock.
49    ///
50    /// A `rate` of `0` yields a throttle that grants nothing; an
51    /// [`acquire`](Self::acquire) on it returns
52    /// [`ThrottleError::CostExceedsCapacity`].
53    ///
54    /// # Examples
55    ///
56    /// ```
57    /// use throttle_net::Throttle;
58    ///
59    /// let throttle = Throttle::per_second(50);
60    /// assert_eq!(throttle.capacity(), 50);
61    /// assert!(throttle.try_acquire());
62    /// ```
63    #[must_use]
64    pub fn per_second(rate: u32) -> Self {
65        Self {
66            bucket: Bucket::per_second(rate),
67        }
68    }
69
70    /// Creates a throttle that admits `amount` units every `period`, bursting up
71    /// to `amount`, driven by the OS monotonic clock.
72    ///
73    /// Use this when the natural window is not one second — for example, sixty
74    /// calls per minute, or five per hundred milliseconds.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use std::time::Duration;
80    /// use throttle_net::Throttle;
81    ///
82    /// // 60 requests per minute.
83    /// let throttle = Throttle::per_duration(60, Duration::from_secs(60));
84    /// assert_eq!(throttle.capacity(), 60);
85    /// ```
86    #[must_use]
87    pub fn per_duration(amount: u32, period: Duration) -> Self {
88        Self {
89            bucket: Bucket::per_duration(amount, period),
90        }
91    }
92}
93
94impl<C: Clock> Throttle<C> {
95    /// Replaces the time source, returning a throttle driven by `clock`.
96    ///
97    /// The common use is deterministic testing: inject a
98    /// [`ManualClock`](clock_lib::ManualClock) (shared via an
99    /// [`Arc`](std::sync::Arc)) and drive refills by advancing it, with no real
100    /// sleeping. The bucket is re-anchored to the new clock and starts full.
101    ///
102    /// # Examples
103    ///
104    /// ```
105    /// use std::sync::Arc;
106    /// use std::time::Duration;
107    /// use clock_lib::ManualClock;
108    /// use throttle_net::Throttle;
109    ///
110    /// let clock = Arc::new(ManualClock::new());
111    /// let throttle = Throttle::per_second(2).with_clock(clock.clone());
112    ///
113    /// assert!(throttle.try_acquire());
114    /// assert!(throttle.try_acquire());
115    /// assert!(!throttle.try_acquire()); // drained
116    ///
117    /// clock.advance(Duration::from_secs(1)); // a full period refills it
118    /// assert!(throttle.try_acquire());
119    /// ```
120    #[must_use]
121    pub fn with_clock<C2: Clock>(self, clock: C2) -> Throttle<C2> {
122        Throttle {
123            bucket: self.bucket.with_clock(clock),
124        }
125    }
126
127    /// The maximum number of tokens the throttle can hold (its burst size).
128    #[inline]
129    #[must_use]
130    pub fn capacity(&self) -> u32 {
131        self.bucket.capacity()
132    }
133
134    /// The number of whole tokens available right now.
135    ///
136    /// A point-in-time read for observability and tests, not a reservation.
137    #[inline]
138    #[must_use]
139    pub fn available(&self) -> u32 {
140        self.bucket.available()
141    }
142
143    /// Attempts to take one token without waiting, returning whether it was
144    /// granted.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use throttle_net::Throttle;
150    ///
151    /// let throttle = Throttle::per_second(1);
152    /// assert!(throttle.try_acquire());  // the one token
153    /// assert!(!throttle.try_acquire()); // none left this instant
154    /// ```
155    #[inline]
156    #[must_use]
157    pub fn try_acquire(&self) -> bool {
158        self.bucket.try_acquire(1)
159    }
160
161    /// Attempts to take `cost` tokens without waiting, returning whether they
162    /// were granted.
163    ///
164    /// Granting is all-or-nothing: either every token is deducted or none is.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use throttle_net::Throttle;
170    ///
171    /// let throttle = Throttle::per_second(10);
172    /// assert!(throttle.try_acquire_with_cost(7));
173    /// assert!(!throttle.try_acquire_with_cost(7)); // only 3 left
174    /// ```
175    #[inline]
176    #[must_use]
177    pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
178        self.bucket.try_acquire(cost)
179    }
180
181    /// Reports whether `cost` tokens would be granted now, without taking them.
182    ///
183    /// This is the non-consuming counterpart to [`try_acquire_with_cost`](Self::try_acquire_with_cost),
184    /// used by composite limiters to poll a constituent before committing. The
185    /// [`Decision::Retry`] wait is estimated from the refill rate, so it is a
186    /// close guide rather than an exact promise.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use throttle_net::{Decision, Throttle};
192    ///
193    /// let throttle = Throttle::per_second(4);
194    /// assert_eq!(throttle.peek(3), Decision::Acquired); // would grant, took nothing
195    /// assert!(throttle.try_acquire_with_cost(4));        // still full
196    /// ```
197    #[inline]
198    #[must_use]
199    pub fn peek(&self, cost: u32) -> Decision {
200        let capacity = self.bucket.capacity();
201        if cost > capacity {
202            return Decision::Impossible;
203        }
204        let available = self.bucket.available();
205        if available >= cost {
206            return Decision::Acquired;
207        }
208        let config = self.bucket.config();
209        let refill_amount = config.refill_amount();
210        let period = config.refill_period();
211        if refill_amount == 0 || period.is_zero() {
212            // No refill, and not enough on hand: it will never accrue.
213            return Decision::Impossible;
214        }
215        // `cost <= capacity` and `available < cost`, so the deficit is positive
216        // and bounded by capacity; no underflow.
217        let deficit = cost - available;
218        Decision::Retry {
219            after: estimate_refill_wait(period, deficit, refill_amount),
220        }
221    }
222
223    /// The synchronous, consuming core shared by the trait impl and the waiting
224    /// surface. Deducts `cost` on success.
225    #[inline]
226    fn decide(&self, cost: u32) -> Decision {
227        match self.bucket.acquire(cost) {
228            BucketDecision::Allowed => Decision::Acquired,
229            BucketDecision::Denied { retry_after } if retry_after == Duration::MAX => {
230                Decision::Impossible
231            }
232            BucketDecision::Denied { retry_after } => Decision::Retry { after: retry_after },
233            // `better_bucket::Decision` is `#[non_exhaustive]`. An outcome this
234            // version does not understand is treated as un-grantable rather than
235            // risk over-sending to a downstream.
236            _ => Decision::Impossible,
237        }
238    }
239}
240
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<C: Clock> Throttle<C> {
    /// Waits for, then takes, a single token.
    ///
    /// This is the marquee outbound operation: it slows the caller down rather
    /// than turning it away. It resolves once a token has been deducted.
    ///
    /// # Errors
    ///
    /// Fails with [`ThrottleError::CostExceedsCapacity`] when a single token can
    /// never be granted, for example because the capacity is `0`.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(100);
    /// throttle.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Waits for, then takes, `cost` tokens.
    ///
    /// A cost lets one request count for more than another, such as a batch of
    /// ten or an LLM call billed by token count. Between attempts the waiter
    /// sleeps for the retry interval the bucket suggests, so it does not spin.
    /// Tokens are deducted only by the synchronous check that ends the wait, so
    /// dropping the future while it sleeps takes nothing.
    ///
    /// # Errors
    ///
    /// Fails fast with [`ThrottleError::CostExceedsCapacity`] when `cost` exceeds
    /// the throttle's capacity, because that request could never be granted. The
    /// same error is returned if the bucket signals the request can never be met
    /// (a `Duration::MAX` retry hint) or reports an outcome this version does not
    /// recognise.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(1000);
    /// throttle.acquire_with_cost(250).await?; // a heavier request
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        let timer = crate::obs::Timer::start();
        let outcome = self.wait_for_tokens(cost).await;
        let granted = outcome.is_ok();

        if granted {
            crate::obs::acquired("throttle");
        }
        crate::obs::wait("throttle", &timer);
        crate::obs::trace_acquire("throttle", cost, granted, &timer);
        outcome
    }

    /// Repeats the consuming check until `cost` tokens are deducted or the
    /// request proves un-grantable, sleeping for each suggested retry interval.
    async fn wait_for_tokens(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            let pause = match self.decide(cost) {
                Decision::Acquired => return Ok(()),
                Decision::Impossible => {
                    return Err(ThrottleError::CostExceedsCapacity {
                        cost,
                        capacity: self.capacity(),
                    });
                }
                Decision::Retry { after } => after,
            };
            crate::rt::sleep(pause).await;
        }
    }
}
316
/// Estimates the wait until `deficit` tokens accrue at `refill_amount` per
/// `period`, rounded up so the caller never wakes slightly too early.
///
/// The arithmetic is done in integer nanoseconds (`u128`), which keeps it
/// deterministic and avoids floating point. A result beyond the
/// `Duration::from_nanos` range is clamped to `u64::MAX` nanoseconds.
///
/// A `refill_amount` of `0` means the deficit never accrues. Rather than
/// dividing by zero (a panic), this returns `Duration::MAX`, the same "never"
/// sentinel the bucket's retry hints use.
#[inline]
fn estimate_refill_wait(period: Duration, deficit: u32, refill_amount: u32) -> Duration {
    if refill_amount == 0 {
        return Duration::MAX;
    }
    let numerator = period.as_nanos().saturating_mul(u128::from(deficit));
    let nanos = numerator.div_ceil(u128::from(refill_amount));
    Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX))
}
328
329impl<C: Clock> Limiter for Throttle<C> {
330    #[inline]
331    fn peek(&self, cost: u32) -> Decision {
332        Throttle::peek(self, cost)
333    }
334
335    #[inline]
336    fn acquire_cost(&self, cost: u32) -> Decision {
337        self.decide(cost)
338    }
339
340    #[inline]
341    fn available(&self) -> u32 {
342        self.bucket.available()
343    }
344
345    #[inline]
346    fn capacity(&self) -> u32 {
347        self.bucket.capacity()
348    }
349}
350
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::{estimate_refill_wait, Throttle};
    use crate::decision::Decision;
    use crate::error::ThrottleError;
    use crate::limiter::Limiter;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_public_types_are_send_sync() {
        assert_send_sync::<Throttle>();
        assert_send_sync::<Decision>();
        assert_send_sync::<ThrottleError>();
    }

    #[test]
    fn test_try_acquire_grants_up_to_capacity_then_refuses() {
        let throttle = Throttle::per_second(3);
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(!throttle.try_acquire());
    }

    #[test]
    fn test_try_acquire_with_cost_is_all_or_nothing() {
        let throttle = Throttle::per_second(10);
        assert!(throttle.try_acquire_with_cost(7));
        // Only 3 remain, so a cost of 7 takes nothing.
        assert!(!throttle.try_acquire_with_cost(7));
        assert!(throttle.try_acquire_with_cost(3));
    }

    #[test]
    fn test_refill_after_a_full_period_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let throttle = Throttle::per_second(4).with_clock(clock.clone());

        for _ in 0..4 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());

        clock.advance(Duration::from_secs(1));
        assert!(throttle.try_acquire());
    }

    #[test]
    fn test_acquire_cost_reports_retry_then_impossible() {
        let throttle = Throttle::per_second(2);
        assert_eq!(throttle.acquire_cost(2), Decision::Acquired);
        // Drained: another unit must wait.
        assert!(matches!(throttle.acquire_cost(1), Decision::Retry { .. }));
        // A cost beyond capacity can never be granted.
        assert_eq!(throttle.acquire_cost(3), Decision::Impossible);
    }

    #[test]
    fn test_available_tracks_consumption() {
        let throttle = Throttle::per_second(5);
        assert_eq!(throttle.available(), 5);
        assert!(throttle.try_acquire_with_cost(2));
        assert_eq!(throttle.available(), 3);
    }

    #[test]
    fn test_peek_does_not_consume_tokens() {
        let throttle = Throttle::per_second(4);
        assert_eq!(throttle.peek(3), Decision::Acquired);
        assert_eq!(throttle.available(), 4);
        assert!(throttle.try_acquire_with_cost(4));
    }

    #[test]
    fn test_peek_reports_retry_when_drained_and_impossible_beyond_capacity() {
        // A frozen clock keeps the bucket drained for the whole test.
        let throttle = Throttle::per_second(4).with_clock(Arc::new(ManualClock::new()));
        assert!(throttle.try_acquire_with_cost(4));

        let Decision::Retry { after } = throttle.peek(1) else {
            panic!("a drained throttle should ask the caller to retry");
        };
        assert!(after > Duration::ZERO);
        assert_eq!(throttle.peek(5), Decision::Impossible);
    }

    #[test]
    fn test_estimate_refill_wait_rounds_up() {
        // 1 token at 3/s is 333_333_333.3 ns; rounding up never wakes early.
        assert_eq!(
            estimate_refill_wait(Duration::from_secs(1), 1, 3),
            Duration::from_nanos(333_333_334)
        );
        assert_eq!(
            estimate_refill_wait(Duration::from_secs(1), 3, 2),
            Duration::from_millis(1500)
        );
    }

    #[test]
    fn test_estimate_refill_wait_clamps_huge_waits() {
        assert_eq!(
            estimate_refill_wait(Duration::MAX, u32::MAX, 1),
            Duration::from_nanos(u64::MAX)
        );
    }

    // `acquire`/`acquire_with_cost` exist only with the `runtime` feature, so the
    // async tests are gated to keep `cargo test` compiling without it.
    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_returns_immediately_when_a_token_is_free() {
        let throttle = Throttle::per_second(1);
        assert!(throttle.acquire().await.is_ok());
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_with_cost_errors_when_cost_exceeds_capacity() {
        let throttle = Throttle::per_second(5);
        let err = throttle.acquire_with_cost(9).await.unwrap_err();
        assert_eq!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 9,
                capacity: 5,
            }
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_waits_for_refill_then_succeeds() {
        // Capacity 1000 refilling at 1 token/ms: after draining, one token
        // returns in about a millisecond, so the waiter completes promptly.
        let throttle = Throttle::per_second(1000);
        for _ in 0..1000 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());
        assert!(throttle.acquire().await.is_ok());
    }
}