mecha10_core/rate_limit.rs
//! Rate Limiting
//!
//! Per-topic and per-node rate limiting to prevent message flooding and ensure QoS.
//!
//! # Features
//!
//! - Token bucket algorithm for smooth rate limiting
//! - Per-topic limits
//! - Per-node limits
//! - Burst capacity
//! - Configurable policies
//! - Metrics integration
//! - Low overhead with async
//!
//! # Example
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::rate_limit::RateLimiter;
//!
//! # async fn example() -> Result<()> {
//! // Create a rate limiter: 10 messages/sec with a burst of 20
//! let limiter = RateLimiter::new(10.0, 20);
//!
//! // Check whether the message is allowed
//! if limiter.check_and_consume().await {
//!     // Allowed - publish the message
//!     println!("Message allowed");
//! } else {
//!     // Rate limited - drop or queue
//!     println!("Rate limited!");
//! }
//! # Ok(())
//! # }
//! ```

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

// ============================================================================
// Token Bucket Rate Limiter
// ============================================================================

/// Token bucket rate limiter
///
/// Implements the token bucket algorithm for smooth rate limiting with burst support.
///
/// # Algorithm
///
/// - Tokens are added at a constant rate (`refill_rate`)
/// - Each message consumes one token
/// - The bucket has a maximum capacity (the burst capacity)
/// - If no tokens are available, the request is denied
///
/// # Example
///
/// ```rust
/// use mecha10::rate_limit::RateLimiter;
///
/// # async fn example() {
/// // 10 msg/sec, burst of 20
/// let limiter = RateLimiter::new(10.0, 20);
///
/// // Check if allowed
/// if limiter.check_and_consume().await {
///     println!("Allowed");
/// }
/// # }
/// ```
pub struct RateLimiter {
    /// Current number of tokens
    tokens: Arc<RwLock<f64>>,
    /// Maximum tokens (burst capacity)
    capacity: f64,
    /// Tokens added per second
    refill_rate: f64,
    /// Last refill time
    last_refill: Arc<RwLock<Instant>>,
}

impl RateLimiter {
    /// Create a new rate limiter
    ///
    /// # Arguments
    ///
    /// * `rate` - Messages per second
    /// * `burst` - Maximum burst size
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::rate_limit::RateLimiter;
    ///
    /// // 100 msg/sec with burst of 200
    /// let limiter = RateLimiter::new(100.0, 200);
    /// ```
    pub fn new(rate: f64, burst: usize) -> Self {
        Self {
            tokens: Arc::new(RwLock::new(burst as f64)),
            capacity: burst as f64,
            refill_rate: rate,
            last_refill: Arc::new(RwLock::new(Instant::now())),
        }
    }

    /// Check if the request is allowed (does not consume a token)
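    ///
    /// # Example
    ///
    /// A minimal sketch of a non-consuming check (rate and burst values are
    /// illustrative); repeated calls do not drain the bucket:
    ///
    /// ```rust
    /// use mecha10::rate_limit::RateLimiter;
    ///
    /// # async fn example() {
    /// let limiter = RateLimiter::new(5.0, 5);
    /// assert!(limiter.check().await);
    /// assert!(limiter.check().await); // still true: nothing was consumed
    /// # }
    /// ```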
    pub async fn check(&self) -> bool {
        self.refill().await;
        let tokens = self.tokens.read().await;
        *tokens >= 1.0
    }

    /// Check and consume a token if available
    pub async fn check_and_consume(&self) -> bool {
        self.refill().await;

        let mut tokens = self.tokens.write().await;
        if *tokens >= 1.0 {
            *tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Try to consume multiple tokens
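    ///
    /// # Example
    ///
    /// A sketch of charging a whole batch of messages against the bucket in a
    /// single call (the batch size of 5.0 is illustrative):
    ///
    /// ```rust
    /// use mecha10::rate_limit::RateLimiter;
    ///
    /// # async fn example() {
    /// let limiter = RateLimiter::new(10.0, 20);
    /// if limiter.try_consume(5.0).await {
    ///     // Enough tokens for all 5 messages in the batch
    ///     println!("Batch allowed");
    /// }
    /// # }
    /// ```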
    pub async fn try_consume(&self, count: f64) -> bool {
        self.refill().await;

        let mut tokens = self.tokens.write().await;
        if *tokens >= count {
            *tokens -= count;
            true
        } else {
            false
        }
    }

    /// Wait until a token is available (with timeout)
    ///
    /// Returns `true` if a token was acquired, `false` if it timed out
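    ///
    /// # Example
    ///
    /// A sketch of waiting briefly for capacity instead of dropping the message
    /// (the 100 ms timeout is illustrative):
    ///
    /// ```rust
    /// use mecha10::rate_limit::RateLimiter;
    /// use std::time::Duration;
    ///
    /// # async fn example() {
    /// let limiter = RateLimiter::new(10.0, 1);
    /// if limiter.wait_for_token(Duration::from_millis(100)).await {
    ///     println!("Token acquired");
    /// } else {
    ///     println!("Timed out");
    /// }
    /// # }
    /// ```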
    pub async fn wait_for_token(&self, timeout: Duration) -> bool {
        let start = Instant::now();

        while start.elapsed() < timeout {
            if self.check_and_consume().await {
                return true;
            }

            // Sleep for a short time before retrying
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        false
    }

    /// Get current token count
    pub async fn available_tokens(&self) -> f64 {
        self.refill().await;
        *self.tokens.read().await
    }

    /// Refill tokens based on elapsed time
    async fn refill(&self) {
        let now = Instant::now();
        let mut last_refill = self.last_refill.write().await;
        let elapsed = now.duration_since(*last_refill).as_secs_f64();

        if elapsed > 0.0 {
            let tokens_to_add = elapsed * self.refill_rate;
            let mut tokens = self.tokens.write().await;
            *tokens = (*tokens + tokens_to_add).min(self.capacity);
            *last_refill = now;
        }
    }

    /// Reset the rate limiter (fill bucket)
    pub async fn reset(&self) {
        // Acquire the locks in the same order as `refill` (last_refill, then tokens)
        // to avoid a lock-order inversion between the two methods.
        let mut last_refill = self.last_refill.write().await;
        let mut tokens = self.tokens.write().await;
        *tokens = self.capacity;
        *last_refill = Instant::now();
    }
}

// ============================================================================
// Rate Limit Policy
// ============================================================================

/// Rate limit policy configuration
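///
/// # Example
///
/// A sketch of the available constructors (the rates are illustrative):
///
/// ```rust
/// use mecha10::rate_limit::RateLimit;
///
/// // 30 msg/sec with an implicit burst of 60
/// let camera = RateLimit::per_second(30.0);
///
/// // 120 msg/min (2 msg/sec) with an implicit burst of 4
/// let telemetry = RateLimit::per_minute(120.0);
///
/// // Explicit rate and burst
/// let control = RateLimit::new(100.0, 200);
///
/// // No limiting at all
/// let logging = RateLimit::unlimited();
/// ```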
#[derive(Debug, Clone)]
pub struct RateLimit {
    /// Messages per second
    pub rate: f64,
    /// Burst capacity
    pub burst: usize,
}

impl RateLimit {
    /// Create a new rate limit policy
    pub fn new(rate: f64, burst: usize) -> Self {
        Self { rate, burst }
    }

    /// Create rate limit from messages per second (burst = rate * 2, rounded up)
    pub fn per_second(rate: f64) -> Self {
        Self {
            rate,
            // Round up so that fractional rates still get a usable burst of at least 1
            burst: (rate * 2.0).ceil() as usize,
        }
    }

    /// Create rate limit from messages per minute
    pub fn per_minute(rate: f64) -> Self {
        let per_sec = rate / 60.0;
        Self {
            rate: per_sec,
            // Round up: truncation would give anything under 30 msg/min a burst of 0
            // and deny every message
            burst: (per_sec * 2.0).ceil() as usize,
        }
    }

    /// No limit
    pub fn unlimited() -> Self {
        Self {
            rate: f64::MAX,
            burst: usize::MAX,
        }
    }
}

// ============================================================================
// Multi-Key Rate Limiter
// ============================================================================

/// Rate limiter that tracks limits per key (topic, node, etc.)
///
/// # Example
///
/// ```rust
/// use mecha10::rate_limit::{MultiKeyRateLimiter, RateLimit};
///
/// # async fn example() {
/// let limiter = MultiKeyRateLimiter::new();
///
/// // Set limits per topic
/// limiter.set_limit("/camera/rgb", RateLimit::per_second(30.0)).await;
/// limiter.set_limit("/camera/depth", RateLimit::per_second(30.0)).await;
///
/// // Check limits
/// if limiter.check_and_consume("/camera/rgb").await {
///     println!("Allowed");
/// }
/// # }
/// ```
pub struct MultiKeyRateLimiter {
    limiters: Arc<RwLock<HashMap<String, RateLimiter>>>,
    default_limit: RateLimit,
}

impl MultiKeyRateLimiter {
    /// Create a new multi-key rate limiter with an unlimited default
    pub fn new() -> Self {
        Self {
            limiters: Arc::new(RwLock::new(HashMap::new())),
            default_limit: RateLimit::unlimited(),
        }
    }

    /// Create with a default limit for all keys
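    ///
    /// # Example
    ///
    /// A sketch showing the default limit being applied to a key that was never
    /// configured explicitly (the topic name is illustrative):
    ///
    /// ```rust
    /// use mecha10::rate_limit::{MultiKeyRateLimiter, RateLimit};
    ///
    /// # async fn example() {
    /// let limiter = MultiKeyRateLimiter::with_default(RateLimit::per_second(5.0));
    ///
    /// // No explicit limit was set for this topic, so a limiter with the
    /// // 5 msg/sec default is created for it on first use.
    /// assert!(limiter.check_and_consume("/unconfigured/topic").await);
    /// # }
    /// ```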
    pub fn with_default(default: RateLimit) -> Self {
        Self {
            limiters: Arc::new(RwLock::new(HashMap::new())),
            default_limit: default,
        }
    }

    /// Set rate limit for a specific key
    pub async fn set_limit(&self, key: impl Into<String>, limit: RateLimit) {
        let key = key.into();
        let limiter = RateLimiter::new(limit.rate, limit.burst);

        let mut limiters = self.limiters.write().await;
        limiters.insert(key, limiter);
    }

    /// Remove rate limit for a key
    pub async fn remove_limit(&self, key: &str) {
        let mut limiters = self.limiters.write().await;
        limiters.remove(key);
    }

    /// Check if request for key is allowed
    pub async fn check(&self, key: &str) -> bool {
        let limiters = self.limiters.read().await;

        if let Some(limiter) = limiters.get(key) {
            limiter.check().await
        } else if self.default_limit.rate == f64::MAX {
            true
        } else {
            // Create limiter for this key on first use
            drop(limiters);
            self.set_limit(key, self.default_limit.clone()).await;
            true
        }
    }

    /// Check and consume token for key
    pub async fn check_and_consume(&self, key: &str) -> bool {
        // Get or create limiter
        let should_create = {
            let limiters = self.limiters.read().await;
            !limiters.contains_key(key) && self.default_limit.rate != f64::MAX
        };

        if should_create {
            self.set_limit(key, self.default_limit.clone()).await;
        }

        let limiters = self.limiters.read().await;

        if let Some(limiter) = limiters.get(key) {
            limiter.check_and_consume().await
        } else {
            // Unlimited
            true
        }
    }

    /// Get available tokens for a key
    pub async fn available_tokens(&self, key: &str) -> Option<f64> {
        let limiters = self.limiters.read().await;
        if let Some(limiter) = limiters.get(key) {
            Some(limiter.available_tokens().await)
        } else {
            None
        }
    }

    /// Get all configured limits
    pub async fn get_limits(&self) -> HashMap<String, RateLimit> {
        let limiters = self.limiters.read().await;
        let mut limits = HashMap::new();

        for (key, limiter) in limiters.iter() {
            limits.insert(
                key.clone(),
                RateLimit {
                    rate: limiter.refill_rate,
                    burst: limiter.capacity as usize,
                },
            );
        }

        limits
    }

    /// Clear all limiters
    pub async fn clear(&self) {
        let mut limiters = self.limiters.write().await;
        limiters.clear();
    }
}

impl Default for MultiKeyRateLimiter {
    fn default() -> Self {
        Self::new()
    }
}

// ============================================================================
// Rate Limiter for Context Integration
// ============================================================================

/// Global rate limiter manager for the system
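///
/// # Example
///
/// A sketch of wiring per-topic and per-node limits together (the topic and
/// node names are illustrative):
///
/// ```rust
/// use mecha10::rate_limit::{RateLimiterManager, RateLimit};
///
/// # async fn example() {
/// let manager = RateLimiterManager::new();
///
/// // Limit a chatty sensor topic and the node that publishes it
/// manager.set_topic_limit("/camera/rgb", RateLimit::per_second(30.0)).await;
/// manager.set_node_limit("camera_node", RateLimit::per_second(60.0)).await;
///
/// // A publish goes through only if both the node and the topic have capacity
/// if manager.check_publish("camera_node", "/camera/rgb").await {
///     println!("Publish allowed");
/// }
/// # }
/// ```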
pub struct RateLimiterManager {
    /// Per-topic rate limiters
    topic_limiters: Arc<MultiKeyRateLimiter>,
    /// Per-node rate limiters
    node_limiters: Arc<MultiKeyRateLimiter>,
}

impl RateLimiterManager {
    /// Create a new rate limiter manager
    pub fn new() -> Self {
        Self {
            topic_limiters: Arc::new(MultiKeyRateLimiter::new()),
            node_limiters: Arc::new(MultiKeyRateLimiter::new()),
        }
    }

    /// Set rate limit for a topic
    pub async fn set_topic_limit(&self, topic: impl Into<String>, limit: RateLimit) {
        self.topic_limiters.set_limit(topic, limit).await;
    }

    /// Set rate limit for a node
    pub async fn set_node_limit(&self, node_id: impl Into<String>, limit: RateLimit) {
        self.node_limiters.set_limit(node_id, limit).await;
    }

    /// Check if publishing to topic is allowed
    pub async fn check_topic(&self, topic: &str) -> bool {
        self.topic_limiters.check_and_consume(topic).await
    }

    /// Check if node can publish
    pub async fn check_node(&self, node_id: &str) -> bool {
        self.node_limiters.check_and_consume(node_id).await
    }

    /// Check both topic and node limits
    ///
    /// Note that the node token is consumed even if the topic check then denies
    /// the publish (the checks short-circuit left to right).
    pub async fn check_publish(&self, node_id: &str, topic: &str) -> bool {
        self.check_node(node_id).await && self.check_topic(topic).await
    }

    /// Get topic limits
    pub async fn topic_limits(&self) -> HashMap<String, RateLimit> {
        self.topic_limiters.get_limits().await
    }

    /// Get node limits
    pub async fn node_limits(&self) -> HashMap<String, RateLimit> {
        self.node_limiters.get_limits().await
    }
}

impl Default for RateLimiterManager {
    fn default() -> Self {
        Self::new()
    }
}
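
// ============================================================================
// Tests
// ============================================================================

// Illustrative tests sketching the expected token-bucket behaviour. They assume
// `tokio` is available with the `macros` feature for `#[tokio::test]`; the rates,
// bursts, and sleep durations are arbitrary, chosen with enough margin that
// ordinary timing jitter should not flip the assertions.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn burst_then_deny() {
        // 10 msg/sec with a burst of 2: two immediate messages pass, the third is denied.
        let limiter = RateLimiter::new(10.0, 2);
        assert!(limiter.check_and_consume().await);
        assert!(limiter.check_and_consume().await);
        assert!(!limiter.check_and_consume().await);
    }

    #[tokio::test]
    async fn refill_restores_capacity() {
        // 100 msg/sec with a burst of 1: after draining the bucket, ~20 ms is more
        // than enough time to refill a single token.
        let limiter = RateLimiter::new(100.0, 1);
        assert!(limiter.check_and_consume().await);
        assert!(!limiter.check_and_consume().await);
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(limiter.check_and_consume().await);
    }

    #[tokio::test]
    async fn per_key_limits_are_independent() {
        // Exhausting one key's bucket does not affect another key.
        let limiter = MultiKeyRateLimiter::new();
        limiter.set_limit("/a", RateLimit::new(1.0, 1)).await;
        limiter.set_limit("/b", RateLimit::new(1.0, 1)).await;
        assert!(limiter.check_and_consume("/a").await);
        assert!(!limiter.check_and_consume("/a").await);
        assert!(limiter.check_and_consume("/b").await);
    }
}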