// ractor/factory/ratelim.rs

// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Rate limiting protocols for factory routers
use std::collections::HashMap;

use crate::concurrency::Duration;
use crate::concurrency::Instant;
use crate::factory::routing::RouteResult;
use crate::factory::routing::Router;
use crate::factory::Job;
use crate::factory::JobKey;
use crate::factory::WorkerId;
use crate::factory::WorkerProperties;
use crate::ActorProcessingErr;
use crate::Message;
use crate::State;
21
/// The maximum supported balance for leaky bucket rate limiting.
///
/// Equal to [isize::MAX] reinterpreted as a [usize], i.e. half the unsigned range.
pub const MAX_LB_BALANCE: usize = isize::MAX as usize;
24
/// A basic trait which allows controlling rate limiting of message routing
pub trait RateLimiter: State {
    /// Check if we have not violated the rate limiter.
    ///
    /// Takes `&mut self` because implementations may refresh internal
    /// state (e.g. time-based token refills) as part of the check.
    ///
    /// Returns [false] if we're in violation and should start rate-limiting traffic,
    /// [true] otherwise
    fn check(&mut self) -> bool;

    /// Bump the rate limit internal counter, as we've routed a message
    /// to a worker
    fn bump(&mut self);
}
37
/// A generic struct which wraps the message router and adds support for a rate-limiting implementation to rate limit
/// jobs processed by the factory. This handles the plumbing around wrapping a rate limited message router
#[derive(Debug, bon::Builder)]
pub struct RateLimitedRouter<TRouter, TRateLimit> {
    /// The underlying message router which does NOT implement rate limiting
    pub router: TRouter,
    /// The rate limiter to apply to the message routing
    pub rate_limiter: TRateLimit,
}
47
48impl<TKey, TMsg, TRouter, TRateLimit> Router<TKey, TMsg> for RateLimitedRouter<TRouter, TRateLimit>
49where
50    TKey: JobKey,
51    TMsg: Message,
52    TRouter: Router<TKey, TMsg>,
53    TRateLimit: RateLimiter,
54{
55    fn route_message(
56        &mut self,
57        job: Job<TKey, TMsg>,
58        pool_size: usize,
59        worker_hint: Option<WorkerId>,
60        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
61    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
62        if !self.rate_limiter.check() {
63            Ok(RouteResult::RateLimited(job))
64        } else {
65            let result = self
66                .router
67                .route_message(job, pool_size, worker_hint, worker_pool);
68            if matches!(result, Ok(RouteResult::Handled)) {
69                // only bump the internal state if we successfully routed a message
70                self.rate_limiter.bump();
71            }
72            result
73        }
74    }
75
76    fn choose_target_worker(
77        &mut self,
78        job: &Job<TKey, TMsg>,
79        pool_size: usize,
80        worker_hint: Option<WorkerId>,
81        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
82    ) -> Option<WorkerId> {
83        self.router
84            .choose_target_worker(job, pool_size, worker_hint, worker_pool)
85    }
86
87    fn is_factory_queueing(&self) -> bool {
88        self.router.is_factory_queueing()
89    }
90}
91
/// A basic leaky-bucket rate limiter. This is a synchronous implementation
/// with no interior locking since it's only used by the [RateLimitedRouter]
/// uniquely and doesn't share its state
#[derive(Debug)]
pub struct LeakyBucketRateLimiter {
    /// Number of tokens to add back to the bucket every `interval`.
    pub refill: usize,
    /// How often `refill` tokens are credited.
    pub interval: Duration,
    /// Max number of tokens associated with the rate limiter.
    pub max: usize,
    /// The "balance" of the rate limiter, i.e. the number of tokens still available
    pub balance: usize,
    /// The deadline to perform another refill
    deadline: Instant,
}
108
109#[bon::bon]
110impl LeakyBucketRateLimiter {
111    /// Create a new [LeakyBucketRateLimiter] instance
112    ///
113    /// * `refill` - Tokens to add every `per` duration.
114    /// * `interval` - Interval to add tokens.
115    /// * `max` - The maximum number of tokens associated with the rate limiter. Default = [MAX_LB_BALANCE]
116    /// * `initial` - The initial starting balance. If [None] will be = to max
117    ///
118    /// Returns a new [LeakyBucketRateLimiter] instance
119    #[builder]
120    pub fn new(
121        refill: usize,
122        interval: Duration,
123        #[builder(default = MAX_LB_BALANCE)] max: usize,
124        initial: Option<usize>,
125    ) -> LeakyBucketRateLimiter {
126        LeakyBucketRateLimiter {
127            refill,
128            interval,
129            max,
130            balance: initial.unwrap_or(max),
131            deadline: Instant::now() + interval,
132        }
133    }
134
135    fn refresh(&mut self, now: Instant) {
136        if now < self.deadline {
137            return;
138        }
139
140        // Time elapsed in milliseconds since the last deadline.
141        let millis = self.interval.as_millis();
142        let since = now.saturating_duration_since(self.deadline).as_millis();
143
144        let periods = usize::try_from(since / millis + 1).unwrap_or(usize::MAX);
145
146        let tokens = periods
147            .checked_mul(self.refill)
148            .unwrap_or(MAX_LB_BALANCE)
149            .min(MAX_LB_BALANCE);
150
151        let remaining_millis_until_next_deadline =
152            u64::try_from(since % millis).unwrap_or(u64::MAX);
153        self.deadline = now
154            + self
155                .interval
156                .saturating_sub(Duration::from_millis(remaining_millis_until_next_deadline));
157        self.balance = (self.balance + tokens).min(self.max);
158    }
159}
160
161impl RateLimiter for LeakyBucketRateLimiter {
162    fn check(&mut self) -> bool {
163        let now = Instant::now();
164        self.refresh(now);
165        self.balance > 0
166    }
167
168    fn bump(&mut self) {
169        if self.balance > 0 {
170            self.balance -= 1;
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use crate::concurrency::sleep;

    #[crate::concurrency::test]
    async fn test_basic_leaky_bucket() {
        // One token available immediately, refilling 1 token / 100ms with no
        // explicit cap (max defaults to MAX_LB_BALANCE).
        let mut limiter = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .interval(Duration::from_millis(100))
            .build();

        // Spend the single initial token; the bucket is then empty.
        assert!(limiter.check());
        limiter.bump();
        assert!(!limiter.check());

        // Waiting two full intervals should credit two more tokens.
        sleep(limiter.interval * 2).await;

        assert!(limiter.check());
        limiter.bump();
        assert!(limiter.check());
        limiter.bump();
        assert!(!limiter.check());
    }

    #[crate::concurrency::test]
    async fn test_leaky_bucket_max() {
        // Same setup as above, but the balance is capped at 1 token.
        let mut limiter = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .max(1)
            .interval(Duration::from_millis(100))
            .build();

        assert!(limiter.check());
        limiter.bump();
        assert!(!limiter.check());

        // Two intervals elapse, but `max` caps the refill at a single token.
        sleep(limiter.interval * 2).await;

        assert!(limiter.check());
        limiter.bump();
        assert!(!limiter.check());
    }
}