agent_chain_core/rate_limiters.rs
1use async_trait::async_trait;
2use std::sync::Mutex;
3use std::time::Instant;
4
5/// Base trait for rate limiters.
6///
7/// Usage of the base limiter is through the acquire and aacquire methods depending
8/// on whether running in a sync or async context.
9///
10/// Implementations are free to add a timeout parameter to their initialize method
11/// to allow users to specify a timeout for acquiring the necessary tokens when
12/// using a blocking call.
13///
14/// Current limitations:
15///
16/// - Rate limiting information is not surfaced in tracing or callbacks. This means
17/// that the total time it takes to invoke a chat model will encompass both
18/// the time spent waiting for tokens and the time spent making the request.
19#[async_trait]
20pub trait BaseRateLimiter: Send + Sync {
21 /// Attempt to acquire the necessary tokens for the rate limiter.
22 ///
23 /// This method blocks until the required tokens are available if `blocking`
24 /// is set to `true`.
25 ///
26 /// If `blocking` is set to `false`, the method will immediately return the result
27 /// of the attempt to acquire the tokens.
28 ///
29 /// # Arguments
30 ///
31 /// * `blocking` - If `true`, the method will block until the tokens are available.
32 /// If `false`, the method will return immediately with the result of
33 /// the attempt.
34 ///
35 /// # Returns
36 ///
37 /// `true` if the tokens were successfully acquired, `false` otherwise.
38 fn acquire(&self, blocking: bool) -> bool;
39
40 /// Attempt to acquire the necessary tokens for the rate limiter. Async version.
41 ///
42 /// This method blocks until the required tokens are available if `blocking`
43 /// is set to `true`.
44 ///
45 /// If `blocking` is set to `false`, the method will immediately return the result
46 /// of the attempt to acquire the tokens.
47 ///
48 /// # Arguments
49 ///
50 /// * `blocking` - If `true`, the method will block until the tokens are available.
51 /// If `false`, the method will return immediately with the result of
52 /// the attempt.
53 ///
54 /// # Returns
55 ///
56 /// `true` if the tokens were successfully acquired, `false` otherwise.
57 async fn aacquire(&self, blocking: bool) -> bool;
58}
59
/// Settings used to build an `InMemoryRateLimiter`.
#[derive(Debug, Clone)]
pub struct InMemoryRateLimiterConfig {
    /// Refill rate of the bucket, in tokens per second.
    ///
    /// A token is a unit of "credit"; each request spends one.
    pub requests_per_second: f64,
    /// Interval, in seconds, between checks for an available token while
    /// waiting. Fractions of a second are allowed.
    pub check_every_n_seconds: f64,
    /// Upper bound on the number of tokens the bucket can hold, which limits
    /// how large a burst of requests can be. Must be at least 1.
    pub max_bucket_size: f64,
}
73
74impl Default for InMemoryRateLimiterConfig {
75 fn default() -> Self {
76 Self {
77 requests_per_second: 1.0,
78 check_every_n_seconds: 0.1,
79 max_bucket_size: 1.0,
80 }
81 }
82}
83
/// Mutable bookkeeping of the token bucket.
struct InMemoryRateLimiterState {
    /// Instant of the most recent refill; `None` until the first attempt to
    /// take a token has been made.
    last: Option<Instant>,
    /// Tokens currently stored in the bucket (may be fractional).
    available_tokens: f64,
}
88
89/// An in memory rate limiter based on a token bucket algorithm.
90///
91/// This is an in memory rate limiter, so it cannot rate limit across
92/// different processes.
93///
94/// The rate limiter only allows time-based rate limiting and does not
95/// take into account any information about the input or the output, so it
96/// cannot be used to rate limit based on the size of the request.
97///
98/// It is thread safe and can be used in either a sync or async context.
99///
100/// The in memory rate limiter is based on a token bucket. The bucket is filled
101/// with tokens at a given rate. Each request consumes a token. If there are
102/// not enough tokens in the bucket, the request is blocked until there are
103/// enough tokens.
104///
105/// These tokens have nothing to do with LLM tokens. They are just
106/// a way to keep track of how many requests can be made at a given time.
107///
108/// Current limitations:
109///
110/// - The rate limiter is not designed to work across different processes. It is
111/// an in-memory rate limiter, but it is thread safe.
112/// - The rate limiter only supports time-based rate limiting. It does not take
113/// into account the size of the request or any other factors.
114///
115/// # Example
116///
117/// ```rust,ignore
118/// use agent_chain_core::rate_limiters::{InMemoryRateLimiter, InMemoryRateLimiterConfig, BaseRateLimiter};
119///
120/// let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
121/// requests_per_second: 0.1, // Can only make a request once every 10 seconds
122/// check_every_n_seconds: 0.1, // Wake up every 100 ms to check whether allowed to make a request
123/// max_bucket_size: 10.0, // Controls the maximum burst size
124/// });
125///
126/// // In sync context
127/// rate_limiter.acquire(true);
128///
129/// // In async context
130/// rate_limiter.aacquire(true).await;
131/// ```
132pub struct InMemoryRateLimiter {
133 requests_per_second: f64,
134 max_bucket_size: f64,
135 check_every_n_seconds: f64,
136 state: Mutex<InMemoryRateLimiterState>,
137}
138
139impl InMemoryRateLimiter {
140 /// Create a new InMemoryRateLimiter with the given configuration.
141 ///
142 /// These tokens have nothing to do with LLM tokens. They are just
143 /// a way to keep track of how many requests can be made at a given time.
144 ///
145 /// This rate limiter is designed to work in a threaded environment.
146 ///
147 /// It works by filling up a bucket with tokens at a given rate. Each
148 /// request consumes a given number of tokens. If there are not enough
149 /// tokens in the bucket, the request is blocked until there are enough
150 /// tokens.
151 pub fn new(config: InMemoryRateLimiterConfig) -> Self {
152 Self {
153 requests_per_second: config.requests_per_second,
154 max_bucket_size: config.max_bucket_size,
155 check_every_n_seconds: config.check_every_n_seconds,
156 state: Mutex::new(InMemoryRateLimiterState {
157 available_tokens: 0.0,
158 last: None,
159 }),
160 }
161 }
162
163 /// Try to consume a token.
164 ///
165 /// Returns `true` if the tokens were consumed and the caller can proceed to
166 /// make the request. Returns `false` if the tokens were not consumed and
167 /// the caller should try again later.
168 fn consume(&self) -> bool {
169 let mut state = self.state.lock().expect("lock poisoned");
170 let now = Instant::now();
171
172 if let Some(last) = state.last {
173 let elapsed = now.duration_since(last).as_secs_f64();
174
175 if elapsed * self.requests_per_second >= 1.0 {
176 state.available_tokens += elapsed * self.requests_per_second;
177 state.last = Some(now);
178 }
179 } else {
180 state.last = Some(now);
181 }
182
183 state.available_tokens = state.available_tokens.min(self.max_bucket_size);
184
185 if state.available_tokens >= 1.0 {
186 state.available_tokens -= 1.0;
187 return true;
188 }
189
190 false
191 }
192}
193
194#[async_trait]
195impl BaseRateLimiter for InMemoryRateLimiter {
196 /// Attempt to acquire a token from the rate limiter.
197 ///
198 /// This method blocks until the required tokens are available if `blocking`
199 /// is set to `true`.
200 ///
201 /// If `blocking` is set to `false`, the method will immediately return the result
202 /// of the attempt to acquire the tokens.
203 ///
204 /// # Arguments
205 ///
206 /// * `blocking` - If `true`, the method will block until the tokens are available.
207 /// If `false`, the method will return immediately with the result of
208 /// the attempt.
209 ///
210 /// # Returns
211 ///
212 /// `true` if the tokens were successfully acquired, `false` otherwise.
213 fn acquire(&self, blocking: bool) -> bool {
214 if !blocking {
215 return self.consume();
216 }
217
218 while !self.consume() {
219 std::thread::sleep(std::time::Duration::from_secs_f64(
220 self.check_every_n_seconds,
221 ));
222 }
223 true
224 }
225
226 /// Attempt to acquire a token from the rate limiter. Async version.
227 ///
228 /// This method blocks until the required tokens are available if `blocking`
229 /// is set to `true`.
230 ///
231 /// If `blocking` is set to `false`, the method will immediately return the result
232 /// of the attempt to acquire the tokens.
233 ///
234 /// # Arguments
235 ///
236 /// * `blocking` - If `true`, the method will block until the tokens are available.
237 /// If `false`, the method will return immediately with the result of
238 /// the attempt.
239 ///
240 /// # Returns
241 ///
242 /// `true` if the tokens were successfully acquired, `false` otherwise.
243 async fn aacquire(&self, blocking: bool) -> bool {
244 if !blocking {
245 return self.consume();
246 }
247
248 while !self.consume() {
249 tokio::time::sleep(std::time::Duration::from_secs_f64(
250 self.check_every_n_seconds,
251 ))
252 .await;
253 }
254 true
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh limiter has an empty bucket, so a non-blocking attempt fails.
    #[test]
    fn test_rate_limiter_non_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 10.0,
            check_every_n_seconds: 0.01,
            max_bucket_size: 1.0,
        });

        let result = rate_limiter.acquire(false);
        assert!(!result);
    }

    /// A blocking acquire succeeds once the bucket has refilled (about 10 ms
    /// at 100 requests per second), well within the 100 ms budget.
    #[test]
    fn test_rate_limiter_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 1.0,
        });

        let start = Instant::now();
        let result = rate_limiter.acquire(true);
        let elapsed = start.elapsed();

        assert!(result);
        assert!(elapsed.as_millis() < 100);
    }

    /// Async counterpart of the non-blocking test.
    #[tokio::test]
    async fn test_rate_limiter_async_non_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 10.0,
            check_every_n_seconds: 0.01,
            max_bucket_size: 1.0,
        });

        let result = rate_limiter.aacquire(false).await;
        assert!(!result);
    }

    /// Async counterpart of the blocking test.
    #[tokio::test]
    async fn test_rate_limiter_async_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 1.0,
        });

        let start = Instant::now();
        let result = rate_limiter.aacquire(true).await;
        let elapsed = start.elapsed();

        assert!(result);
        assert!(elapsed.as_millis() < 100);
    }

    /// A burst after an idle period is limited to `max_bucket_size` requests.
    #[test]
    fn test_rate_limiter_burst() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 5.0,
        });

        // The refill clock only starts on the first attempt, so prime the
        // limiter before waiting. Otherwise the sleep accumulates nothing and
        // the assertion below would hold vacuously.
        assert!(!rate_limiter.acquire(false));

        // 60 ms at 100 tokens per second is worth 6 tokens, more than the
        // bucket can hold.
        std::thread::sleep(std::time::Duration::from_millis(60));

        let successes = (0..10).filter(|_| rate_limiter.acquire(false)).count();

        // Exactly the bucket size gets through. A further token would need
        // another 10 ms to accumulate during the loop.
        assert_eq!(successes, 5);
    }

    /// The default configuration is 1 request/s, 100 ms polling, bucket of 1.
    #[test]
    fn test_default_config() {
        let config = InMemoryRateLimiterConfig::default();
        assert!((config.requests_per_second - 1.0).abs() < f64::EPSILON);
        assert!((config.check_every_n_seconds - 0.1).abs() < f64::EPSILON);
        assert!((config.max_bucket_size - 1.0).abs() < f64::EPSILON);
    }
}
345}