rate_net/async_limiter.rs
1//! Optional async-friendly wrapper.
2//!
3//! The core is sync and runtime-free: [`check`](crate::RateLimiter::check) never
4//! blocks, so most async callers just call it and shed load on a denial. This
5//! layer adds the one thing that genuinely needs a runtime — *awaiting* until a
6//! key is allowed — behind the `async` feature, so the core never depends on a
7//! runtime.
8//!
9//! [`AsyncLimiter::until_ready`] retries on each denial, sleeping for the
10//! reported `retry_after` via [`tokio::time::sleep`], until the request is
11//! admitted. It needs a clock that actually advances (the default
12//! [`SystemClock`](clock_lib::SystemClock)); under a frozen `ManualClock` the
13//! allowance never refills, so it would wait forever.
14
15use core::time::Duration;
16
17use clock_lib::{Clock, SystemClock};
18
19use crate::decision::Decision;
20use crate::key::Key;
21use crate::limiter::RateLimiter;
22
23/// An async-friendly wrapper around a [`RateLimiter`].
24///
25/// Holds a limiter and adds [`until_ready`](Self::until_ready) /
26/// [`until_ready_n`](Self::until_ready_n), which await until the key is admitted
27/// rather than returning a denial. The synchronous [`check`](Self::check) /
28/// [`check_n`](Self::check_n) pass straight through.
29///
30/// # Examples
31///
32/// ```
33/// use rate_net::{AsyncLimiter, RateLimiter};
34///
35/// # async fn demo() {
36/// let limiter = AsyncLimiter::new(RateLimiter::per_second(100));
37///
38/// // Non-blocking: returns immediately, allow or deny.
39/// let _ = limiter.check("user:42");
40///
41/// // Awaiting: returns once the key is within its limit.
42/// limiter.until_ready("user:42").await;
43/// # }
44/// ```
45pub struct AsyncLimiter<C: Clock + Clone = SystemClock> {
46 inner: RateLimiter<C>,
47}
48
49impl<C: Clock + Clone> AsyncLimiter<C> {
50 /// Wraps a limiter for async use.
51 #[must_use]
52 pub fn new(inner: RateLimiter<C>) -> Self {
53 Self { inner }
54 }
55
56 /// Borrows the wrapped limiter.
57 #[must_use]
58 pub fn inner(&self) -> &RateLimiter<C> {
59 &self.inner
60 }
61
62 /// Unwraps back into the limiter.
63 #[must_use]
64 pub fn into_inner(self) -> RateLimiter<C> {
65 self.inner
66 }
67
68 /// Checks a single unit against `key` without awaiting — a straight
69 /// pass-through to [`RateLimiter::check`].
70 pub fn check(&self, key: impl Into<Key>) -> Decision {
71 self.inner.check(key)
72 }
73
74 /// Checks `n` units against `key` without awaiting — a straight pass-through
75 /// to [`RateLimiter::check_n`].
76 pub fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
77 self.inner.check_n(key, n)
78 }
79
80 /// Awaits until a single unit is admitted for `key`.
81 ///
82 /// Equivalent to `until_ready_n(key, 1)`.
83 pub async fn until_ready(&self, key: impl Into<Key>) {
84 self.until_ready_n(key, 1).await;
85 }
86
87 /// Awaits until `n` units are admitted for `key`, sleeping for the reported
88 /// `retry_after` after each denial.
89 ///
90 /// Returns immediately if `n` exceeds what the limit can ever grant (a
91 /// `retry_after` of [`Duration::MAX`]): no amount of waiting would help, so
92 /// it gives up rather than sleep forever.
93 pub async fn until_ready_n(&self, key: impl Into<Key>, n: u32) {
94 let key = key.into();
95 loop {
96 match self.inner.check_n(key.clone(), n) {
97 Decision::Allow => return,
98 Decision::Deny { retry_after } => {
99 if retry_after == Duration::MAX {
100 return; // can never succeed; do not wait forever
101 }
102 tokio::time::sleep(retry_after).await;
103 }
104 }
105 }
106 }
107}
108
109impl<C: Clock + Clone> From<RateLimiter<C>> for AsyncLimiter<C> {
110 fn from(inner: RateLimiter<C>) -> Self {
111 Self::new(inner)
112 }
113}
114
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::AsyncLimiter;
    use crate::limiter::RateLimiter;

    /// Ceiling on how long any awaiting test may run. It exists only to turn a
    /// would-be hang into a failure; no test asserts on exact timing, which
    /// the OS scheduler makes nondeterministic.
    const HANG_GUARD: Duration = Duration::from_secs(2);

    /// The synchronous pass-through works without any runtime.
    #[test]
    fn test_check_passthrough_is_sync() {
        let limiter = AsyncLimiter::new(RateLimiter::per_second(1));
        assert!(limiter.check("k").is_allow());
        assert!(limiter.check("k").is_deny());
    }

    /// A drained key is admitted again once the limiter refills.
    #[tokio::test]
    async fn test_until_ready_admits_after_waiting() {
        // A fast rate keeps the wait to a few milliseconds of real time.
        let limiter = AsyncLimiter::new(RateLimiter::per_second(200));
        // The initial allowance must be granted in full.
        for _ in 0..200 {
            assert!(limiter.check("k").is_allow());
        }
        // Keep draining until a denial is actually observed. Asserting that
        // the very next check is denied would race the refill (~5ms per token
        // at 200/s): a scheduling stall during the loop above can hand out one
        // more token. The attempt count is bounded so a limiter that never
        // denies fails the test rather than spinning.
        let denied = (0..1_000).any(|_| limiter.check("k").is_deny());
        assert!(denied, "key was never denied while draining");
        // Awaiting must complete once a token refills.
        let completed = tokio::time::timeout(HANG_GUARD, limiter.until_ready("k")).await;
        assert!(completed.is_ok(), "until_ready did not complete within 2s");
    }

    /// A request larger than the limit can ever grant returns instead of
    /// waiting.
    #[tokio::test]
    async fn test_until_ready_n_gives_up_when_impossible() {
        let limiter = AsyncLimiter::new(RateLimiter::per_second(5));
        // 6 > capacity of 5 → can never succeed; must return without hanging.
        // The timeout makes a regression fail fast instead of stalling the
        // whole test run.
        let completed = tokio::time::timeout(HANG_GUARD, limiter.until_ready_n("k", 6)).await;
        assert!(
            completed.is_ok(),
            "until_ready_n kept waiting on a request that can never be admitted"
        );
    }

    /// `From` and `into_inner` hand the same limiter back and forth.
    #[test]
    fn test_from_and_into_inner_round_trip() {
        let limiter: AsyncLimiter = RateLimiter::per_second(10).into();
        assert_eq!(limiter.inner().quota().limit(), 10);
        let back = limiter.into_inner();
        assert_eq!(back.quota().limit(), 10);
    }
}