agent_chain_core/
rate_limiters.rs

1use async_trait::async_trait;
2use std::sync::Mutex;
3use std::time::Instant;
4
/// Common interface implemented by every rate limiter.
///
/// Callers pick [`acquire`](BaseRateLimiter::acquire) from synchronous code and
/// [`aacquire`](BaseRateLimiter::aacquire) from asynchronous code. Both report
/// whether a token was obtained.
///
/// The trait requires `Send + Sync`, so implementations can be shared between
/// threads.
///
/// Implementations are free to accept a timeout in their constructor so that
/// users can bound how long a blocking acquisition may wait.
///
/// Current limitations:
///
/// - Rate limiting information is not surfaced in tracing or callbacks. This means
///     that the total time it takes to invoke a chat model will encompass both
///     the time spent waiting for tokens and the time spent making the request.
#[async_trait]
pub trait BaseRateLimiter: Send + Sync {
    /// Attempt to acquire the tokens needed to proceed (synchronous version).
    ///
    /// With `blocking` set to `true` the call does not return until the tokens
    /// are available. With `blocking` set to `false` a single attempt is made
    /// and its outcome is returned immediately.
    ///
    /// # Arguments
    ///
    /// * `blocking` - If `true`, wait until the tokens are available.
    ///     If `false`, return immediately with the result of the attempt.
    ///
    /// # Returns
    ///
    /// `true` if the tokens were successfully acquired, `false` otherwise.
    fn acquire(&self, blocking: bool) -> bool;

    /// Attempt to acquire the tokens needed to proceed (asynchronous version).
    ///
    /// With `blocking` set to `true` the returned future does not resolve until
    /// the tokens are available. With `blocking` set to `false` a single
    /// attempt is made and its outcome is returned immediately.
    ///
    /// # Arguments
    ///
    /// * `blocking` - If `true`, wait until the tokens are available.
    ///     If `false`, return immediately with the result of the attempt.
    ///
    /// # Returns
    ///
    /// `true` if the tokens were successfully acquired, `false` otherwise.
    async fn aacquire(&self, blocking: bool) -> bool;
}
59
/// Configuration for `InMemoryRateLimiter`.
///
/// All values are plain `f64`s and are stored as given; this type performs no
/// validation of its own, so callers are responsible for supplying sensible
/// (finite, positive) numbers.
#[derive(Debug, Clone, PartialEq)]
pub struct InMemoryRateLimiterConfig {
    /// The number of tokens to add per second to the bucket.
    /// The tokens represent "credit" that can be used to make requests.
    pub requests_per_second: f64,
    /// How often, in seconds, a waiting caller re-checks whether tokens are
    /// available. Fractions of a second are allowed.
    pub check_every_n_seconds: f64,
    /// The maximum number of tokens that can be in the bucket.
    /// Must be at least 1 (not enforced here). Used to prevent bursts of requests.
    pub max_bucket_size: f64,
}
73
74impl Default for InMemoryRateLimiterConfig {
75    fn default() -> Self {
76        Self {
77            requests_per_second: 1.0,
78            check_every_n_seconds: 0.1,
79            max_bucket_size: 1.0,
80        }
81    }
82}
83
/// Mutable bookkeeping for [`InMemoryRateLimiter`], kept behind its mutex.
#[derive(Debug)]
struct InMemoryRateLimiterState {
    /// Tokens currently in the bucket; fractional amounts are possible.
    available_tokens: f64,
    /// Reference instant used to compute the next refill; `None` before the
    /// first acquisition attempt.
    last: Option<Instant>,
}

