1use crate::error::Result;
10use crate::infrastructure::security::rate_limit::RateLimitResult;
11use chrono::{DateTime, Duration, Timelike, Utc};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16
/// Tunable settings for [`AdaptiveRateLimiter`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRateLimitConfig {
    /// Master switch. When `false`, limit checks allow every request and
    /// periodic limit updates are skipped.
    pub enabled: bool,

    /// Lower bound applied to every adjusted per-tenant limit.
    pub min_rate_limit: u32,

    /// Upper bound applied to every adjusted per-tenant limit. It is also the
    /// starting limit assigned to a tenant the first time it is seen.
    pub max_rate_limit: u32,

    /// Retention window, in hours, for the request log: records older than
    /// this are discarded whenever a new request is recorded.
    pub learning_window_hours: i64,

    /// Fractional step for usage-based adjustments. A busy tenant's limit is
    /// raised by this fraction; an idle tenant's limit is lowered by half of it.
    pub adjustment_factor: f64,

    /// Enables the check that throttles a tenant whose traffic over the last
    /// five minutes far exceeds its hourly average.
    pub enable_anomaly_throttling: bool,

    /// Enables limit reduction while the most recent system load sample shows
    /// high CPU or memory usage.
    pub enable_load_based_adjustment: bool,

    /// Enables `predict_and_adjust`; when `false` that method returns `0`.
    pub enable_pattern_prediction: bool,
}
44
45impl Default for AdaptiveRateLimitConfig {
46 fn default() -> Self {
47 Self {
48 enabled: true,
49 min_rate_limit: 10,
50 max_rate_limit: 10_000,
51 learning_window_hours: 24 * 7, adjustment_factor: 0.3,
53 enable_anomaly_throttling: true,
54 enable_load_based_adjustment: true,
55 enable_pattern_prediction: true,
56 }
57 }
58}
59
/// Per-tenant usage statistics and the limit currently enforced for that tenant.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TenantUsageProfile {
    /// Identifier of the tenant this profile belongs to.
    tenant_id: String,

    /// Initialised to 24 zeroed slots (presumably one per hour of day — confirm
    /// against whatever code populates it).
    hourly_averages: Vec<f64>,
    /// Initialised to 7 zeroed slots (presumably one per weekday — confirm).
    daily_averages: Vec<f64>,
    /// Hours (compared against the UTC hour of "now") treated as peak periods
    /// by pattern prediction.
    peak_times: Vec<u32>,
    /// Average hourly request volume; the baseline for usage-based
    /// adjustments and anomaly detection.
    avg_requests_per_hour: f64,
    /// Standard deviation of hourly request volume (starts at `0.0`).
    stddev_requests_per_hour: f64,
    /// Highest hourly request volume (starts at `0.0`).
    max_requests_per_hour: f64,

    /// Limit currently enforced; compared against the tenant's request count
    /// over the trailing hour.
    current_limit: u32,
    /// Limit the profile was created with.
    base_limit: u32,
    /// Log of limit changes, oldest first; trimmed once it exceeds 100 entries.
    adjustment_history: Vec<LimitAdjustment>,

    /// Time of the most recent limit check for this tenant.
    last_updated: DateTime<Utc>,
    /// Number of limit checks performed; adjustments are gated on this count.
    data_points: usize,
}
84
/// One entry in a tenant's limit-change log.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LimitAdjustment {
    /// When the change was applied.
    timestamp: DateTime<Utc>,
    /// Limit in force before the change.
    old_limit: u32,
    /// Limit in force after the change.
    new_limit: u32,
    /// Why the limit was changed.
    reason: AdjustmentReason,
}
92
/// Cause recorded alongside each limit change.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum AdjustmentReason {
    /// Usage-based adjustment derived from the tenant's average hourly volume;
    /// also the default reason when no throttle fired.
    NormalLearning,
    /// Traffic over the last five minutes far exceeded the hourly baseline.
    AnomalyDetected,
    /// The latest system load sample showed high CPU or memory usage.
    HighLoad,
    /// NOTE(review): not constructed anywhere in the code visible here.
    AttackMitigation,
    /// NOTE(review): not constructed anywhere in the code visible here.
    PatternPrediction,
}
101
/// A point-in-time sample of system load supplied by the caller.
#[derive(Debug, Clone)]
pub struct SystemLoad {
    /// CPU usage; values above `0.8` count as high load (presumably a
    /// `0.0..=1.0` fraction — confirm with the producer of these samples).
    pub cpu_usage: f64,
    /// Memory usage; values above `0.8` count as high load (presumably a
    /// `0.0..=1.0` fraction — confirm with the producer of these samples).
    pub memory_usage: f64,
    /// Number of active connections at sampling time. Stored with the sample;
    /// the limiter's adjustment logic does not read it.
    pub active_connections: usize,
    /// Queue depth at sampling time. Stored with the sample; the limiter's
    /// adjustment logic does not read it.
    pub queue_depth: usize,
}
110
/// Rate limiter whose per-tenant limits are adjusted from observed usage,
/// recent traffic bursts and reported system load.
pub struct AdaptiveRateLimiter {
    /// Shared, lock-protected configuration.
    config: Arc<RwLock<AdaptiveRateLimitConfig>>,

