throttle_net/throttle.rs
1//! The Tier-1 throttle: one token bucket with a waiting acquire.
2
3use core::time::Duration;
4
5use better_bucket::{Bucket, Decision as BucketDecision};
6use clock_lib::{Clock, SystemClock};
7
8use crate::decision::Decision;
9#[cfg(feature = "runtime")]
10use crate::error::ThrottleError;
11use crate::limiter::Limiter;
12
13/// A single outbound throttle backed by a token bucket.
14///
15/// This is the Tier-1 surface: construct one with [`per_second`](Self::per_second)
16/// or [`per_duration`](Self::per_duration), then pace your outbound work with
17/// [`acquire`](Self::acquire). Because throttle-net protects *downstreams*, the
18/// headline operation **waits** for a token rather than rejecting the caller —
19/// you are slowing your own requests, not dropping someone else's. When you would
20/// rather not wait, [`try_acquire`](Self::try_acquire) reports the outcome
21/// immediately.
22///
23/// The bucket refills smoothly and starts full, so a burst up to the capacity is
24/// admitted at once and the sustained rate is the refill rate. Token accounting
25/// is lock-free (a single atomic compare-and-swap per acquire), and time is read
26/// from an injectable [`Clock`] — [`SystemClock`] in production, or a
27/// `ManualClock` in tests via [`with_clock`](Self::with_clock).
28///
29/// # Examples
30///
31/// ```
32/// # async fn run() -> Result<(), throttle_net::ThrottleError> {
33/// use throttle_net::Throttle;
34///
35/// // 100 requests per second, bursting up to 100.
36/// let throttle = Throttle::per_second(100);
37/// throttle.acquire().await?; // returns as soon as a token is free
38/// # Ok(())
39/// # }
40/// ```
41#[derive(Debug)]
42pub struct Throttle<C: Clock = SystemClock> {
43 bucket: Bucket<C>,
44}
45
46impl Throttle<SystemClock> {
47 /// Creates a throttle that admits `rate` units per second, bursting up to
48 /// `rate`, driven by the OS monotonic clock.
49 ///
50 /// A `rate` of `0` yields a throttle that grants nothing; an
51 /// [`acquire`](Self::acquire) on it returns
52 /// [`ThrottleError::CostExceedsCapacity`].
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// use throttle_net::Throttle;
58 ///
59 /// let throttle = Throttle::per_second(50);
60 /// assert_eq!(throttle.capacity(), 50);
61 /// assert!(throttle.try_acquire());
62 /// ```
63 #[must_use]
64 pub fn per_second(rate: u32) -> Self {
65 Self {
66 bucket: Bucket::per_second(rate),
67 }
68 }
69
70 /// Creates a throttle that admits `amount` units every `period`, bursting up
71 /// to `amount`, driven by the OS monotonic clock.
72 ///
73 /// Use this when the natural window is not one second — for example, sixty
74 /// calls per minute, or five per hundred milliseconds.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// use std::time::Duration;
80 /// use throttle_net::Throttle;
81 ///
82 /// // 60 requests per minute.
83 /// let throttle = Throttle::per_duration(60, Duration::from_secs(60));
84 /// assert_eq!(throttle.capacity(), 60);
85 /// ```
86 #[must_use]
87 pub fn per_duration(amount: u32, period: Duration) -> Self {
88 Self {
89 bucket: Bucket::per_duration(amount, period),
90 }
91 }
92}
93
94impl<C: Clock> Throttle<C> {
95 /// Replaces the time source, returning a throttle driven by `clock`.
96 ///
97 /// The common use is deterministic testing: inject a
98 /// [`ManualClock`](clock_lib::ManualClock) (shared via an
99 /// [`Arc`](std::sync::Arc)) and drive refills by advancing it, with no real
100 /// sleeping. The bucket is re-anchored to the new clock and starts full.
101 ///
102 /// # Examples
103 ///
104 /// ```
105 /// use std::sync::Arc;
106 /// use std::time::Duration;
107 /// use clock_lib::ManualClock;
108 /// use throttle_net::Throttle;
109 ///
110 /// let clock = Arc::new(ManualClock::new());
111 /// let throttle = Throttle::per_second(2).with_clock(clock.clone());
112 ///
113 /// assert!(throttle.try_acquire());
114 /// assert!(throttle.try_acquire());
115 /// assert!(!throttle.try_acquire()); // drained
116 ///
117 /// clock.advance(Duration::from_secs(1)); // a full period refills it
118 /// assert!(throttle.try_acquire());
119 /// ```
120 #[must_use]
121 pub fn with_clock<C2: Clock>(self, clock: C2) -> Throttle<C2> {
122 Throttle {
123 bucket: self.bucket.with_clock(clock),
124 }
125 }
126
127 /// The maximum number of tokens the throttle can hold (its burst size).
128 #[inline]
129 #[must_use]
130 pub fn capacity(&self) -> u32 {
131 self.bucket.capacity()
132 }
133
134 /// The number of whole tokens available right now.
135 ///
136 /// A point-in-time read for observability and tests, not a reservation.
137 #[inline]
138 #[must_use]
139 pub fn available(&self) -> u32 {
140 self.bucket.available()
141 }
142
143 /// Attempts to take one token without waiting, returning whether it was
144 /// granted.
145 ///
146 /// # Examples
147 ///
148 /// ```
149 /// use throttle_net::Throttle;
150 ///
151 /// let throttle = Throttle::per_second(1);
152 /// assert!(throttle.try_acquire()); // the one token
153 /// assert!(!throttle.try_acquire()); // none left this instant
154 /// ```
155 #[inline]
156 #[must_use]
157 pub fn try_acquire(&self) -> bool {
158 self.bucket.try_acquire(1)
159 }
160
161 /// Attempts to take `cost` tokens without waiting, returning whether they
162 /// were granted.
163 ///
164 /// Granting is all-or-nothing: either every token is deducted or none is.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// use throttle_net::Throttle;
170 ///
171 /// let throttle = Throttle::per_second(10);
172 /// assert!(throttle.try_acquire_with_cost(7));
173 /// assert!(!throttle.try_acquire_with_cost(7)); // only 3 left
174 /// ```
175 #[inline]
176 #[must_use]
177 pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
178 self.bucket.try_acquire(cost)
179 }
180
181 /// Reports whether `cost` tokens would be granted now, without taking them.
182 ///
183 /// This is the non-consuming counterpart to [`try_acquire_with_cost`](Self::try_acquire_with_cost),
184 /// used by composite limiters to poll a constituent before committing. The
185 /// [`Decision::Retry`] wait is estimated from the refill rate, so it is a
186 /// close guide rather than an exact promise.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use throttle_net::{Decision, Throttle};
192 ///
193 /// let throttle = Throttle::per_second(4);
194 /// assert_eq!(throttle.peek(3), Decision::Acquired); // would grant, took nothing
195 /// assert!(throttle.try_acquire_with_cost(4)); // still full
196 /// ```
197 #[inline]
198 #[must_use]
199 pub fn peek(&self, cost: u32) -> Decision {
200 let capacity = self.bucket.capacity();
201 if cost > capacity {
202 return Decision::Impossible;
203 }
204 let available = self.bucket.available();
205 if available >= cost {
206 return Decision::Acquired;
207 }
208 let config = self.bucket.config();
209 let refill_amount = config.refill_amount();
210 let period = config.refill_period();
211 if refill_amount == 0 || period.is_zero() {
212 // No refill, and not enough on hand: it will never accrue.
213 return Decision::Impossible;
214 }
215 // `cost <= capacity` and `available < cost`, so the deficit is positive
216 // and bounded by capacity; no underflow.
217 let deficit = cost - available;
218 Decision::Retry {
219 after: estimate_refill_wait(period, deficit, refill_amount),
220 }
221 }
222
223 /// The synchronous, consuming core shared by the trait impl and the waiting
224 /// surface. Deducts `cost` on success.
225 #[inline]
226 fn decide(&self, cost: u32) -> Decision {
227 match self.bucket.acquire(cost) {
228 BucketDecision::Allowed => Decision::Acquired,
229 BucketDecision::Denied { retry_after } if retry_after == Duration::MAX => {
230 Decision::Impossible
231 }
232 BucketDecision::Denied { retry_after } => Decision::Retry { after: retry_after },
233 // `better_bucket::Decision` is `#[non_exhaustive]`. An outcome this
234 // version does not understand is treated as un-grantable rather than
235 // risk over-sending to a downstream.
236 _ => Decision::Impossible,
237 }
238 }
239}
240
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<C: Clock> Throttle<C> {
    /// Takes a single token, waiting until one becomes available.
    ///
    /// This is the marquee outbound operation: the caller is paced, not
    /// rejected. It resolves once a token has been deducted, or with
    /// [`ThrottleError::CostExceedsCapacity`] if the throttle's capacity is zero.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when the capacity is `0`,
    /// because a single token can never be granted.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(100);
    /// throttle.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Takes `cost` tokens, waiting until that many are available.
    ///
    /// A cost lets one request weigh more than another — a batch of ten, or an
    /// LLM call billed by token count. Each time the bucket refuses, the waiter
    /// sleeps for the bucket's own refill estimate and then tries again, so it
    /// converges without busy spinning even under contention.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when `cost` exceeds the
    /// throttle's capacity; such a request can never be granted, so it fails
    /// fast rather than waiting forever.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::Throttle;
    ///
    /// let throttle = Throttle::per_second(1000);
    /// throttle.acquire_with_cost(250).await?; // a heavier request
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        let timer = crate::obs::Timer::start();

        let outcome = loop {
            // Either finish with a verdict or learn how long to sleep.
            let after = match self.decide(cost) {
                Decision::Acquired => break Ok(()),
                Decision::Impossible => {
                    let capacity = self.capacity();
                    break Err(ThrottleError::CostExceedsCapacity { cost, capacity });
                }
                Decision::Retry { after } => after,
            };
            crate::rt::sleep(after).await;
        };

        // The wait and the trace are recorded for every outcome; the
        // `acquired` event only when tokens were actually granted.
        let granted = outcome.is_ok();
        if granted {
            crate::obs::acquired("throttle");
        }
        crate::obs::wait("throttle", &timer);
        crate::obs::trace_acquire("throttle", cost, granted, &timer);

        outcome
    }
}
316
/// Estimates the wait until `deficit` tokens accrue at `refill_amount` tokens
/// per `period`, rounded up so the caller never wakes a touch too early.
///
/// The arithmetic is done in integer nanoseconds (`u128`) to stay
/// deterministic and avoid floating point; a finite result is clamped to the
/// `Duration::from_nanos` range.
///
/// Degenerate inputs are answered rather than trapped: a zero `deficit` needs
/// no wait ([`Duration::ZERO`]), and a zero `refill_amount` with a positive
/// deficit can never be satisfied, so it yields [`Duration::MAX`] instead of
/// dividing by zero.
#[inline]
fn estimate_refill_wait(period: Duration, deficit: u32, refill_amount: u32) -> Duration {
    if deficit == 0 {
        // Nothing is missing, so there is nothing to wait for.
        return Duration::ZERO;
    }
    if refill_amount == 0 {
        // No tokens ever arrive: report "never" rather than panic on `/ 0`.
        return Duration::MAX;
    }
    // `saturating_mul` keeps the product well-defined for any inputs.
    let numerator = period.as_nanos().saturating_mul(u128::from(deficit));
    let nanos = numerator.div_ceil(u128::from(refill_amount));
    Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX))
}
328
329impl<C: Clock> Limiter for Throttle<C> {
330 #[inline]
331 fn peek(&self, cost: u32) -> Decision {
332 Throttle::peek(self, cost)
333 }
334
335 #[inline]
336 fn acquire_cost(&self, cost: u32) -> Decision {
337 self.decide(cost)
338 }
339
340 #[inline]
341 fn available(&self) -> u32 {
342 self.bucket.available()
343 }
344
345 #[inline]
346 fn capacity(&self) -> u32 {
347 self.bucket.capacity()
348 }
349}
350
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::Throttle;
    use crate::decision::Decision;
    use crate::error::ThrottleError;
    use crate::limiter::Limiter;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_public_types_are_send_sync() {
        assert_send_sync::<Throttle>();
        assert_send_sync::<Decision>();
        assert_send_sync::<ThrottleError>();
    }

    #[test]
    fn test_try_acquire_grants_up_to_capacity_then_refuses() {
        let throttle = Throttle::per_second(3);
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(throttle.try_acquire());
        assert!(!throttle.try_acquire());
    }

    #[test]
    fn test_try_acquire_with_cost_is_all_or_nothing() {
        let throttle = Throttle::per_second(10);
        assert!(throttle.try_acquire_with_cost(7));
        // Only 3 remain, so a cost of 7 takes nothing.
        assert!(!throttle.try_acquire_with_cost(7));
        assert!(throttle.try_acquire_with_cost(3));
    }

    #[test]
    fn test_refill_after_a_full_period_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let throttle = Throttle::per_second(4).with_clock(clock.clone());

        for _ in 0..4 {
            assert!(throttle.try_acquire());
        }
        assert!(!throttle.try_acquire());

        clock.advance(Duration::from_secs(1));
        assert!(throttle.try_acquire());
    }

    #[test]
    fn test_acquire_cost_reports_retry_then_impossible() {
        let throttle = Throttle::per_second(2);
        assert_eq!(throttle.acquire_cost(2), Decision::Acquired);
        // Drained: another unit must wait.
        assert!(matches!(throttle.acquire_cost(1), Decision::Retry { .. }));
        // A cost beyond capacity can never be granted.
        assert_eq!(throttle.acquire_cost(3), Decision::Impossible);
    }

    #[test]
    fn test_available_tracks_consumption() {
        let throttle = Throttle::per_second(5);
        assert_eq!(throttle.available(), 5);
        assert!(throttle.try_acquire_with_cost(2));
        assert_eq!(throttle.available(), 3);
    }

    // The waiting surface (`acquire` / `acquire_with_cost`) only exists with
    // the `runtime` feature, so the tests that call it are gated the same way;
    // otherwise the test build fails to compile when the feature is disabled.

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_returns_immediately_when_a_token_is_free() {
        let throttle = Throttle::per_second(1);
        assert!(throttle.acquire().await.is_ok());
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_with_cost_errors_when_cost_exceeds_capacity() {
        let throttle = Throttle::per_second(5);
        let err = throttle.acquire_with_cost(9).await.unwrap_err();
        assert_eq!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 9,
                capacity: 5,
            }
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_waits_for_refill_then_succeeds() {
        // Capacity 1000 refilling at 1 token/ms: after draining, one token
        // returns in about a millisecond, so the waiter completes promptly.
        let throttle = Throttle::per_second(1000);
        // This test runs on the real clock, so drain until the bucket itself
        // refuses instead of assuming exactly 1000 grants: a scheduling stall
        // mid-drain may let a token refill, which would fail a fixed count
        // followed by an "is empty" assertion.
        while throttle.try_acquire() {}
        assert!(throttle.acquire().await.is_ok());
    }
}