1use crate::error::Result;
10use crate::infrastructure::security::rate_limit::RateLimitResult;
11use chrono::{DateTime, Duration, Timelike, Utc};
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct AdaptiveRateLimitConfig {
20 pub enabled: bool,
22
23 pub min_rate_limit: u32,
25
26 pub max_rate_limit: u32,
28
29 pub learning_window_hours: i64,
31
32 pub adjustment_factor: f64,
34
35 pub enable_anomaly_throttling: bool,
37
38 pub enable_load_based_adjustment: bool,
40
41 pub enable_pattern_prediction: bool,
43}
44
45impl Default for AdaptiveRateLimitConfig {
46 fn default() -> Self {
47 Self {
48 enabled: true,
49 min_rate_limit: 10,
50 max_rate_limit: 10_000,
51 learning_window_hours: 24 * 7, adjustment_factor: 0.3,
53 enable_anomaly_throttling: true,
54 enable_load_based_adjustment: true,
55 enable_pattern_prediction: true,
56 }
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62struct TenantUsageProfile {
63 tenant_id: String,
64
65 hourly_averages: Vec<f64>, daily_averages: Vec<f64>, peak_times: Vec<u32>, avg_requests_per_hour: f64,
72 stddev_requests_per_hour: f64,
73 max_requests_per_hour: f64,
74
75 current_limit: u32,
77 base_limit: u32,
78 adjustment_history: Vec<LimitAdjustment>,
79
80 last_updated: DateTime<Utc>,
82 data_points: usize,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86struct LimitAdjustment {
87 timestamp: DateTime<Utc>,
88 old_limit: u32,
89 new_limit: u32,
90 reason: AdjustmentReason,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94enum AdjustmentReason {
95 NormalLearning,
96 AnomalyDetected,
97 HighLoad,
98 AttackMitigation,
99 PatternPrediction,
100}
101
102#[derive(Debug, Clone)]
104pub struct SystemLoad {
105 pub cpu_usage: f64,
106 pub memory_usage: f64,
107 pub active_connections: usize,
108 pub queue_depth: usize,
109}
110
111pub struct AdaptiveRateLimiter {
113 config: Arc<RwLock<AdaptiveRateLimitConfig>>,
114
115 profiles: Arc<RwLock<HashMap<String, TenantUsageProfile>>>,
117
118 recent_requests: Arc<RwLock<Vec<RequestRecord>>>,
120
121 load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
123}
124
125#[derive(Debug, Clone)]
126struct RequestRecord {
127 tenant_id: String,
128 timestamp: DateTime<Utc>,
129 allowed: bool,
130 cost: f64,
131}
132
133impl AdaptiveRateLimiter {
134 pub fn new(config: AdaptiveRateLimitConfig) -> Self {
136 Self {
137 config: Arc::new(RwLock::new(config)),
138 profiles: Arc::new(RwLock::new(HashMap::new())),
139 recent_requests: Arc::new(RwLock::new(Vec::new())),
140 load_history: Arc::new(RwLock::new(Vec::new())),
141 }
142 }
143
144 pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
146 let config = self.config.read();
147
148 if !config.enabled {
149 return Ok(RateLimitResult {
150 allowed: true,
151 remaining: u32::MAX,
152 retry_after: None,
153 limit: u32::MAX,
154 });
155 }
156
157 let mut profiles = self.profiles.write();
159 let profile = profiles.entry(tenant_id.to_string()).or_insert_with(|| {
160 TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit)
161 });
162
163 self.record_request(tenant_id, true, 1.0);
165
166 let recent = self.recent_requests.read();
168 let cutoff = Utc::now() - Duration::hours(1);
169 let recent_count = recent
170 .iter()
171 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
172 .count();
173
174 let allowed = (recent_count as u32) < profile.current_limit;
176
177 let result = RateLimitResult {
178 allowed,
179 remaining: if allowed {
180 profile.current_limit.saturating_sub(recent_count as u32)
181 } else {
182 0
183 },
184 retry_after: if allowed {
185 None
186 } else {
187 Some(std::time::Duration::from_secs(60))
188 },
189 limit: profile.current_limit,
190 };
191
192 profile.data_points += 1;
194 profile.last_updated = Utc::now();
195
196 Ok(result)
197 }
198
199 pub fn update_adaptive_limits(&self) -> Result<()> {
201 let config = self.config.read();
202
203 if !config.enabled {
204 return Ok(());
205 }
206
207 let mut profiles = self.profiles.write();
208
209 for (tenant_id, profile) in profiles.iter_mut() {
210 if profile.data_points < 100 {
211 continue; }
213
214 let mut new_limit = profile.current_limit;
215 let mut reason = AdjustmentReason::NormalLearning;
216
217 if profile.data_points >= 1000 {
219 let usage_factor = profile.avg_requests_per_hour / profile.current_limit as f64;
220
221 if usage_factor > 0.8 {
222 new_limit =
224 ((profile.current_limit as f64) * (1.0 + config.adjustment_factor)) as u32;
225 reason = AdjustmentReason::NormalLearning;
226 } else if usage_factor < 0.3 {
227 new_limit = ((profile.current_limit as f64)
229 * (1.0 - config.adjustment_factor * 0.5))
230 as u32;
231 reason = AdjustmentReason::NormalLearning;
232 }
233 }
234
235 if config.enable_anomaly_throttling {
237 let recent = self.recent_requests.read();
238 let cutoff = Utc::now() - Duration::minutes(5);
239 let very_recent_count = recent
240 .iter()
241 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
242 .count();
243
244 let expected_in_5min = profile.avg_requests_per_hour / 12.0;
246 if very_recent_count as f64 > expected_in_5min * 3.0 {
247 new_limit = ((profile.current_limit as f64) * 0.5) as u32;
248 reason = AdjustmentReason::AnomalyDetected;
249 }
250 }
251
252 if config.enable_load_based_adjustment {
254 if let Some(load) = self.get_current_load() {
255 if load.cpu_usage > 0.8 || load.memory_usage > 0.8 {
256 new_limit = ((profile.current_limit as f64) * 0.7) as u32;
258 reason = AdjustmentReason::HighLoad;
259 }
260 }
261 }
262
263 new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
265
266 if new_limit != profile.current_limit {
268 profile.adjustment_history.push(LimitAdjustment {
269 timestamp: Utc::now(),
270 old_limit: profile.current_limit,
271 new_limit,
272 reason,
273 });
274
275 profile.current_limit = new_limit;
276
277 if profile.adjustment_history.len() > 100 {
279 profile.adjustment_history.remove(0);
280 }
281 }
282 }
283
284 Ok(())
285 }
286
287 pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
289 let config = self.config.read();
290
291 if !config.enable_pattern_prediction {
292 return Ok(0);
293 }
294
295 let profiles = self.profiles.read();
296 if let Some(profile) = profiles.get(tenant_id) {
297 if profile.data_points < 1000 {
298 return Ok(profile.current_limit);
299 }
300
301 let current_hour = Utc::now().hour();
303
304 if profile.peak_times.contains(¤t_hour) {
305 let predicted_limit = ((profile.current_limit as f64) * 1.2) as u32;
307 return Ok(predicted_limit.min(config.max_rate_limit));
308 }
309 }
310
311 Ok(0)
312 }
313
314 pub fn record_system_load(&self, load: SystemLoad) {
316 let mut history = self.load_history.write();
317 history.push((Utc::now(), load));
318
319 let cutoff = Utc::now() - Duration::hours(1);
321 history.retain(|(ts, _)| *ts > cutoff);
322 }
323
324 fn get_current_load(&self) -> Option<SystemLoad> {
325 let history = self.load_history.read();
326 history.last().map(|(_, load)| load.clone())
327 }
328
329 fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
330 let mut requests = self.recent_requests.write();
331 requests.push(RequestRecord {
332 tenant_id: tenant_id.to_string(),
333 timestamp: Utc::now(),
334 allowed,
335 cost,
336 });
337
338 let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
340 requests.retain(|r| r.timestamp > cutoff);
341 }
342
343 pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
345 let profiles = self.profiles.read();
346 profiles.get(tenant_id).map(|profile| {
347 let recent = self.recent_requests.read();
348 let cutoff = Utc::now() - Duration::hours(1);
349 let requests_last_hour = recent
350 .iter()
351 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
352 .count();
353
354 AdaptiveLimitStats {
355 current_limit: profile.current_limit,
356 base_limit: profile.base_limit,
357 requests_last_hour: requests_last_hour as u32,
358 avg_requests_per_hour: profile.avg_requests_per_hour,
359 utilization: requests_last_hour as f64 / profile.current_limit as f64,
360 total_adjustments: profile.adjustment_history.len(),
361 last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
362 }
363 })
364 }
365
366 pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
368 let profiles = self.profiles.read();
369 let recent = self.recent_requests.read();
370
371 AdaptiveRateLimiterStats {
372 total_tenants: profiles.len(),
373 total_requests: recent.len(),
374 config: self.config.read().clone(),
375 }
376 }
377}
378
379impl TenantUsageProfile {
380 fn new(tenant_id: String, base_limit: u32) -> Self {
381 Self {
382 tenant_id,
383 hourly_averages: vec![0.0; 24],
384 daily_averages: vec![0.0; 7],
385 peak_times: Vec::new(),
386 avg_requests_per_hour: 0.0,
387 stddev_requests_per_hour: 0.0,
388 max_requests_per_hour: 0.0,
389 current_limit: base_limit,
390 base_limit,
391 adjustment_history: Vec::new(),
392 last_updated: Utc::now(),
393 data_points: 0,
394 }
395 }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399pub struct AdaptiveLimitStats {
400 pub current_limit: u32,
401 pub base_limit: u32,
402 pub requests_last_hour: u32,
403 pub avg_requests_per_hour: f64,
404 pub utilization: f64,
405 pub total_adjustments: usize,
406 pub last_adjustment: Option<DateTime<Utc>>,
407}
408
409#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct AdaptiveRateLimiterStats {
411 pub total_tenants: usize,
412 pub total_requests: usize,
413 pub config: AdaptiveRateLimitConfig,
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419
420 #[test]
421 fn test_adaptive_limiter_creation() {
422 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
423 let stats = limiter.get_stats();
424
425 assert_eq!(stats.total_tenants, 0);
426 assert_eq!(stats.total_requests, 0);
427 }
428
429 #[test]
430 fn test_adaptive_limit_checking() {
431 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
432 enabled: true,
433 min_rate_limit: 10,
434 max_rate_limit: 100,
435 ..Default::default()
436 });
437
438 let result = limiter.check_adaptive_limit("tenant1").unwrap();
440 assert!(result.allowed);
441 }
442
443 #[test]
444 fn test_limit_adjustment() {
445 let config = AdaptiveRateLimitConfig {
446 min_rate_limit: 10,
447 max_rate_limit: 1000,
448 ..Default::default()
449 };
450
451 let limiter = AdaptiveRateLimiter::new(config);
452
453 {
455 let mut profiles = limiter.profiles.write();
456 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
457 profile.data_points = 1500;
458 profile.avg_requests_per_hour = 90.0; profile.current_limit = 100;
460 profiles.insert("tenant1".to_string(), profile);
461 }
462
463 limiter.update_adaptive_limits().unwrap();
465
466 let stats = limiter.get_tenant_stats("tenant1").unwrap();
468 assert!(stats.current_limit > 100); }
470
471 #[test]
472 fn test_load_based_adjustment() {
473 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
474
475 {
477 let mut profiles = limiter.profiles.write();
478 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
479 profile.data_points = 1000;
480 profile.current_limit = 100;
481 profiles.insert("tenant1".to_string(), profile);
482 }
483
484 limiter.record_system_load(SystemLoad {
486 cpu_usage: 0.9,
487 memory_usage: 0.85,
488 active_connections: 1000,
489 queue_depth: 500,
490 });
491
492 limiter.update_adaptive_limits().unwrap();
494
495 let stats = limiter.get_tenant_stats("tenant1").unwrap();
497 assert!(stats.current_limit < 100); }
499
500 #[test]
501 fn test_disabled_adaptive_limiting() {
502 let config = AdaptiveRateLimitConfig {
503 enabled: false,
504 ..Default::default()
505 };
506
507 let limiter = AdaptiveRateLimiter::new(config);
508 let result = limiter.check_adaptive_limit("tenant1").unwrap();
509
510 assert!(result.allowed);
511 assert_eq!(result.remaining, u32::MAX);
512 }
513
514 #[test]
515 fn test_safety_limits() {
516 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
517 min_rate_limit: 50,
518 max_rate_limit: 200,
519 ..Default::default()
520 });
521
522 {
524 let mut profiles = limiter.profiles.write();
525 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
526 profile.data_points = 1500;
527 profile.avg_requests_per_hour = 180.0; profile.current_limit = 190;
529 profiles.insert("tenant1".to_string(), profile);
530 }
531
532 limiter.update_adaptive_limits().unwrap();
533
534 let stats = limiter.get_tenant_stats("tenant1").unwrap();
535 assert!(stats.current_limit <= 200); assert!(stats.current_limit >= 50); }
538
539 #[test]
540 fn test_default_config() {
541 let config = AdaptiveRateLimitConfig::default();
542 assert!(config.enabled);
543 assert!(config.min_rate_limit > 0);
544 assert!(config.max_rate_limit > config.min_rate_limit);
545 }
546
547 #[test]
548 fn test_config_serde() {
549 let config = AdaptiveRateLimitConfig::default();
550 let json = serde_json::to_string(&config).unwrap();
551 let parsed: AdaptiveRateLimitConfig = serde_json::from_str(&json).unwrap();
552 assert_eq!(parsed.enabled, config.enabled);
553 assert_eq!(parsed.min_rate_limit, config.min_rate_limit);
554 }
555
556 #[test]
557 fn test_system_load_clone() {
558 let load = SystemLoad {
559 cpu_usage: 0.5,
560 memory_usage: 0.6,
561 active_connections: 100,
562 queue_depth: 50,
563 };
564
565 let cloned = load.clone();
566 assert_eq!(cloned.cpu_usage, load.cpu_usage);
567 assert_eq!(cloned.active_connections, load.active_connections);
568 }
569
570 #[test]
571 fn test_get_tenant_stats_none() {
572 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
573 let stats = limiter.get_tenant_stats("nonexistent");
574 assert!(stats.is_none());
575 }
576
577 #[test]
578 fn test_adaptive_limit_stats_serde() {
579 let stats = AdaptiveLimitStats {
580 current_limit: 100,
581 base_limit: 50,
582 requests_last_hour: 25,
583 avg_requests_per_hour: 30.0,
584 utilization: 0.25,
585 total_adjustments: 5,
586 last_adjustment: Some(Utc::now()),
587 };
588
589 let json = serde_json::to_string(&stats).unwrap();
590 let parsed: AdaptiveLimitStats = serde_json::from_str(&json).unwrap();
591 assert_eq!(parsed.current_limit, stats.current_limit);
592 assert_eq!(parsed.utilization, stats.utilization);
593 }
594
595 #[test]
596 fn test_record_system_load() {
597 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
598
599 for i in 0..5 {
601 limiter.record_system_load(SystemLoad {
602 cpu_usage: i as f64 * 0.1,
603 memory_usage: 0.5,
604 active_connections: i * 10,
605 queue_depth: i,
606 });
607 }
608
609 let stats = limiter.get_stats();
610 assert_eq!(stats.total_tenants, 0); }
612
613 #[test]
614 fn test_multiple_tenants() {
615 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
616 enabled: true,
617 ..Default::default()
618 });
619
620 limiter.check_adaptive_limit("tenant1").unwrap();
622 limiter.check_adaptive_limit("tenant2").unwrap();
623 limiter.check_adaptive_limit("tenant3").unwrap();
624
625 let stats = limiter.get_stats();
626 assert_eq!(stats.total_tenants, 3);
627 }
628
629 #[test]
630 fn test_adaptive_limiter_stats_serde() {
631 let stats = AdaptiveRateLimiterStats {
632 total_tenants: 10,
633 total_requests: 1000,
634 config: AdaptiveRateLimitConfig::default(),
635 };
636
637 let json = serde_json::to_string(&stats).unwrap();
638 let parsed: AdaptiveRateLimiterStats = serde_json::from_str(&json).unwrap();
639 assert_eq!(parsed.total_tenants, stats.total_tenants);
640 assert_eq!(parsed.total_requests, stats.total_requests);
641 }
642
643 #[test]
644 fn test_tenant_profile_initialization() {
645 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
646 enabled: true,
647 ..Default::default()
648 });
649
650 limiter.check_adaptive_limit("new_tenant").unwrap();
652
653 let stats = limiter.get_tenant_stats("new_tenant");
654 assert!(stats.is_some());
655 }
656}