throttle_net/hybrid.rs
1//! A hybrid limiter: a request must clear every constituent.
2
3use std::sync::Arc;
4
5use crate::decision::Decision;
6#[cfg(feature = "runtime")]
7use crate::error::ThrottleError;
8use crate::limiter::{Limiter, acquire_all, peek_all};
9
10/// Several limiters combined so a request must satisfy **all** of them.
11///
12/// The classic use is layering windows on one resource — say "at most 10 per
13/// second *and* at most 100 per minute" — where either ceiling can bind. A
14/// hybrid is itself a [`Limiter`], so hybrids nest and slot anywhere a single
15/// limiter does.
16///
17/// Acquisition is two-phase to stay correct: every constituent is first
18/// [`peek`](Limiter::peek)ed, and tokens are only taken once all of them would
19/// grant. Without that, an early constituent could spend a token for a request a
20/// later one refuses. See [`Limiter`] for the full rationale.
21///
22/// Build one with [`Hybrid::builder`].
23///
24/// # Examples
25///
26/// ```
27/// use std::time::Duration;
28/// use throttle_net::{Hybrid, Throttle};
29///
30/// // 10 per second, and no more than 100 per minute.
31/// let hybrid = Hybrid::builder()
32/// .limiter(Throttle::per_second(10))
33/// .limiter(Throttle::per_duration(100, Duration::from_secs(60)))
34/// .build();
35///
36/// assert!(hybrid.try_acquire());
37/// ```
38#[derive(Clone)]
39pub struct Hybrid {
40 constituents: Arc<[Arc<dyn Limiter>]>,
41}
42
43impl Hybrid {
44 /// Starts building a hybrid limiter.
45 #[must_use]
46 pub fn builder() -> HybridBuilder {
47 HybridBuilder {
48 constituents: Vec::new(),
49 }
50 }
51
52 #[inline]
53 fn pairs(&self, cost: u32) -> impl Iterator<Item = (&dyn Limiter, u32)> + Clone {
54 self.constituents.iter().map(move |l| (l.as_ref(), cost))
55 }
56
57 /// Attempts to take one token from every constituent without waiting,
58 /// returning whether all granted.
59 ///
60 /// All-or-nothing across constituents: either every one is debited or, on a
61 /// refusal, the call reports failure.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// use throttle_net::{Hybrid, Throttle};
67 ///
68 /// let hybrid = Hybrid::builder().limiter(Throttle::per_second(1)).build();
69 /// assert!(hybrid.try_acquire());
70 /// assert!(!hybrid.try_acquire());
71 /// ```
72 #[inline]
73 #[must_use]
74 pub fn try_acquire(&self) -> bool {
75 self.try_acquire_with_cost(1)
76 }
77
78 /// Attempts to take `cost` tokens from every constituent without waiting.
79 #[inline]
80 #[must_use]
81 pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
82 acquire_all(self.pairs(cost)).is_acquired()
83 }
84}
85
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl Hybrid {
    /// Waits until every constituent can grant one token, then takes it.
    ///
    /// Shorthand for [`acquire_with_cost`](Self::acquire_with_cost) with a
    /// cost of `1`.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when the request can
    /// never be granted because some constituent's capacity is too small.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::{Hybrid, Throttle};
    ///
    /// let hybrid = Hybrid::builder().limiter(Throttle::per_second(100)).build();
    /// hybrid.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Waits until every constituent can grant `cost` tokens, then takes them.
    ///
    /// Each attempt that comes back as [`Decision::Retry`] is followed by a
    /// sleep for the suggested delay before trying again.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] when an attempt reports
    /// [`Decision::Impossible`]; the error carries `cost` and this hybrid's
    /// [`capacity`](Limiter::capacity).
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            // Settle the terminal outcomes first; only a retry falls through.
            let delay = match acquire_all(self.pairs(cost)) {
                Decision::Acquired => return Ok(()),
                Decision::Impossible => {
                    let capacity = self.capacity();
                    return Err(ThrottleError::CostExceedsCapacity { cost, capacity });
                }
                Decision::Retry { after } => after,
            };
            crate::rt::sleep(delay).await;
        }
    }
}
132
133impl Limiter for Hybrid {
134 #[inline]
135 fn peek(&self, cost: u32) -> Decision {
136 peek_all(self.pairs(cost))
137 }
138
139 #[inline]
140 fn acquire_cost(&self, cost: u32) -> Decision {
141 acquire_all(self.pairs(cost))
142 }
143
144 /// The headroom of the *binding* constituent: the fewest tokens any one of
145 /// them has available. An empty hybrid is unbounded ([`u32::MAX`]).
146 #[inline]
147 fn available(&self) -> u32 {
148 self.constituents
149 .iter()
150 .map(|l| l.available())
151 .min()
152 .unwrap_or(u32::MAX)
153 }
154
155 /// The capacity of the *binding* constituent: the smallest capacity among
156 /// them, since that is the first ceiling a request hits. An empty hybrid is
157 /// unbounded ([`u32::MAX`]).
158 #[inline]
159 fn capacity(&self) -> u32 {
160 self.constituents
161 .iter()
162 .map(|l| l.capacity())
163 .min()
164 .unwrap_or(u32::MAX)
165 }
166}
167
168/// Builder for a [`Hybrid`] limiter.
169///
170/// Add constituents with [`limiter`](Self::limiter); each must outlive the
171/// hybrid, so it is stored behind an [`Arc`]. Finish with [`build`](Self::build).
172///
173/// # Examples
174///
175/// ```
176/// use throttle_net::{Hybrid, Throttle};
177///
178/// let hybrid = Hybrid::builder()
179/// .limiter(Throttle::per_second(5))
180/// .build();
181/// # let _ = hybrid;
182/// ```
183#[derive(Default)]
184pub struct HybridBuilder {
185 constituents: Vec<Arc<dyn Limiter>>,
186}
187
188impl HybridBuilder {
189 /// Adds a constituent the hybrid must satisfy.
190 #[must_use]
191 pub fn limiter(mut self, limiter: impl Limiter + 'static) -> Self {
192 self.constituents.push(Arc::new(limiter));
193 self
194 }
195
196 /// Adds an already-shared constituent (for reusing one limiter across
197 /// several composites).
198 #[must_use]
199 pub fn shared(mut self, limiter: Arc<dyn Limiter>) -> Self {
200 self.constituents.push(limiter);
201 self
202 }
203
204 /// Builds the [`Hybrid`].
205 #[must_use]
206 pub fn build(self) -> Hybrid {
207 Hybrid {
208 constituents: self.constituents.into(),
209 }
210 }
211}
212
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::Hybrid;
    use crate::limiter::Limiter;
    use crate::throttle::Throttle;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_hybrid_is_send_sync() {
        assert_send_sync::<Hybrid>();
    }

    #[test]
    fn test_request_must_clear_every_constituent() {
        // Two ceilings on one resource: 5 per second and 3 per minute. The
        // tighter one (3) is what binds.
        let per_second = Throttle::per_second(5);
        let per_minute = Throttle::per_duration(3, Duration::from_secs(60));
        let hybrid = Hybrid::builder()
            .limiter(per_second)
            .limiter(per_minute)
            .build();

        for _ in 0..3 {
            assert!(hybrid.try_acquire());
        }
        // The per-minute budget is spent while the per-second one still has
        // room, so the hybrid as a whole must refuse.
        assert!(!hybrid.try_acquire());
    }

    #[test]
    fn test_peek_does_not_consume() {
        let hybrid = Hybrid::builder().limiter(Throttle::per_second(2)).build();

        let peeked = hybrid.peek(2);
        assert!(peeked.is_acquired());
        // Peeking debited nothing, so the full cost of 2 is still grantable.
        assert!(hybrid.try_acquire_with_cost(2));
    }

    #[test]
    fn test_no_token_lost_when_a_later_constituent_refuses() {
        // The first constituent has ample headroom and the second has none.
        // Under the peek-then-commit contract the first must NOT be charged
        // for a request the second blocks.
        let plenty: Arc<dyn Limiter> = Arc::new(Throttle::per_second(100));
        let exhausted = Throttle::per_second(1);
        assert!(exhausted.try_acquire()); // drain it to zero

        let hybrid = Hybrid::builder()
            .shared(Arc::clone(&plenty))
            .limiter(exhausted)
            .build();

        let headroom_before = plenty.available();
        assert!(!hybrid.try_acquire());
        // Nothing was committed to the plentiful limiter, because the peek
        // phase already saw that the second constituent could not grant.
        let headroom_after = plenty.available();
        assert_eq!(headroom_after, headroom_before);
    }

    #[test]
    fn test_capacity_and_available_report_the_binding_constituent() {
        let hybrid = Hybrid::builder()
            .limiter(Throttle::per_second(10))
            .limiter(Throttle::per_second(4))
            .build();

        // Both figures come from the tighter constituent (4).
        assert_eq!(hybrid.capacity(), 4);
        assert_eq!(hybrid.available(), 4);
    }

    #[test]
    fn test_refill_recovers_the_hybrid_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        let first = Throttle::per_second(2).with_clock(clock.clone());
        let second = Throttle::per_second(2).with_clock(clock.clone());
        let hybrid = Hybrid::builder().limiter(first).limiter(second).build();

        // Spend both tokens, after which the hybrid refuses.
        for _ in 0..2 {
            assert!(hybrid.try_acquire());
        }
        assert!(!hybrid.try_acquire());

        // One second later on the manual clock, acquisition succeeds again.
        clock.advance(Duration::from_secs(1));
        assert!(hybrid.try_acquire());
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_errors_when_a_constituent_can_never_grant() {
        use crate::error::ThrottleError;

        // A cost of 5 exceeds the smaller constituent's capacity of 2.
        let hybrid = Hybrid::builder()
            .limiter(Throttle::per_second(10))
            .limiter(Throttle::per_second(2))
            .build();

        let outcome = hybrid.acquire_with_cost(5).await;
        let err = outcome.unwrap_err();
        assert!(matches!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 5,
                capacity: 2
            }
        ));
    }
}
320}