mecha10_core/
rate_limit.rs

//! Rate Limiting
//!
//! Per-topic and per-node rate limiting to prevent message flooding and ensure QoS.
//!
//! # Features
//!
//! - Token bucket algorithm for smooth rate limiting
//! - Per-topic limits
//! - Per-node limits
//! - Burst capacity
//! - Configurable policies
//! - Metrics integration
//! - Low overhead with async
//!
//! # Example
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::rate_limit::{RateLimiter, RateLimit};
//!
//! # async fn example() -> Result<()> {
//! // Create rate limiter: 10 messages/sec with burst of 20
//! let limiter = RateLimiter::new(10.0, 20);
//!
//! // Check if message is allowed
//! if limiter.check_and_consume().await {
//!     // Allowed - publish message
//!     println!("Message allowed");
//! } else {
//!     // Rate limited - drop or queue
//!     println!("Rate limited!");
//! }
//! # Ok(())
//! # }
//! ```
36
37use std::collections::HashMap;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::RwLock;
41
// ============================================================================
// Token Bucket Rate Limiter
// ============================================================================
45
46/// Token bucket rate limiter
47///
48/// Implements the token bucket algorithm for smooth rate limiting with burst support.
49///
50/// # Algorithm
51///
52/// - Tokens are added at a constant rate (refill_rate)
53/// - Each message consumes one token
54/// - Bucket has maximum capacity (burst capacity)
55/// - If no tokens available, request is denied
56///
57/// # Example
58///
59/// ```rust
60/// use mecha10::rate_limit::RateLimiter;
61///
62/// # async fn example() {
63/// // 10 msg/sec, burst of 20
64/// let limiter = RateLimiter::new(10.0, 20);
65///
66/// // Check if allowed
67/// if limiter.check_and_consume().await {
68///     println!("Allowed");
69/// }
70/// # }
71/// ```
72pub struct RateLimiter {
73    /// Current number of tokens
74    tokens: Arc<RwLock<f64>>,
75    /// Maximum tokens (burst capacity)
76    capacity: f64,
77    /// Tokens added per second
78    refill_rate: f64,
79    /// Last refill time
80    last_refill: Arc<RwLock<Instant>>,
81}
82
83impl RateLimiter {
84    /// Create a new rate limiter
85    ///
86    /// # Arguments
87    ///
88    /// * `rate` - Messages per second
89    /// * `burst` - Maximum burst size
90    ///
91    /// # Example
92    ///
93    /// ```rust
94    /// use mecha10::rate_limit::RateLimiter;
95    ///
96    /// // 100 msg/sec with burst of 200
97    /// let limiter = RateLimiter::new(100.0, 200);
98    /// ```
99    pub fn new(rate: f64, burst: usize) -> Self {
100        Self {
101            tokens: Arc::new(RwLock::new(burst as f64)),
102            capacity: burst as f64,
103            refill_rate: rate,
104            last_refill: Arc::new(RwLock::new(Instant::now())),
105        }
106    }
107
108    /// Check if request is allowed (does not consume token)
109    pub async fn check(&self) -> bool {
110        self.refill().await;
111        let tokens = self.tokens.read().await;
112        *tokens >= 1.0
113    }
114
115    /// Check and consume a token if available
116    pub async fn check_and_consume(&self) -> bool {
117        self.refill().await;
118
119        let mut tokens = self.tokens.write().await;
120        if *tokens >= 1.0 {
121            *tokens -= 1.0;
122            true
123        } else {
124            false
125        }
126    }
127
128    /// Try to consume multiple tokens
129    pub async fn try_consume(&self, count: f64) -> bool {
130        self.refill().await;
131
132        let mut tokens = self.tokens.write().await;
133        if *tokens >= count {
134            *tokens -= count;
135            true
136        } else {
137            false
138        }
139    }
140
141    /// Wait until a token is available (with timeout)
142    ///
143    /// Returns true if token was acquired, false if timed out
144    pub async fn wait_for_token(&self, timeout: Duration) -> bool {
145        let start = Instant::now();
146
147        while start.elapsed() < timeout {
148            if self.check_and_consume().await {
149                return true;
150            }
151
152            // Sleep for a short time before retrying
153            tokio::time::sleep(Duration::from_millis(10)).await;
154        }
155
156        false
157    }
158
159    /// Get current token count
160    pub async fn available_tokens(&self) -> f64 {
161        self.refill().await;
162        *self.tokens.read().await
163    }
164
165    /// Refill tokens based on elapsed time
166    async fn refill(&self) {
167        let now = Instant::now();
168        let mut last_refill = self.last_refill.write().await;
169        let elapsed = now.duration_since(*last_refill).as_secs_f64();
170
171        if elapsed > 0.0 {
172            let tokens_to_add = elapsed * self.refill_rate;
173            let mut tokens = self.tokens.write().await;
174            *tokens = (*tokens + tokens_to_add).min(self.capacity);
175            *last_refill = now;
176        }
177    }
178
179    /// Reset the rate limiter (fill bucket)
180    pub async fn reset(&self) {
181        let mut tokens = self.tokens.write().await;
182        *tokens = self.capacity;
183        let mut last_refill = self.last_refill.write().await;
184        *last_refill = Instant::now();
185    }
186}
187
// ============================================================================
// Rate Limit Policy
// ============================================================================
191
/// Rate limit policy configuration
#[derive(Debug, Clone)]
pub struct RateLimit {
    /// Messages per second
    pub rate: f64,
    /// Burst capacity
    pub burst: usize,
}