/// An in memory rate limiter based on a token bucket algorithm.
///
/// This is an in memory rate limiter, so it cannot rate limit across
/// different processes.
///
/// The rate limiter only allows time-based rate limiting and does not
/// take into account any information about the input or the output, so it
/// cannot be used to rate limit based on the size of the request.
///
/// It is thread safe and can be used in either a sync or async context.
///
/// The in memory rate limiter is based on a token bucket. The bucket is filled
/// with tokens at a given rate. Each request consumes a token. If there are
/// not enough tokens in the bucket, the request is blocked until there are
/// enough tokens.
///
/// These tokens have nothing to do with LLM tokens. They are just
/// a way to keep track of how many requests can be made at a given time.
///
/// Current limitations:
///
/// - The rate limiter is not designed to work across different processes. It is
///   an in-memory rate limiter, but it is thread safe.
/// - The rate limiter only supports time-based rate limiting. It does not take
///   into account the size of the request or any other factors.
///
/// # Example
///
/// ```rust,ignore
/// use agent_chain_core::rate_limiters::{InMemoryRateLimiter, InMemoryRateLimiterConfig, BaseRateLimiter};
///
/// let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
///     requests_per_second: 0.1,  // Can only make a request once every 10 seconds
///     check_every_n_seconds: 0.1,  // Wake up every 100 ms to check whether allowed to make a request
///     max_bucket_size: 10.0,  // Controls the maximum burst size
/// });
///
/// // In sync context
/// rate_limiter.acquire(true);
///
/// // In async context
/// rate_limiter.aacquire(true).await;
/// ```
#[derive(Debug)]
pub struct InMemoryRateLimiter {
    /// Refill rate of the bucket, in tokens per second.
    requests_per_second: f64,
    /// Upper bound on the number of tokens the bucket may hold.
    max_bucket_size: f64,
    /// Polling interval, in seconds, used while waiting for a token.
    check_every_n_seconds: f64,
    /// Bucket contents and refill timestamp, guarded for cross-thread use.
    state: Mutex<InMemoryRateLimiterState>,
}
138
139impl InMemoryRateLimiter {
140    /// Create a new InMemoryRateLimiter with the given configuration.
141    ///
142    /// These tokens have nothing to do with LLM tokens. They are just
143    /// a way to keep track of how many requests can be made at a given time.
144    ///
145    /// This rate limiter is designed to work in a threaded environment.
146    ///
147    /// It works by filling up a bucket with tokens at a given rate. Each
148    /// request consumes a given number of tokens. If there are not enough
149    /// tokens in the bucket, the request is blocked until there are enough
150    /// tokens.
151    pub fn new(config: InMemoryRateLimiterConfig) -> Self {
152        Self {
153            requests_per_second: config.requests_per_second,
154            max_bucket_size: config.max_bucket_size,
155            check_every_n_seconds: config.check_every_n_seconds,
156            state: Mutex::new(InMemoryRateLimiterState {
157                available_tokens: 0.0,
158                last: None,
159            }),
160        }
161    }
162
163    /// Try to consume a token.
164    ///
165    /// Returns `true` if the tokens were consumed and the caller can proceed to
166    /// make the request. Returns `false` if the tokens were not consumed and
167    /// the caller should try again later.
168    fn consume(&self) -> bool {
169        let mut state = self.state.lock().expect("lock poisoned");
170        let now = Instant::now();
171
172        if let Some(last) = state.last {
173            let elapsed = now.duration_since(last).as_secs_f64();
174
175            if elapsed * self.requests_per_second >= 1.0 {
176                state.available_tokens += elapsed * self.requests_per_second;
177                state.last = Some(now);
178            }
179        } else {
180            state.last = Some(now);
181        }
182
183        state.available_tokens = state.available_tokens.min(self.max_bucket_size);
184
185        if state.available_tokens >= 1.0 {
186            state.available_tokens -= 1.0;
187            return true;
188        }
189
190        false
191    }
192}
193
194#[async_trait]
195impl BaseRateLimiter for InMemoryRateLimiter {
196    /// Attempt to acquire a token from the rate limiter.
197    ///
198    /// This method blocks until the required tokens are available if `blocking`
199    /// is set to `true`.
200    ///
201    /// If `blocking` is set to `false`, the method will immediately return the result
202    /// of the attempt to acquire the tokens.
203    ///
204    /// # Arguments
205    ///
206    /// * `blocking` - If `true`, the method will block until the tokens are available.
207    ///     If `false`, the method will return immediately with the result of
208    ///     the attempt.
209    ///
210    /// # Returns
211    ///
212    /// `true` if the tokens were successfully acquired, `false` otherwise.
213    fn acquire(&self, blocking: bool) -> bool {
214        if !blocking {
215            return self.consume();
216        }
217
218        while !self.consume() {
219            std::thread::sleep(std::time::Duration::from_secs_f64(
220                self.check_every_n_seconds,
221            ));
222        }
223        true
224    }
225
226    /// Attempt to acquire a token from the rate limiter. Async version.
227    ///
228    /// This method blocks until the required tokens are available if `blocking`
229    /// is set to `true`.
230    ///
231    /// If `blocking` is set to `false`, the method will immediately return the result
232    /// of the attempt to acquire the tokens.
233    ///
234    /// # Arguments
235    ///
236    /// * `blocking` - If `true`, the method will block until the tokens are available.
237    ///     If `false`, the method will return immediately with the result of
238    ///     the attempt.
239    ///
240    /// # Returns
241    ///
242    /// `true` if the tokens were successfully acquired, `false` otherwise.
243    async fn aacquire(&self, blocking: bool) -> bool {
244        if !blocking {
245            return self.consume();
246        }
247
248        while !self.consume() {
249            tokio::time::sleep(std::time::Duration::from_secs_f64(
250                self.check_every_n_seconds,
251            ))
252            .await;
253        }
254        true
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh limiter starts with an empty bucket, so a non-blocking attempt fails.
    #[test]
    fn test_rate_limiter_non_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 10.0,
            check_every_n_seconds: 0.01,
            max_bucket_size: 1.0,
        });

        let result = rate_limiter.acquire(false);
        assert!(!result);
    }

    /// A blocking acquire eventually succeeds, well within the time budget.
    #[test]
    fn test_rate_limiter_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 1.0,
        });

        let start = Instant::now();
        let result = rate_limiter.acquire(true);
        let elapsed = start.elapsed();

        assert!(result);
        assert!(elapsed.as_millis() < 100);
    }

    /// Async counterpart of `test_rate_limiter_non_blocking`.
    #[tokio::test]
    async fn test_rate_limiter_async_non_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 10.0,
            check_every_n_seconds: 0.01,
            max_bucket_size: 1.0,
        });

        let result = rate_limiter.aacquire(false).await;
        assert!(!result);
    }

    /// Async counterpart of `test_rate_limiter_blocking`.
    #[tokio::test]
    async fn test_rate_limiter_async_blocking() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 1.0,
        });

        let start = Instant::now();
        let result = rate_limiter.aacquire(true).await;
        let elapsed = start.elapsed();

        assert!(result);
        assert!(elapsed.as_millis() < 100);
    }

    /// Tokens accrued while idle are capped at `max_bucket_size`.
    #[test]
    fn test_rate_limiter_burst() {
        let rate_limiter = InMemoryRateLimiter::new(InMemoryRateLimiterConfig {
            requests_per_second: 100.0,
            check_every_n_seconds: 0.001,
            max_bucket_size: 5.0,
        });

        // The refill clock only starts on the first attempt, so prime it here;
        // sleeping before this call would accrue nothing and leave the cap
        // untested. The first attempt itself never grants a token.
        assert!(!rate_limiter.acquire(false));

        // 60 ms at 100 tokens/s accrues at least 6 tokens, which must be
        // trimmed down to the bucket size of 5.
        std::thread::sleep(std::time::Duration::from_millis(60));

        // The loop finishes far faster than the 10 ms needed to accrue another
        // token, so exactly the capped burst is granted.
        let successes = (0..10).filter(|_| rate_limiter.acquire(false)).count();

        assert_eq!(successes, 5);
    }

    /// The default configuration matches the documented values.
    #[test]
    fn test_default_config() {
        let config = InMemoryRateLimiterConfig::default();
        assert!((config.requests_per_second - 1.0).abs() < f64::EPSILON);
        assert!((config.check_every_n_seconds - 0.1).abs() < f64::EPSILON);
        assert!((config.max_bucket_size - 1.0).abs() < f64::EPSILON);
    }
}
345}