    /// Usage profile for each tenant, keyed by tenant id.
    profiles: Arc<DashMap<String, TenantUsageProfile>>,

    /// Log of recent requests across all tenants, pruned to the configured
    /// learning window each time a request is recorded.
    recent_requests: Arc<RwLock<Vec<RequestRecord>>>,

    /// Timestamped system load samples, pruned to the last hour each time a
    /// sample is recorded.
    load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
}
124
/// A single entry in the limiter's request log.
#[derive(Debug, Clone)]
struct RequestRecord {
    /// Tenant that issued the request.
    tenant_id: String,
    /// When the request was recorded.
    timestamp: DateTime<Utc>,
    /// Admission decision stored with the request.
    allowed: bool,
    /// Cost assigned to the request (limit checks record `1.0`).
    cost: f64,
}
132
133impl AdaptiveRateLimiter {
134 pub fn new(config: AdaptiveRateLimitConfig) -> Self {
136 Self {
137 config: Arc::new(RwLock::new(config)),
138 profiles: Arc::new(DashMap::new()),
139 recent_requests: Arc::new(RwLock::new(Vec::new())),
140 load_history: Arc::new(RwLock::new(Vec::new())),
141 }
142 }
143
144 pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
146 let config = self.config.read();
147
148 if !config.enabled {
149 return Ok(RateLimitResult {
150 allowed: true,
151 remaining: u32::MAX,
152 retry_after: None,
153 limit: u32::MAX,
154 });
155 }
156
157 let mut profile = self
159 .profiles
160 .entry(tenant_id.to_string())
161 .or_insert_with(|| {
162 TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit)
163 });
164
165 self.record_request(tenant_id, true, 1.0);
167
168 let recent = self.recent_requests.read();
170 let cutoff = Utc::now() - Duration::hours(1);
171 let recent_count = recent
172 .iter()
173 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
174 .count();
175
176 let allowed = (recent_count as u32) < profile.current_limit;
178
179 let result = RateLimitResult {
180 allowed,
181 remaining: if allowed {
182 profile.current_limit.saturating_sub(recent_count as u32)
183 } else {
184 0
185 },
186 retry_after: if allowed {
187 None
188 } else {
189 Some(std::time::Duration::from_secs(60))
190 },
191 limit: profile.current_limit,
192 };
193
194 profile.data_points += 1;
196 profile.last_updated = Utc::now();
197
198 Ok(result)
199 }
200
201 pub fn update_adaptive_limits(&self) -> Result<()> {
203 let config = self.config.read();
204
205 if !config.enabled {
206 return Ok(());
207 }
208
209 for mut entry in self.profiles.iter_mut() {
210 let tenant_id = entry.key().clone();
211 let profile = entry.value_mut();
212
213 if profile.data_points < 100 {
214 continue; }
216
217 let mut new_limit = profile.current_limit;
218 let mut reason = AdjustmentReason::NormalLearning;
219
220 if profile.data_points >= 1000 {
222 let usage_factor = profile.avg_requests_per_hour / f64::from(profile.current_limit);
223
224 if usage_factor > 0.8 {
225 new_limit = (f64::from(profile.current_limit)
227 * (1.0 + config.adjustment_factor)) as u32;
228 reason = AdjustmentReason::NormalLearning;
229 } else if usage_factor < 0.3 {
230 new_limit = (f64::from(profile.current_limit)
232 * (1.0 - config.adjustment_factor * 0.5))
233 as u32;
234 reason = AdjustmentReason::NormalLearning;
235 }
236 }
237
238 if config.enable_anomaly_throttling {
240 let recent = self.recent_requests.read();
241 let cutoff = Utc::now() - Duration::minutes(5);
242 let very_recent_count = recent
243 .iter()
244 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
245 .count();
246
247 let expected_in_5min = profile.avg_requests_per_hour / 12.0;
249 if very_recent_count as f64 > expected_in_5min * 3.0 {
250 new_limit = (f64::from(profile.current_limit) * 0.5) as u32;
251 reason = AdjustmentReason::AnomalyDetected;
252 }
253 }
254
255 if config.enable_load_based_adjustment
257 && let Some(load) = self.get_current_load()
258 && (load.cpu_usage > 0.8 || load.memory_usage > 0.8)
259 {
260 new_limit = (f64::from(profile.current_limit) * 0.7) as u32;
262 reason = AdjustmentReason::HighLoad;
263 }
264
265 new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
267
268 if new_limit != profile.current_limit {
270 profile.adjustment_history.push(LimitAdjustment {
271 timestamp: Utc::now(),
272 old_limit: profile.current_limit,
273 new_limit,
274 reason,
275 });
276
277 profile.current_limit = new_limit;
278
279 if profile.adjustment_history.len() > 100 {
281 profile.adjustment_history.remove(0);
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
291 let config = self.config.read();
292
293 if !config.enable_pattern_prediction {
294 return Ok(0);
295 }
296
297 if let Some(profile_ref) = self.profiles.get(tenant_id) {
298 let profile = profile_ref.value();
299 if profile.data_points < 1000 {
300 return Ok(profile.current_limit);
301 }
302
303 let current_hour = Utc::now().hour();
305
306 if profile.peak_times.contains(¤t_hour) {
307 let predicted_limit = (f64::from(profile.current_limit) * 1.2) as u32;
309 return Ok(predicted_limit.min(config.max_rate_limit));
310 }
311 }
312
313 Ok(0)
314 }
315
316 pub fn record_system_load(&self, load: SystemLoad) {
318 let mut history = self.load_history.write();
319 history.push((Utc::now(), load));
320
321 let cutoff = Utc::now() - Duration::hours(1);
323 history.retain(|(ts, _)| *ts > cutoff);
324 }
325
326 fn get_current_load(&self) -> Option<SystemLoad> {
327 let history = self.load_history.read();
328 history.last().map(|(_, load)| load.clone())
329 }
330
331 fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
332 let mut requests = self.recent_requests.write();
333 requests.push(RequestRecord {
334 tenant_id: tenant_id.to_string(),
335 timestamp: Utc::now(),
336 allowed,
337 cost,
338 });
339
340 let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
342 requests.retain(|r| r.timestamp > cutoff);
343 }
344
345 pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
347 self.profiles.get(tenant_id).map(|profile_ref| {
348 let profile = profile_ref.value();
349 let recent = self.recent_requests.read();
350 let cutoff = Utc::now() - Duration::hours(1);
351 let requests_last_hour = recent
352 .iter()
353 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
354 .count();
355
356 AdaptiveLimitStats {
357 current_limit: profile.current_limit,
358 base_limit: profile.base_limit,
359 requests_last_hour: requests_last_hour as u32,
360 avg_requests_per_hour: profile.avg_requests_per_hour,
361 utilization: requests_last_hour as f64 / f64::from(profile.current_limit),
362 total_adjustments: profile.adjustment_history.len(),
363 last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
364 }
365 })
366 }
367
368 pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
370 let recent = self.recent_requests.read();
371
372 AdaptiveRateLimiterStats {
373 total_tenants: self.profiles.len(),
374 total_requests: recent.len(),
375 config: self.config.read().clone(),
376 }
377 }
378}
379
380impl TenantUsageProfile {
381 fn new(tenant_id: String, base_limit: u32) -> Self {
382 Self {
383 tenant_id,
384 hourly_averages: vec![0.0; 24],
385 daily_averages: vec![0.0; 7],
386 peak_times: Vec::new(),
387 avg_requests_per_hour: 0.0,
388 stddev_requests_per_hour: 0.0,
389 max_requests_per_hour: 0.0,
390 current_limit: base_limit,
391 base_limit,
392 adjustment_history: Vec::new(),
393 last_updated: Utc::now(),
394 data_points: 0,
395 }
396 }
397}
398
/// Snapshot of one tenant's rate-limit state, as returned by
/// `AdaptiveRateLimiter::get_tenant_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveLimitStats {
    /// Limit currently enforced for the tenant.
    pub current_limit: u32,
    /// Limit the tenant's profile was created with.
    pub base_limit: u32,
    /// Number of the tenant's logged requests within the trailing hour.
    pub requests_last_hour: u32,
    /// Average hourly request volume stored in the tenant's profile.
    pub avg_requests_per_hour: f64,
    /// `requests_last_hour` relative to `current_limit`.
    pub utilization: f64,
    /// Number of entries in the tenant's adjustment history.
    pub total_adjustments: usize,
    /// Time of the most recent limit change, if any.
    pub last_adjustment: Option<DateTime<Utc>>,
}
409
/// Limiter-wide snapshot, as returned by `AdaptiveRateLimiter::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRateLimiterStats {
    /// Number of tenants that have a usage profile.
    pub total_tenants: usize,
    /// Number of requests currently held in the request log.
    pub total_requests: usize,
    /// Copy of the configuration at the time of the snapshot.
    pub config: AdaptiveRateLimitConfig,
}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts a profile for `tenant` (base limit 100) with the given data
    /// point count, hourly average and current limit.
    fn seed_profile(
        limiter: &AdaptiveRateLimiter,
        tenant: &str,
        data_points: usize,
        avg_requests_per_hour: f64,
        current_limit: u32,
    ) {
        let mut profile = TenantUsageProfile::new(tenant.to_string(), 100);
        profile.data_points = data_points;
        profile.avg_requests_per_hour = avg_requests_per_hour;
        profile.current_limit = current_limit;
        limiter.profiles.insert(tenant.to_string(), profile);
    }

    #[test]
    fn test_adaptive_limiter_creation() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
        let stats = limiter.get_stats();

        assert_eq!(stats.total_tenants, 0);
        assert_eq!(stats.total_requests, 0);
    }

    #[test]
    fn test_adaptive_limit_checking() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: true,
            min_rate_limit: 10,
            max_rate_limit: 100,
            ..Default::default()
        });

        // The very first request of a new tenant is always within its limit.
        let result = limiter.check_adaptive_limit("tenant1").unwrap();
        assert!(result.allowed);
    }

    #[test]
    fn test_limit_adjustment() {
        let config = AdaptiveRateLimitConfig {
            min_rate_limit: 10,
            max_rate_limit: 1000,
            ..Default::default()
        };

        let limiter = AdaptiveRateLimiter::new(config);

        // 90% utilisation with enough data points must raise the limit.
        seed_profile(&limiter, "tenant1", 1500, 90.0, 100);

        limiter.update_adaptive_limits().unwrap();

        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit > 100);
    }

    #[test]
    fn test_load_based_adjustment() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());

        seed_profile(&limiter, "tenant1", 1000, 0.0, 100);

        // CPU and memory are both above the high-load threshold.
        limiter.record_system_load(SystemLoad {
            cpu_usage: 0.9,
            memory_usage: 0.85,
            active_connections: 1000,
            queue_depth: 500,
        });

        limiter.update_adaptive_limits().unwrap();

        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit < 100);
    }

    #[test]
    fn test_disabled_adaptive_limiting() {
        let config = AdaptiveRateLimitConfig {
            enabled: false,
            ..Default::default()
        };

        let limiter = AdaptiveRateLimiter::new(config);
        let result = limiter.check_adaptive_limit("tenant1").unwrap();

        assert!(result.allowed);
        assert_eq!(result.remaining, u32::MAX);
    }

    #[test]
    fn test_safety_limits() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            min_rate_limit: 50,
            max_rate_limit: 200,
            ..Default::default()
        });

        // A raise from 190 would overshoot the maximum and must be capped.
        seed_profile(&limiter, "tenant1", 1500, 180.0, 190);

        limiter.update_adaptive_limits().unwrap();

        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit <= 200);
        assert!(stats.current_limit >= 50);
    }

    #[test]
    fn test_default_config() {
        let config = AdaptiveRateLimitConfig::default();
        assert!(config.enabled);
        assert!(config.min_rate_limit > 0);
        assert!(config.max_rate_limit > config.min_rate_limit);
    }

    #[test]
    fn test_config_serde() {
        let config = AdaptiveRateLimitConfig::default();
        let json = serde_json::to_string(&config).unwrap();
        let parsed: AdaptiveRateLimitConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.enabled, config.enabled);
        assert_eq!(parsed.min_rate_limit, config.min_rate_limit);
    }

    #[test]
    fn test_system_load_clone() {
        let load = SystemLoad {
            cpu_usage: 0.5,
            memory_usage: 0.6,
            active_connections: 100,
            queue_depth: 50,
        };

        let cloned = load.clone();
        assert_eq!(cloned.cpu_usage, load.cpu_usage);
        assert_eq!(cloned.active_connections, load.active_connections);
    }

    #[test]
    fn test_get_tenant_stats_none() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
        let stats = limiter.get_tenant_stats("nonexistent");
        assert!(stats.is_none());
    }

    #[test]
    fn test_adaptive_limit_stats_serde() {
        let stats = AdaptiveLimitStats {
            current_limit: 100,
            base_limit: 50,
            requests_last_hour: 25,
            avg_requests_per_hour: 30.0,
            utilization: 0.25,
            total_adjustments: 5,
            last_adjustment: Some(Utc::now()),
        };

        let json = serde_json::to_string(&stats).unwrap();
        let parsed: AdaptiveLimitStats = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.current_limit, stats.current_limit);
        assert_eq!(parsed.utilization, stats.utilization);
    }

    #[test]
    fn test_record_system_load() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());

        for i in 0..5 {
            limiter.record_system_load(SystemLoad {
                cpu_usage: i as f64 * 0.1,
                memory_usage: 0.5,
                active_connections: i * 10,
                queue_depth: i,
            });
        }

        // Recording load must not create tenant profiles.
        let stats = limiter.get_stats();
        assert_eq!(stats.total_tenants, 0);

        // The newest sample (i == 4) is the one reported as current load.
        let latest = limiter
            .get_current_load()
            .expect("a load sample was just recorded");
        assert_eq!(latest.active_connections, 40);
        assert_eq!(latest.queue_depth, 4);
    }

    #[test]
    fn test_multiple_tenants() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: true,
            ..Default::default()
        });

        limiter.check_adaptive_limit("tenant1").unwrap();
        limiter.check_adaptive_limit("tenant2").unwrap();
        limiter.check_adaptive_limit("tenant3").unwrap();

        let stats = limiter.get_stats();
        assert_eq!(stats.total_tenants, 3);
    }

    #[test]
    fn test_adaptive_limiter_stats_serde() {
        let stats = AdaptiveRateLimiterStats {
            total_tenants: 10,
            total_requests: 1000,
            config: AdaptiveRateLimitConfig::default(),
        };

        let json = serde_json::to_string(&stats).unwrap();
        let parsed: AdaptiveRateLimiterStats = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.total_tenants, stats.total_tenants);
        assert_eq!(parsed.total_requests, stats.total_requests);
    }

    #[test]
    fn test_tenant_profile_initialization() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: true,
            ..Default::default()
        });

        // A first check creates the tenant's profile on the fly.
        limiter.check_adaptive_limit("new_tenant").unwrap();

        let stats = limiter.get_tenant_stats("new_tenant");
        assert!(stats.is_some());
    }
}