throttle_net/throttle.rs
1//! The Tier-1 throttle: one token bucket with a waiting acquire.
2
3use core::time::Duration;
4
5use better_bucket::{Bucket, Decision as BucketDecision};
6use clock_lib::{Clock, SystemClock};
7
8use crate::decision::Decision;
9use crate::error::ThrottleError;
10use crate::limiter::Limiter;
11
12/// A single outbound throttle backed by a token bucket.
13///
14/// This is the Tier-1 surface: construct one with [`per_second`](Self::per_second)
15/// or [`per_duration`](Self::per_duration), then pace your outbound work with
16/// [`acquire`](Self::acquire). Because throttle-net protects *downstreams*, the
17/// headline operation **waits** for a token rather than rejecting the caller —
18/// you are slowing your own requests, not dropping someone else's. When you would
19/// rather not wait, [`try_acquire`](Self::try_acquire) reports the outcome
20/// immediately.
21///
22/// The bucket refills smoothly and starts full, so a burst up to the capacity is
23/// admitted at once and the sustained rate is the refill rate. Token accounting
24/// is lock-free (a single atomic compare-and-swap per acquire), and time is read
25/// from an injectable [`Clock`] — [`SystemClock`] in production, or a
26/// `ManualClock` in tests via [`with_clock`](Self::with_clock).
27///
28/// # Examples
29///
30/// ```
31/// # async fn run() -> Result<(), throttle_net::ThrottleError> {
32/// use throttle_net::Throttle;
33///
34/// // 100 requests per second, bursting up to 100.
35/// let throttle = Throttle::per_second(100);
36/// throttle.acquire().await?; // returns as soon as a token is free
37/// # Ok(())
38/// # }
39/// ```
40#[derive(Debug)]
41pub struct Throttle<C: Clock = SystemClock> {
42 bucket: Bucket<C>,
43}
44
45impl Throttle<SystemClock> {
46 /// Creates a throttle that admits `rate` units per second, bursting up to
47 /// `rate`, driven by the OS monotonic clock.
48 ///
49 /// A `rate` of `0` yields a throttle that grants nothing; an
50 /// [`acquire`](Self::acquire) on it returns
51 /// [`ThrottleError::CostExceedsCapacity`].
52 ///
53 /// # Examples
54 ///
55 /// ```
56 /// use throttle_net::Throttle;
57 ///
58 /// let throttle = Throttle::per_second(50);
59 /// assert_eq!(throttle.capacity(), 50);
60 /// assert!(throttle.try_acquire());
61 /// ```
62 #[must_use]
63 pub fn per_second(rate: u32) -> Self {
64 Self {
65 bucket: Bucket::per_second(rate),
66 }
67 }
68
69 /// Creates a throttle that admits `amount` units every `period`, bursting up
70 /// to `amount`, driven by the OS monotonic clock.
71 ///
72 /// Use this when the natural window is not one second — for example, sixty
73 /// calls per minute, or five per hundred milliseconds.
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use std::time::Duration;
79 /// use throttle_net::Throttle;
80 ///
81 /// // 60 requests per minute.
82 /// let throttle = Throttle::per_duration(60, Duration::from_secs(60));
83 /// assert_eq!(throttle.capacity(), 60);
84 /// ```
85 #[must_use]
86 pub fn per_duration(amount: u32, period: Duration) -> Self {
87 Self {
88 bucket: Bucket::per_duration(amount, period),
89 }
90 }
91}
92
93impl<C: Clock> Throttle<C> {
94 /// Replaces the time source, returning a throttle driven by `clock`.
95 ///
96 /// The common use is deterministic testing: inject a
97 /// [`ManualClock`](clock_lib::ManualClock) (shared via an
98 /// [`Arc`](std::sync::Arc)) and drive refills by advancing it, with no real
99 /// sleeping. The bucket is re-anchored to the new clock and starts full.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use std::sync::Arc;
105 /// use std::time::Duration;
106 /// use clock_lib::ManualClock;
107 /// use throttle_net::Throttle;
108 ///
109 /// let clock = Arc::new(ManualClock::new());
110 /// let throttle = Throttle::per_second(2).with_clock(clock.clone());
111 ///
112 /// assert!(throttle.try_acquire());
113 /// assert!(throttle.try_acquire());
114 /// assert!(!throttle.try_acquire()); // drained
115 ///
116 /// clock.advance(Duration::from_secs(1)); // a full period refills it
117 /// assert!(throttle.try_acquire());
118 /// ```
119 #[must_use]
120 pub fn with_clock<C2: Clock>(self, clock: C2) -> Throttle<C2> {
121 Throttle {
122 bucket: self.bucket.with_clock(clock),
123 }
124 }
125
126 /// The maximum number of tokens the throttle can hold (its burst size).
127 #[inline]
128 #[must_use]
129 pub fn capacity(&self) -> u32 {
130 self.bucket.capacity()
131 }
132
133 /// The number of whole tokens available right now.
134 ///
135 /// A point-in-time read for observability and tests, not a reservation.
136 #[inline]
137 #[must_use]
138 pub fn available(&self) -> u32 {
139 self.bucket.available()
140 }
141
142 /// Attempts to take one token without waiting, returning whether it was
143 /// granted.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// use throttle_net::Throttle;
149 ///
150 /// let throttle = Throttle::per_second(1);
151 /// assert!(throttle.try_acquire()); // the one token
152 /// assert!(!throttle.try_acquire()); // none left this instant
153 /// ```
154 #[inline]
155 #[must_use]
156 pub fn try_acquire(&self) -> bool {
157 self.bucket.try_acquire(1)
158 }
159
160 /// Attempts to take `cost` tokens without waiting, returning whether they
161 /// were granted.
162 ///
163 /// Granting is all-or-nothing: either every token is deducted or none is.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use throttle_net::Throttle;
169 ///
170 /// let throttle = Throttle::per_second(10);
171 /// assert!(throttle.try_acquire_with_cost(7));
172 /// assert!(!throttle.try_acquire_with_cost(7)); // only 3 left
173 /// ```
174 #[inline]
175 #[must_use]
176 pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
177 self.bucket.try_acquire(cost)
178 }
179
180 /// Reports whether `cost` tokens would be granted now, without taking them.
181 ///
182 /// This is the non-consuming counterpart to [`try_acquire_with_cost`](Self::try_acquire_with_cost),
183 /// used by composite limiters to poll a constituent before committing. The
184 /// [`Decision::Retry`] wait is estimated from the refill rate, so it is a
185 /// close guide rather than an exact promise.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use throttle_net::{Decision, Throttle};
191 ///
192 /// let throttle = Throttle::per_second(4);
193 /// assert_eq!(throttle.peek(3), Decision::Acquired); // would grant, took nothing
194 /// assert!(throttle.try_acquire_with_cost(4)); // still full
195 /// ```
196 #[inline]
197 #[must_use]
198 pub fn peek(&self, cost: u32) -> Decision {
199 let capacity = self.bucket.capacity();
200 if cost > capacity {
201 return Decision::Impossible;
202 }
203 let available = self.bucket.available();
204 if available >= cost {
205 return Decision::Acquired;
206 }
207 let config = self.bucket.config();
208 let refill_amount = config.refill_amount();
209 let period = config.refill_period();
210 if refill_amount == 0 || period.is_zero() {
211 // No refill, and not enough on hand: it will never accrue.
212 return Decision::Impossible;
213 }
214 // `cost <= capacity` and `available < cost`, so the deficit is positive
215 // and bounded by capacity; no underflow.
216 let deficit = cost - available;
217 Decision::Retry {
218 after: estimate_refill_wait(period, deficit, refill_amount),
219 }
220 }
221
222 /// The synchronous, consuming core shared by the trait impl and the waiting
223 /// surface. Deducts `cost` on success.
224 #[inline]
225 fn decide(&self, cost: u32) -> Decision {
226 match self.bucket.acquire(cost) {
227 BucketDecision::Allowed => Decision::Acquired,
228 BucketDecision::Denied { retry_after } if retry_after == Duration::MAX => {
229 Decision::Impossible
230 }
231 BucketDecision::Denied { retry_after } => Decision::Retry { after: retry_after },
232 // `better_bucket::Decision` is `#[non_exhaustive]`. An outcome this
233 // version does not understand is treated as un-grantable rather than
234 // risk over-sending to a downstream.
235 _ => Decision::Impossible,
236 }
237 }
238}
239
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl<C> Throttle<C>
where
    C: Clock,
{
    /// Takes a single token, waiting for one to become available.
    ///
    /// The headline outbound operation: the caller is paced, not rejected. It
    /// resolves after a token has been deducted, or with
    /// [`ThrottleError::CostExceedsCapacity`] when the throttle's capacity is
    /// zero.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when the capacity is `0`,
    /// since not even one token can ever be granted.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(100);
    /// throttle.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Takes `cost` tokens, waiting for them to become available.
    ///
    /// A cost lets one request count for more than another: a batch of ten, or
    /// an LLM call billed by token count. Each time the bucket refuses, the
    /// waiter sleeps for the wait the bucket itself reported and then tries
    /// again, so it converges without busy spinning even under contention.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when `cost` is larger than
    /// the throttle's capacity. Such a request can never be granted, so it
    /// fails fast instead of waiting forever.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(1000);
    /// throttle.acquire_with_cost(250).await?; // a weightier request
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            // Either finish here or learn how long to sleep before retrying.
            let wait = match self.decide(cost) {
                Decision::Acquired => return Ok(()),
                Decision::Retry { after } => after,
                Decision::Impossible => {
                    let capacity = self.capacity();
                    return Err(ThrottleError::CostExceedsCapacity { cost, capacity });
                }
            };
            tokio::time::sleep(wait).await;
        }
    }
}
308
/// Estimates how long it takes for `deficit` tokens to accrue when
/// `refill_amount` tokens arrive every `period`.
///
/// The result is rounded up to the next nanosecond so a caller sleeping for it
/// never wakes a touch too early. The arithmetic is done in integer nanoseconds
/// (`u128`) to stay deterministic and avoid floating point, and the quotient is
/// clamped to what `Duration::from_nanos` can represent (`u64::MAX` ns).
///
/// Edge cases:
///
/// * `deficit == 0` — nothing is missing, so the wait is [`Duration::ZERO`].
/// * `refill_amount == 0` with a positive deficit — tokens never arrive, so the
///   wait is [`Duration::MAX`], the same "never" value that `decide` maps to
///   [`Decision::Impossible`]. Without this guard the division below would
///   panic on a zero divisor.
#[inline]
fn estimate_refill_wait(period: Duration, deficit: u32, refill_amount: u32) -> Duration {
    if deficit == 0 {
        return Duration::ZERO;
    }
    if refill_amount == 0 {
        // No refill at all: report "never" instead of dividing by zero.
        return Duration::MAX;
    }
    // `Duration::MAX` in nanoseconds times `u32::MAX` still fits in `u128`;
    // `saturating_mul` is kept as a belt-and-braces guard.
    let numerator = period.as_nanos().saturating_mul(u128::from(deficit));
    let nanos = numerator.div_ceil(u128::from(refill_amount));
    Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX))
}
320
321impl<C: Clock> Limiter for Throttle<C> {
322 #[inline]
323 fn peek(&self, cost: u32) -> Decision {
324 Throttle::peek(self, cost)
325 }
326
327 #[inline]
328 fn acquire_cost(&self, cost: u32) -> Decision {
329 self.decide(cost)
330 }
331
332 #[inline]
333 fn available(&self) -> u32 {
334 self.bucket.available()
335 }
336
337 #[inline]
338 fn capacity(&self) -> u32 {
339 self.bucket.capacity()
340 }
341}
342
#[cfg(test)]
mod tests {
    // Tests may unwrap freely: a panic is the failure report.
    #![allow(clippy::unwrap_used)]

