Skip to main content

throttle_net/
hybrid.rs

1//! A hybrid limiter: a request must clear every constituent.
2
3use std::sync::Arc;
4
5use crate::decision::Decision;
6#[cfg(feature = "runtime")]
7use crate::error::ThrottleError;
8use crate::limiter::{Limiter, acquire_all, peek_all};
9
10/// Several limiters combined so a request must satisfy **all** of them.
11///
12/// The classic use is layering windows on one resource — say "at most 10 per
13/// second *and* at most 100 per minute" — where either ceiling can bind. A
14/// hybrid is itself a [`Limiter`], so hybrids nest and slot anywhere a single
15/// limiter does.
16///
17/// Acquisition is two-phase to stay correct: every constituent is first
18/// [`peek`](Limiter::peek)ed, and tokens are only taken once all of them would
19/// grant. Without that, an early constituent could spend a token for a request a
20/// later one refuses. See [`Limiter`] for the full rationale.
21///
22/// Build one with [`Hybrid::builder`].
23///
24/// # Examples
25///
26/// ```
27/// use std::time::Duration;
28/// use throttle_net::{Hybrid, Throttle};
29///
30/// // 10 per second, and no more than 100 per minute.
31/// let hybrid = Hybrid::builder()
32///     .limiter(Throttle::per_second(10))
33///     .limiter(Throttle::per_duration(100, Duration::from_secs(60)))
34///     .build();
35///
36/// assert!(hybrid.try_acquire());
37/// ```
38#[derive(Clone)]
39pub struct Hybrid {
40    constituents: Arc<[Arc<dyn Limiter>]>,
41}
42
43impl Hybrid {
44    /// Starts building a hybrid limiter.
45    #[must_use]
46    pub fn builder() -> HybridBuilder {
47        HybridBuilder {
48            constituents: Vec::new(),
49        }
50    }
51
52    #[inline]
53    fn pairs(&self, cost: u32) -> impl Iterator<Item = (&dyn Limiter, u32)> + Clone {
54        self.constituents.iter().map(move |l| (l.as_ref(), cost))
55    }
56
57    /// Attempts to take one token from every constituent without waiting,
58    /// returning whether all granted.
59    ///
60    /// All-or-nothing across constituents: either every one is debited or, on a
61    /// refusal, the call reports failure.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use throttle_net::{Hybrid, Throttle};
67    ///
68    /// let hybrid = Hybrid::builder().limiter(Throttle::per_second(1)).build();
69    /// assert!(hybrid.try_acquire());
70    /// assert!(!hybrid.try_acquire());
71    /// ```
72    #[inline]
73    #[must_use]
74    pub fn try_acquire(&self) -> bool {
75        self.try_acquire_with_cost(1)
76    }
77
78    /// Attempts to take `cost` tokens from every constituent without waiting.
79    #[inline]
80    #[must_use]
81    pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
82        acquire_all(self.pairs(cost)).is_acquired()
83    }
84}
85
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl Hybrid {
    /// Waits until every constituent can grant one token, then takes it.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] if some constituent's
    /// capacity is too small to ever grant the request.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn run() -> Result<(), throttle_net::ThrottleError> {
    /// use throttle_net::{Hybrid, Throttle};
    ///
    /// let hybrid = Hybrid::builder().limiter(Throttle::per_second(100)).build();
    /// hybrid.acquire().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Waits until every constituent can grant `cost` tokens, then takes them.
    ///
    /// Each round attempts the acquisition; when told to retry, it sleeps for
    /// the suggested delay and tries again.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] if some constituent can
    /// never grant `cost`. The reported capacity is this hybrid's own
    /// [`capacity`](Limiter::capacity).
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            // Terminal outcomes return straight away; only a retry falls
            // through to the sleep below.
            let delay = match acquire_all(self.pairs(cost)) {
                Decision::Acquired => return Ok(()),
                Decision::Impossible => {
                    let capacity = self.capacity();
                    return Err(ThrottleError::CostExceedsCapacity { cost, capacity });
                }
                Decision::Retry { after } => after,
            };
            crate::rt::sleep(delay).await;
        }
    }
}
132
133impl Limiter for Hybrid {
134    #[inline]
135    fn peek(&self, cost: u32) -> Decision {
136        peek_all(self.pairs(cost))
137    }
138
139    #[inline]
140    fn acquire_cost(&self, cost: u32) -> Decision {
141        acquire_all(self.pairs(cost))
142    }
143
144    /// The headroom of the *binding* constituent: the fewest tokens any one of
145    /// them has available. An empty hybrid is unbounded ([`u32::MAX`]).
146    #[inline]
147    fn available(&self) -> u32 {
148        self.constituents
149            .iter()
150            .map(|l| l.available())
151            .min()
152            .unwrap_or(u32::MAX)
153    }
154
155    /// The capacity of the *binding* constituent: the smallest capacity among
156    /// them, since that is the first ceiling a request hits. An empty hybrid is
157    /// unbounded ([`u32::MAX`]).
158    #[inline]
159    fn capacity(&self) -> u32 {
160        self.constituents
161            .iter()
162            .map(|l| l.capacity())
163            .min()
164            .unwrap_or(u32::MAX)
165    }
166}
167
168/// Builder for a [`Hybrid`] limiter.
169///
170/// Add constituents with [`limiter`](Self::limiter); each must outlive the
171/// hybrid, so it is stored behind an [`Arc`]. Finish with [`build`](Self::build).
172///
173/// # Examples
174///
175/// ```
176/// use throttle_net::{Hybrid, Throttle};
177///
178/// let hybrid = Hybrid::builder()
179///     .limiter(Throttle::per_second(5))
180///     .build();
181/// # let _ = hybrid;
182/// ```
183#[derive(Default)]
184pub struct HybridBuilder {
185    constituents: Vec<Arc<dyn Limiter>>,
186}
187
188impl HybridBuilder {
189    /// Adds a constituent the hybrid must satisfy.
190    #[must_use]
191    pub fn limiter(mut self, limiter: impl Limiter + 'static) -> Self {
192        self.constituents.push(Arc::new(limiter));
193        self
194    }
195
196    /// Adds an already-shared constituent (for reusing one limiter across
197    /// several composites).
198    #[must_use]
199    pub fn shared(mut self, limiter: Arc<dyn Limiter>) -> Self {
200        self.constituents.push(limiter);
201        self
202    }
203
204    /// Builds the [`Hybrid`].
205    #[must_use]
206    pub fn build(self) -> Hybrid {
207        Hybrid {
208            constituents: self.constituents.into(),
209        }
210    }
211}
212
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::Hybrid;
    use crate::limiter::Limiter;
    use crate::throttle::Throttle;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Compiles only if `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_hybrid_is_send_sync() {
        assert_send_sync::<Hybrid>();
    }

    #[test]
    fn test_request_must_clear_every_constituent() {
        // 5 per second AND 3 per minute: the tighter ceiling (3) binds.
        let per_second = Throttle::per_second(5);
        let per_minute = Throttle::per_duration(3, Duration::from_secs(60));
        let hybrid = Hybrid::builder()
            .limiter(per_second)
            .limiter(per_minute)
            .build();

        for _ in 0..3 {
            assert!(hybrid.try_acquire());
        }
        // The per-minute limiter is drained while the per-second one still has
        // room, so the hybrid as a whole refuses.
        assert!(!hybrid.try_acquire());
    }

    #[test]
    fn test_peek_does_not_consume() {
        let hybrid = Hybrid::builder().limiter(Throttle::per_second(2)).build();

        let peeked = hybrid.peek(2);
        assert!(peeked.is_acquired());
        // Peeking took nothing, so both tokens can still be acquired.
        assert!(hybrid.try_acquire_with_cost(2));
    }

    #[test]
    fn test_no_token_lost_when_a_later_constituent_refuses() {
        // The first limiter has plenty of tokens and the second has none.
        // Under the peek-then-commit contract the first limiter must NOT lose
        // a token to a request the second one blocks.
        let plenty: Arc<dyn Limiter> = Arc::new(Throttle::per_second(100));
        let exhausted = Throttle::per_second(1);
        assert!(exhausted.try_acquire()); // drain it to zero

        let hybrid = Hybrid::builder()
            .shared(Arc::clone(&plenty))
            .limiter(exhausted)
            .build();

        let before = plenty.available();
        assert!(!hybrid.try_acquire());
        // The plentiful limiter is untouched: the hybrid saw that the second
        // constituent could not grant and never committed to the first.
        let after = plenty.available();
        assert_eq!(after, before);
    }

    #[test]
    fn test_capacity_and_available_report_the_binding_constituent() {
        let hybrid = Hybrid::builder()
            .limiter(Throttle::per_second(10))
            .limiter(Throttle::per_second(4))
            .build();

        // The smaller limiter (4) is the binding one for both figures.
        assert_eq!(hybrid.capacity(), 4);
        assert_eq!(hybrid.available(), 4);
    }

    #[test]
    fn test_refill_recovers_the_hybrid_under_manual_clock() {
        let clock = Arc::new(ManualClock::new());
        // Both constituents are driven by the same manual clock.
        let two_per_second = || Throttle::per_second(2).with_clock(clock.clone());
        let hybrid = Hybrid::builder()
            .limiter(two_per_second())
            .limiter(two_per_second())
            .build();

        for _ in 0..2 {
            assert!(hybrid.try_acquire());
        }
        assert!(!hybrid.try_acquire());

        // After one second on the manual clock the hybrid grants again.
        clock.advance(Duration::from_secs(1));
        assert!(hybrid.try_acquire());
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_acquire_errors_when_a_constituent_can_never_grant() {
        use crate::error::ThrottleError;

        let hybrid = Hybrid::builder()
            .limiter(Throttle::per_second(10))
            .limiter(Throttle::per_second(2))
            .build();

        // A cost of 5 can never fit the 2-per-second constituent, so the call
        // fails instead of waiting.
        let outcome = hybrid.acquire_with_cost(5).await;
        let err = outcome.unwrap_err();
        assert!(matches!(
            err,
            ThrottleError::CostExceedsCapacity {
                cost: 5,
                capacity: 2
            }
        ));
    }
}