Skip to main content

rate_net/
async_limiter.rs

1//! Optional async-friendly wrapper.
2//!
3//! The core is sync and runtime-free: [`check`](crate::RateLimiter::check) never
4//! blocks, so most async callers just call it and shed load on a denial. This
5//! layer adds the one thing that genuinely needs a runtime — *awaiting* until a
6//! key is allowed — behind the `async` feature, so the core never depends on a
7//! runtime.
8//!
9//! [`AsyncLimiter::until_ready`] retries on each denial, sleeping for the
10//! reported `retry_after` via [`tokio::time::sleep`], until the request is
11//! admitted. It needs a clock that actually advances (the default
12//! [`SystemClock`](clock_lib::SystemClock)); under a frozen `ManualClock` the
13//! allowance never refills, so it would wait forever.
14
15use core::time::Duration;
16
17use clock_lib::{Clock, SystemClock};
18
19use crate::decision::Decision;
20use crate::key::Key;
21use crate::limiter::RateLimiter;
22
23/// An async-friendly wrapper around a [`RateLimiter`].
24///
25/// Holds a limiter and adds [`until_ready`](Self::until_ready) /
26/// [`until_ready_n`](Self::until_ready_n), which await until the key is admitted
27/// rather than returning a denial. The synchronous [`check`](Self::check) /
28/// [`check_n`](Self::check_n) pass straight through.
29///
30/// # Examples
31///
32/// ```
33/// use rate_net::{AsyncLimiter, RateLimiter};
34///
35/// # async fn demo() {
36/// let limiter = AsyncLimiter::new(RateLimiter::per_second(100));
37///
38/// // Non-blocking: returns immediately, allow or deny.
39/// let _ = limiter.check("user:42");
40///
41/// // Awaiting: returns once the key is within its limit.
42/// limiter.until_ready("user:42").await;
43/// # }
44/// ```
45pub struct AsyncLimiter<C: Clock + Clone = SystemClock> {
46    inner: RateLimiter<C>,
47}
48
49impl<C: Clock + Clone> AsyncLimiter<C> {
50    /// Wraps a limiter for async use.
51    #[must_use]
52    pub fn new(inner: RateLimiter<C>) -> Self {
53        Self { inner }
54    }
55
56    /// Borrows the wrapped limiter.
57    #[must_use]
58    pub fn inner(&self) -> &RateLimiter<C> {
59        &self.inner
60    }
61
62    /// Unwraps back into the limiter.
63    #[must_use]
64    pub fn into_inner(self) -> RateLimiter<C> {
65        self.inner
66    }
67
68    /// Checks a single unit against `key` without awaiting — a straight
69    /// pass-through to [`RateLimiter::check`].
70    pub fn check(&self, key: impl Into<Key>) -> Decision {
71        self.inner.check(key)
72    }
73
74    /// Checks `n` units against `key` without awaiting — a straight pass-through
75    /// to [`RateLimiter::check_n`].
76    pub fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
77        self.inner.check_n(key, n)
78    }
79
80    /// Awaits until a single unit is admitted for `key`.
81    ///
82    /// Equivalent to `until_ready_n(key, 1)`.
83    pub async fn until_ready(&self, key: impl Into<Key>) {
84        self.until_ready_n(key, 1).await;
85    }
86
87    /// Awaits until `n` units are admitted for `key`, sleeping for the reported
88    /// `retry_after` after each denial.
89    ///
90    /// Returns immediately if `n` exceeds what the limit can ever grant (a
91    /// `retry_after` of [`Duration::MAX`]): no amount of waiting would help, so
92    /// it gives up rather than sleep forever.
93    pub async fn until_ready_n(&self, key: impl Into<Key>, n: u32) {
94        let key = key.into();
95        loop {
96            match self.inner.check_n(key.clone(), n) {
97                Decision::Allow => return,
98                Decision::Deny { retry_after } => {
99                    if retry_after == Duration::MAX {
100                        return; // can never succeed; do not wait forever
101                    }
102                    tokio::time::sleep(retry_after).await;
103                }
104            }
105        }
106    }
107}
108
109impl<C: Clock + Clone> From<RateLimiter<C>> for AsyncLimiter<C> {
110    fn from(inner: RateLimiter<C>) -> Self {
111        Self::new(inner)
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::AsyncLimiter;
    use crate::limiter::RateLimiter;

    /// Upper bound on how long any awaited call in these tests may take.
    ///
    /// It only guards against a hang; it is deliberately far larger than any
    /// expected wait so OS scheduling jitter cannot make the tests flaky.
    const HANG_GUARD: std::time::Duration = std::time::Duration::from_secs(2);

    /// The synchronous pass-through needs no runtime: the first unit is
    /// allowed and the second, issued immediately afterwards, is denied.
    #[test]
    fn test_check_passthrough_is_sync() {
        let limiter = AsyncLimiter::new(RateLimiter::per_second(1));
        assert!(limiter.check("k").is_allow());
        assert!(limiter.check("k").is_deny());
    }

    /// After the key is drained, `until_ready` must resolve once a unit
    /// refills rather than hang.
    #[tokio::test]
    async fn test_until_ready_admits_after_waiting() {
        // A fast rate keeps the wait to a few milliseconds of real time.
        let limiter = AsyncLimiter::new(RateLimiter::per_second(200));
        // Drain the key, so the next admit must wait for a refill.
        for _ in 0..200 {
            assert!(limiter.check("k").is_allow());
        }
        assert!(limiter.check("k").is_deny());
        // Awaiting must complete once a token refills (~5ms for 200/s). The
        // timeout guards against a hang rather than asserting exact timing,
        // which the OS scheduler makes nondeterministic.
        let completed = tokio::time::timeout(HANG_GUARD, limiter.until_ready("k")).await;
        assert!(completed.is_ok(), "until_ready did not complete within 2s");
    }

    /// A request larger than the limit can ever grant must make
    /// `until_ready_n` return instead of waiting forever.
    #[tokio::test]
    async fn test_until_ready_n_gives_up_when_impossible() {
        let limiter = AsyncLimiter::new(RateLimiter::per_second(5));
        // 6 > capacity of 5 → can never succeed; must return without hanging.
        // The timeout turns a regression in the give-up path into a test
        // failure instead of a test run that never finishes.
        let gave_up = tokio::time::timeout(HANG_GUARD, limiter.until_ready_n("k", 6)).await;
        assert!(gave_up.is_ok(), "until_ready_n did not give up within 2s");
    }

    /// `From`/`Into` and `into_inner` preserve the wrapped limiter's quota.
    #[test]
    fn test_from_and_into_inner_round_trip() {
        let limiter: AsyncLimiter = RateLimiter::per_second(10).into();
        assert_eq!(limiter.inner().quota().limit(), 10);
        let back = limiter.into_inner();
        assert_eq!(back.quota().limit(), 10);
    }
}