async_rate_limiter/
lib.rs

1//! async-rate-limiter implements a [token bucket
2//! algorithm](https://en.wikipedia.org/wiki/Token_bucket) that can be used to
3//! limit API access frequency.
4//!
5//! ## Features
6//!
7//! - Simple to use
8//! - Supports concurrent access
9//! - Low overhead: the number of tokens is calculated over time
10//!
11//! Thanks to Rust’s `async` / `await`, this crate is very simple to use: just
12//! `.await` [`RateLimiter::acquire()`] right before your function call, and the
13//! call will be made at the specified rate limit.
14//!
15//! [`RateLimiter`] also implements the `Clone` trait, so you can easily use it
16//! across multiple tasks.
17//!
18//! ## Example
19//!
20//! Update your `Cargo.toml`:
21//!
22//! ```toml
23//! [dependencies]
24//! # Change features to ["rt-async-std"] if you are using async-std runtime.
25//! async-rate-limiter = { version = "1", features = ["rt-tokio"] }
26//! ```
27//!
28//! Here is a simple example:
29//!
30//! ```rust
31//! use async_rate_limiter::RateLimiter;
32//! use std::time::Duration;
33//! use tokio::spawn;
34//!
35//! #[tokio::main]
36//! async fn main() {
37//!     let rl = RateLimiter::new(3);
38//!     // You can change `burst` at any time
39//!     rl.burst(5);
40//!     
41//!     rl.acquire().await;
42//!     println!("Do something that you want to limit the rate ...");
43//!
44//!     let res = rl.try_acquire();
45//!     if res.is_ok() {
46//!         println!("Do something that you want to limit the rate ...");
47//!     }
48//!
49//!     // acquire with a timeout
50//!     let ok = rl.acquire_with_timeout(Duration::from_secs(10)).await;
51//!     if ok {
52//!         println!("Do something that you want to limit the rate ...");
53//!     }
54//!
55//!     // Concurrent use
56//!     let rl = rl.clone();
57//!     spawn(async move {
58//!         let res = rl.acquire_with_timeout(Duration::from_millis(340)).await;
59//!         assert!(res);
60//!     });
61//! }
62//!
63//! ```
64//!
65//! async-rate-limiter supports multiple async runtimes; tokio and async-std
66//! are currently supported. Use Cargo features to select the runtime.
67
68mod token_bucket;
69pub use token_bucket::TokenBucketRateLimiter as RateLimiter;
70
71mod rt;
72
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};
    use tokio::spawn;

    use super::*;

    // The timing windows asserted below are deliberately loose: with the
    // async-std runtime, delays/intervals seem to run longer than requested.

    /// Common opening sequence shared by several tests.
    ///
    /// Creates a limiter with `RateLimiter::new(3)` and `burst(5)`, then
    /// performs three `acquire()` calls, asserting that they complete roughly
    /// 0 ms, 333 ms and 667 ms after the sequence began.
    ///
    /// Returns the limiter together with the `Instant` captured just before
    /// the first acquisition, so callers can keep measuring from that point.
    #[cfg(any(feature = "rt-tokio", feature = "rt-async-std"))]
    async fn warmed_up_limiter() -> (RateLimiter, Instant) {
        let rl = RateLimiter::new(3);
        rl.burst(5);

        let start = Instant::now();

        // First acquisition is expected to return (almost) immediately.
        rl.acquire().await;
        assert!(start.elapsed() < Duration::from_millis(10));

        // Each subsequent acquisition is expected about a third of a second
        // after the previous one.
        rl.acquire().await;
        assert!(start.elapsed() > Duration::from_millis(330));
        assert!(start.elapsed() < Duration::from_millis(340));

        rl.acquire().await;
        assert!(start.elapsed() > Duration::from_millis(660));
        assert!(start.elapsed() < Duration::from_millis(680));

        (rl, start)
    }

    /// `try_acquire` succeeds once, then fails with the duration to wait;
    /// after waiting that long it succeeds again.
    #[tokio::test]
    #[cfg(any(feature = "rt-tokio", feature = "rt-async-std"))]
    async fn test_try_acquire() {
        use rt::delay;

        let limiter = RateLimiter::new(3);
        limiter.burst(5);

        limiter.try_acquire().unwrap();

        // The second attempt must be rejected; the error value is the wait
        // time, expected to be about a third of a second.
        let wait = limiter.try_acquire().unwrap_err();
        assert!(wait > Duration::from_millis(330));
        assert!(wait < Duration::from_millis(340));

        delay(wait).await;

        limiter.try_acquire().unwrap();
    }

    /// `acquire` paces callers, and `acquire_with_timeout` reports success or
    /// failure depending on whether the wait fits into the timeout.
    #[tokio::test]
    #[cfg(any(feature = "rt-tokio", feature = "rt-async-std"))]
    async fn test_acquire() {
        let (limiter, start) = warmed_up_limiter().await;

        // A generous timeout: the fourth acquisition should land at ~1 s.
        let acquired = limiter
            .acquire_with_timeout(Duration::from_millis(5000))
            .await;
        assert!(acquired);
        assert!(
            start.elapsed() >= Duration::from_secs(1),
            "got: {:?}",
            start.elapsed()
        );
        assert!(start.elapsed() < Duration::from_millis(1030));

        // A 10 ms timeout is too short for the next acquisition, so the call
        // must report failure without waiting much longer than the timeout.
        let acquired = limiter
            .acquire_with_timeout(Duration::from_millis(10))
            .await;
        assert!(!acquired);
        assert!(start.elapsed() < Duration::from_millis(1050));
    }

    /// A cloned limiter can be used from a spawned task while the original is
    /// used concurrently; both acquisitions succeed within their timeouts.
    #[tokio::test]
    #[cfg(any(feature = "rt-tokio", feature = "rt-async-std"))]
    async fn test_clone() {
        let (limiter, _) = warmed_up_limiter().await;

        let cloned = limiter.clone();
        let task = spawn(async move {
            let began = Instant::now();
            let acquired = cloned
                .acquire_with_timeout(Duration::from_millis(700))
                .await;
            assert!(acquired);
            assert!(
                began.elapsed() <= Duration::from_millis(700),
                "got: {:?}",
                began.elapsed()
            );
        });

        let acquired = limiter
            .acquire_with_timeout(Duration::from_millis(700))
            .await;
        assert!(acquired);

        // An `Err` here would mean the spawned task panicked or was cancelled.
        assert!(task.await.is_ok());
    }

    /// Aborting a task that is in the middle of `acquire_with_timeout` must
    /// not stop the original limiter from acquiring afterwards.
    #[tokio::test]
    #[cfg(any(feature = "rt-tokio", feature = "rt-async-std"))]
    async fn test_cancel_task() {
        use rt::delay;

        let (limiter, _) = warmed_up_limiter().await;

        let cloned = limiter.clone();
        let task = spawn(async move {
            let began = Instant::now();
            let acquired = cloned
                .acquire_with_timeout(Duration::from_millis(700))
                .await;
            assert!(acquired);
            assert!(began.elapsed() <= Duration::from_millis(700));
        });

        // Give the task a moment to start, then abort it (presumably while it
        // is still waiting for its token).
        delay(Duration::from_millis(100)).await;
        task.abort();

        // First acquisition after the abort must finish within 700 ms.
        let began = Instant::now();
        let acquired = limiter
            .acquire_with_timeout(Duration::from_millis(5000))
            .await;
        assert!(acquired);
        assert!(
            began.elapsed() <= Duration::from_millis(700),
            "got: {:?}",
            began.elapsed()
        );

        // The following acquisition is expected to complete within 10 ms.
        let began = Instant::now();
        let acquired = limiter
            .acquire_with_timeout(Duration::from_millis(5000))
            .await;
        assert!(acquired);
        assert!(
            began.elapsed() <= Duration::from_millis(10),
            "got: {:?}",
            began.elapsed()
        );
    }
}