    use super::Throttle;
    use crate::decision::Decision;
    use crate::error::ThrottleError;
    use crate::limiter::Limiter;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Compile-time probe: only instantiable for `Send + Sync` types.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_public_types_are_send_sync() {
        assert_send_sync::<Throttle>();
        assert_send_sync::<Decision>();
        assert_send_sync::<ThrottleError>();
    }

    #[test]
    fn test_try_acquire_grants_up_to_capacity_then_refuses() {
        let throttle = Throttle::per_second(3);
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(!throttle.try_acquire());
    }

    #[test]
    fn test_try_acquire_with_cost_is_all_or_nothing() {
        let throttle = Throttle::per_second(10);
        assert!(throttle.try_acquire_with_cost(7));
        // Only 3 remain, so a cost of 7 takes nothing.
        assert!(!throttle.try_acquire_with_cost(7));
        assert!(throttle.try_acquire_with_cost(3));
    }

    #[test]
    fn test_refill_after_a_full_period_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let throttle = Throttle::per_second(4).with_clock(clock.clone());

        for _ in 0..4 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());

        clock.advance(Duration::from_secs(1));
        assert!(throttle.try_acquire());
    }

    #[test]
    fn test_acquire_cost_reports_retry_then_impossible() {
        let throttle = Throttle::per_second(2);
        assert_eq!(throttle.acquire_cost(2), Decision::Acquired);
        // Drained: another unit must wait.
        assert!(matches!(throttle.acquire_cost(1), Decision::Retry { .. }));
        // A cost beyond capacity can never be granted.
        assert_eq!(throttle.acquire_cost(3), Decision::Impossible);
    }

    #[test]
    fn test_peek_rejects_oversized_cost_and_does_not_consume() {
        let throttle = Throttle::per_second(4);
        // Larger than the burst size: can never be granted.
        assert_eq!(throttle.peek(5), Decision::Impossible);
        // Would be granted, but peeking takes nothing...
        assert_eq!(throttle.peek(3), Decision::Acquired);
        // ...so the full capacity is still there to be taken.
        assert!(throttle.try_acquire_with_cost(4));
    }

    #[test]
    fn test_available_tracks_consumption() {
        let throttle = Throttle::per_second(5);
        assert_eq!(throttle.available(), 5);
        assert!(throttle.try_acquire_with_cost(2));
        assert_eq!(throttle.available(), 3);
    }

    // The waiting surface (`acquire`, `acquire_with_cost`) only exists when the
    // `tokio` feature is enabled, so the tests that call it carry the same gate;
    // otherwise the test build fails to compile with the feature switched off.

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_returns_immediately_when_a_token_is_free() {
        let throttle = Throttle::per_second(1);
        assert!(throttle.acquire().await.is_ok());
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_with_cost_errors_when_cost_exceeds_capacity() {
        let throttle = Throttle::per_second(5);
        let err = throttle.acquire_with_cost(9).await.unwrap_err();
        assert_eq!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 9,
                capacity: 5,
            }
        );
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_acquire_waits_for_refill_then_succeeds() {
        // Capacity 1000 refilling at 1 token/ms: after draining, one token
        // returns in about a millisecond, so the waiter completes promptly.
        let throttle = Throttle::per_second(1000);
        for _ in 0..1000 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());
        assert!(throttle.acquire().await.is_ok());
    }
}