impl RateLimit {
    /// Create a new rate limit policy
    ///
    /// * `rate` - Messages per second
    /// * `burst` - Maximum burst size
    pub fn new(rate: f64, burst: usize) -> Self {
        Self { rate, burst }
    }

    /// Create rate limit from messages per second (burst = rate * 2)
    ///
    /// The burst is truncated to a whole number but is never less than 1 for
    /// a positive rate, so slow policies still let messages through.
    pub fn per_second(rate: f64) -> Self {
        Self {
            rate,
            burst: Self::default_burst(rate),
        }
    }

    /// Create rate limit from messages per minute
    ///
    /// The rate is converted to messages per second; the burst is twice that
    /// per-second rate, truncated, and at least 1 for a positive rate.
    pub fn per_minute(rate: f64) -> Self {
        let per_sec = rate / 60.0;
        Self {
            rate: per_sec,
            burst: Self::default_burst(per_sec),
        }
    }

    /// No limit
    pub fn unlimited() -> Self {
        Self {
            rate: f64::MAX,
            burst: usize::MAX,
        }
    }

    /// Default burst for a per-second rate: twice the rate, truncated.
    ///
    /// A positive rate below 0.5 msg/s would truncate to a burst of 0, which
    /// yields a bucket that can never hold a whole token and therefore denies
    /// every message; such rates get a burst of 1 instead. Zero, negative and
    /// NaN rates keep a burst of 0 (the float-to-integer cast saturates).
    fn default_burst(rate_per_sec: f64) -> usize {
        let burst = (rate_per_sec * 2.0) as usize;
        if rate_per_sec > 0.0 {
            burst.max(1)
        } else {
            burst
        }
    }
}
232
// ============================================================================
// Multi-Key Rate Limiter
// ============================================================================
236
237/// Rate limiter that tracks limits per key (topic, node, etc.)
238///
239/// # Example
240///
241/// ```rust
242/// use mecha10::rate_limit::{MultiKeyRateLimiter, RateLimit};
243///
244/// # async fn example() {
245/// let limiter = MultiKeyRateLimiter::new();
246///
247/// // Set limits per topic
248/// limiter.set_limit("/camera/rgb", RateLimit::per_second(30.0)).await;
249/// limiter.set_limit("/camera/depth", RateLimit::per_second(30.0)).await;
250///
251/// // Check limits
252/// if limiter.check_and_consume("/camera/rgb").await {
253///     println!("Allowed");
254/// }
255/// # }
256/// ```
257pub struct MultiKeyRateLimiter {
258    limiters: Arc<RwLock<HashMap<String, RateLimiter>>>,
259    default_limit: RateLimit,
260}
261
262impl MultiKeyRateLimiter {
263    /// Create a new multi-key rate limiter with default unlimited
264    pub fn new() -> Self {
265        Self {
266            limiters: Arc::new(RwLock::new(HashMap::new())),
267            default_limit: RateLimit::unlimited(),
268        }
269    }
270
271    /// Create with a default limit for all keys
272    pub fn with_default(default: RateLimit) -> Self {
273        Self {
274            limiters: Arc::new(RwLock::new(HashMap::new())),
275            default_limit: default,
276        }
277    }
278
279    /// Set rate limit for a specific key
280    pub async fn set_limit(&self, key: impl Into<String>, limit: RateLimit) {
281        let key = key.into();
282        let limiter = RateLimiter::new(limit.rate, limit.burst);
283
284        let mut limiters = self.limiters.write().await;
285        limiters.insert(key, limiter);
286    }
287
288    /// Remove rate limit for a key
289    pub async fn remove_limit(&self, key: &str) {
290        let mut limiters = self.limiters.write().await;
291        limiters.remove(key);
292    }
293
294    /// Check if request for key is allowed
295    pub async fn check(&self, key: &str) -> bool {
296        let limiters = self.limiters.read().await;
297
298        if let Some(limiter) = limiters.get(key) {
299            limiter.check().await
300        } else if self.default_limit.rate == f64::MAX {
301            true
302        } else {
303            // Create limiter for this key on first use
304            drop(limiters);
305            self.set_limit(key, self.default_limit.clone()).await;
306            true
307        }
308    }
309
310    /// Check and consume token for key
311    pub async fn check_and_consume(&self, key: &str) -> bool {
312        // Get or create limiter
313        let should_create = {
314            let limiters = self.limiters.read().await;
315            !limiters.contains_key(key) && self.default_limit.rate != f64::MAX
316        };
317
318        if should_create {
319            self.set_limit(key, self.default_limit.clone()).await;
320        }
321
322        let limiters = self.limiters.read().await;
323
324        if let Some(limiter) = limiters.get(key) {
325            limiter.check_and_consume().await
326        } else {
327            // Unlimited
328            true
329        }
330    }
331
332    /// Get available tokens for a key
333    pub async fn available_tokens(&self, key: &str) -> Option<f64> {
334        let limiters = self.limiters.read().await;
335        if let Some(limiter) = limiters.get(key) {
336            Some(limiter.available_tokens().await)
337        } else {
338            None
339        }
340    }
341
342    /// Get all configured limits
343    pub async fn get_limits(&self) -> HashMap<String, RateLimit> {
344        let limiters = self.limiters.read().await;
345        let mut limits = HashMap::new();
346
347        for (key, limiter) in limiters.iter() {
348            limits.insert(
349                key.clone(),
350                RateLimit {
351                    rate: limiter.refill_rate,
352                    burst: limiter.capacity as usize,
353                },
354            );
355        }
356
357        limits
358    }
359
360    /// Clear all limiters
361    pub async fn clear(&self) {
362        let mut limiters = self.limiters.write().await;
363        limiters.clear();
364    }
365}
366
367impl Default for MultiKeyRateLimiter {
368    fn default() -> Self {
369        Self::new()
370    }
371}
372
// ============================================================================
// Rate Limiter for Context Integration
// ============================================================================
376
377/// Global rate limiter manager for the system
378pub struct RateLimiterManager {
379    /// Per-topic rate limiters
380    topic_limiters: Arc<MultiKeyRateLimiter>,
381    /// Per-node rate limiters
382    node_limiters: Arc<MultiKeyRateLimiter>,
383}
384
385impl RateLimiterManager {
386    /// Create a new rate limiter manager
387    pub fn new() -> Self {
388        Self {
389            topic_limiters: Arc::new(MultiKeyRateLimiter::new()),
390            node_limiters: Arc::new(MultiKeyRateLimiter::new()),
391        }
392    }
393
394    /// Set rate limit for a topic
395    pub async fn set_topic_limit(&self, topic: impl Into<String>, limit: RateLimit) {
396        self.topic_limiters.set_limit(topic, limit).await;
397    }
398
399    /// Set rate limit for a node
400    pub async fn set_node_limit(&self, node_id: impl Into<String>, limit: RateLimit) {
401        self.node_limiters.set_limit(node_id, limit).await;
402    }
403
404    /// Check if publishing to topic is allowed
405    pub async fn check_topic(&self, topic: &str) -> bool {
406        self.topic_limiters.check_and_consume(topic).await
407    }
408
409    /// Check if node can publish
410    pub async fn check_node(&self, node_id: &str) -> bool {
411        self.node_limiters.check_and_consume(node_id).await
412    }
413
414    /// Check both topic and node limits
415    pub async fn check_publish(&self, node_id: &str, topic: &str) -> bool {
416        self.check_node(node_id).await && self.check_topic(topic).await
417    }
418
419    /// Get topic limits
420    pub async fn topic_limits(&self) -> HashMap<String, RateLimit> {
421        self.topic_limiters.get_limits().await
422    }
423
424    /// Get node limits
425    pub async fn node_limits(&self) -> HashMap<String, RateLimit> {
426        self.node_limiters.get_limits().await
427    }
428}
429
430impl Default for RateLimiterManager {
431    fn default() -> Self {
432        Self::new()
433    }
434}