1use crate::error::Result;
11use crate::rate_limit::RateLimitResult;
12use chrono::{DateTime, Duration, Timelike, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use parking_lot::RwLock;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct AdaptiveRateLimitConfig {
21 pub enabled: bool,
23
24 pub min_rate_limit: u32,
26
27 pub max_rate_limit: u32,
29
30 pub learning_window_hours: i64,
32
33 pub adjustment_factor: f64,
35
36 pub enable_anomaly_throttling: bool,
38
39 pub enable_load_based_adjustment: bool,
41
42 pub enable_pattern_prediction: bool,
44}
45
46impl Default for AdaptiveRateLimitConfig {
47 fn default() -> Self {
48 Self {
49 enabled: true,
50 min_rate_limit: 10,
51 max_rate_limit: 10_000,
52 learning_window_hours: 24 * 7, adjustment_factor: 0.3,
54 enable_anomaly_throttling: true,
55 enable_load_based_adjustment: true,
56 enable_pattern_prediction: true,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63struct TenantUsageProfile {
64 tenant_id: String,
65
66 hourly_averages: Vec<f64>, daily_averages: Vec<f64>, peak_times: Vec<u32>, avg_requests_per_hour: f64,
73 stddev_requests_per_hour: f64,
74 max_requests_per_hour: f64,
75
76 current_limit: u32,
78 base_limit: u32,
79 adjustment_history: Vec<LimitAdjustment>,
80
81 last_updated: DateTime<Utc>,
83 data_points: usize,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87struct LimitAdjustment {
88 timestamp: DateTime<Utc>,
89 old_limit: u32,
90 new_limit: u32,
91 reason: AdjustmentReason,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95enum AdjustmentReason {
96 NormalLearning,
97 AnomalyDetected,
98 HighLoad,
99 AttackMitigation,
100 PatternPrediction,
101}
102
103#[derive(Debug, Clone)]
105pub struct SystemLoad {
106 pub cpu_usage: f64,
107 pub memory_usage: f64,
108 pub active_connections: usize,
109 pub queue_depth: usize,
110}
111
112pub struct AdaptiveRateLimiter {
114 config: Arc<RwLock<AdaptiveRateLimitConfig>>,
115
116 profiles: Arc<RwLock<HashMap<String, TenantUsageProfile>>>,
118
119 recent_requests: Arc<RwLock<Vec<RequestRecord>>>,
121
122 load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
124}
125
126#[derive(Debug, Clone)]
127struct RequestRecord {
128 tenant_id: String,
129 timestamp: DateTime<Utc>,
130 allowed: bool,
131 cost: f64,
132}
133
134impl AdaptiveRateLimiter {
135 pub fn new(config: AdaptiveRateLimitConfig) -> Self {
137 Self {
138 config: Arc::new(RwLock::new(config)),
139 profiles: Arc::new(RwLock::new(HashMap::new())),
140 recent_requests: Arc::new(RwLock::new(Vec::new())),
141 load_history: Arc::new(RwLock::new(Vec::new())),
142 }
143 }
144
145 pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
147 let config = self.config.read();
148
149 if !config.enabled {
150 return Ok(RateLimitResult {
151 allowed: true,
152 remaining: u32::MAX,
153 retry_after: None,
154 limit: u32::MAX,
155 });
156 }
157
158 let mut profiles = self.profiles.write();
160 let profile = profiles.entry(tenant_id.to_string())
161 .or_insert_with(|| TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit));
162
163 self.record_request(tenant_id, true, 1.0);
165
166 let recent = self.recent_requests.read();
168 let cutoff = Utc::now() - Duration::hours(1);
169 let recent_count = recent.iter()
170 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
171 .count();
172
173 let allowed = (recent_count as u32) < profile.current_limit;
175
176 let result = RateLimitResult {
177 allowed,
178 remaining: if allowed {
179 profile.current_limit.saturating_sub(recent_count as u32)
180 } else {
181 0
182 },
183 retry_after: if allowed { None } else { Some(std::time::Duration::from_secs(60)) },
184 limit: profile.current_limit,
185 };
186
187 profile.data_points += 1;
189 profile.last_updated = Utc::now();
190
191 Ok(result)
192 }
193
194 pub fn update_adaptive_limits(&self) -> Result<()> {
196 let config = self.config.read();
197
198 if !config.enabled {
199 return Ok(());
200 }
201
202 let mut profiles = self.profiles.write();
203
204 for (tenant_id, profile) in profiles.iter_mut() {
205 if profile.data_points < 100 {
206 continue; }
208
209 let mut new_limit = profile.current_limit;
210 let mut reason = AdjustmentReason::NormalLearning;
211
212 if profile.data_points >= 1000 {
214 let usage_factor = profile.avg_requests_per_hour / profile.current_limit as f64;
215
216 if usage_factor > 0.8 {
217 new_limit = ((profile.current_limit as f64) * (1.0 + config.adjustment_factor)) as u32;
219 reason = AdjustmentReason::NormalLearning;
220 } else if usage_factor < 0.3 {
221 new_limit = ((profile.current_limit as f64) * (1.0 - config.adjustment_factor * 0.5)) as u32;
223 reason = AdjustmentReason::NormalLearning;
224 }
225 }
226
227 if config.enable_anomaly_throttling {
229 let recent = self.recent_requests.read();
230 let cutoff = Utc::now() - Duration::minutes(5);
231 let very_recent_count = recent.iter()
232 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
233 .count();
234
235 let expected_in_5min = profile.avg_requests_per_hour / 12.0;
237 if very_recent_count as f64 > expected_in_5min * 3.0 {
238 new_limit = ((profile.current_limit as f64) * 0.5) as u32;
239 reason = AdjustmentReason::AnomalyDetected;
240 }
241 }
242
243 if config.enable_load_based_adjustment {
245 if let Some(load) = self.get_current_load() {
246 if load.cpu_usage > 0.8 || load.memory_usage > 0.8 {
247 new_limit = ((profile.current_limit as f64) * 0.7) as u32;
249 reason = AdjustmentReason::HighLoad;
250 }
251 }
252 }
253
254 new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
256
257 if new_limit != profile.current_limit {
259 profile.adjustment_history.push(LimitAdjustment {
260 timestamp: Utc::now(),
261 old_limit: profile.current_limit,
262 new_limit,
263 reason,
264 });
265
266 profile.current_limit = new_limit;
267
268 if profile.adjustment_history.len() > 100 {
270 profile.adjustment_history.remove(0);
271 }
272 }
273 }
274
275 Ok(())
276 }
277
278 pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
280 let config = self.config.read();
281
282 if !config.enable_pattern_prediction {
283 return Ok(0);
284 }
285
286 let profiles = self.profiles.read();
287 if let Some(profile) = profiles.get(tenant_id) {
288 if profile.data_points < 1000 {
289 return Ok(profile.current_limit);
290 }
291
292 let current_hour = Utc::now().hour();
294
295 if profile.peak_times.contains(¤t_hour) {
296 let predicted_limit = ((profile.current_limit as f64) * 1.2) as u32;
298 return Ok(predicted_limit.min(config.max_rate_limit));
299 }
300 }
301
302 Ok(0)
303 }
304
305 pub fn record_system_load(&self, load: SystemLoad) {
307 let mut history = self.load_history.write();
308 history.push((Utc::now(), load));
309
310 let cutoff = Utc::now() - Duration::hours(1);
312 history.retain(|(ts, _)| *ts > cutoff);
313 }
314
315 fn get_current_load(&self) -> Option<SystemLoad> {
316 let history = self.load_history.read();
317 history.last().map(|(_, load)| load.clone())
318 }
319
320 fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
321 let mut requests = self.recent_requests.write();
322 requests.push(RequestRecord {
323 tenant_id: tenant_id.to_string(),
324 timestamp: Utc::now(),
325 allowed,
326 cost,
327 });
328
329 let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
331 requests.retain(|r| r.timestamp > cutoff);
332 }
333
334 pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
336 let profiles = self.profiles.read();
337 profiles.get(tenant_id).map(|profile| {
338 let recent = self.recent_requests.read();
339 let cutoff = Utc::now() - Duration::hours(1);
340 let requests_last_hour = recent.iter()
341 .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
342 .count();
343
344 AdaptiveLimitStats {
345 current_limit: profile.current_limit,
346 base_limit: profile.base_limit,
347 requests_last_hour: requests_last_hour as u32,
348 avg_requests_per_hour: profile.avg_requests_per_hour,
349 utilization: requests_last_hour as f64 / profile.current_limit as f64,
350 total_adjustments: profile.adjustment_history.len(),
351 last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
352 }
353 })
354 }
355
356 pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
358 let profiles = self.profiles.read();
359 let recent = self.recent_requests.read();
360
361 AdaptiveRateLimiterStats {
362 total_tenants: profiles.len(),
363 total_requests: recent.len(),
364 config: self.config.read().clone(),
365 }
366 }
367}
368
369impl TenantUsageProfile {
370 fn new(tenant_id: String, base_limit: u32) -> Self {
371 Self {
372 tenant_id,
373 hourly_averages: vec![0.0; 24],
374 daily_averages: vec![0.0; 7],
375 peak_times: Vec::new(),
376 avg_requests_per_hour: 0.0,
377 stddev_requests_per_hour: 0.0,
378 max_requests_per_hour: 0.0,
379 current_limit: base_limit,
380 base_limit,
381 adjustment_history: Vec::new(),
382 last_updated: Utc::now(),
383 data_points: 0,
384 }
385 }
386}
387
388#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct AdaptiveLimitStats {
390 pub current_limit: u32,
391 pub base_limit: u32,
392 pub requests_last_hour: u32,
393 pub avg_requests_per_hour: f64,
394 pub utilization: f64,
395 pub total_adjustments: usize,
396 pub last_adjustment: Option<DateTime<Utc>>,
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct AdaptiveRateLimiterStats {
401 pub total_tenants: usize,
402 pub total_requests: usize,
403 pub config: AdaptiveRateLimitConfig,
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409
410 #[test]
411 fn test_adaptive_limiter_creation() {
412 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
413 let stats = limiter.get_stats();
414
415 assert_eq!(stats.total_tenants, 0);
416 assert_eq!(stats.total_requests, 0);
417 }
418
419 #[test]
420 fn test_adaptive_limit_checking() {
421 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
422 enabled: true,
423 min_rate_limit: 10,
424 max_rate_limit: 100,
425 ..Default::default()
426 });
427
428 let result = limiter.check_adaptive_limit("tenant1").unwrap();
430 assert!(result.allowed);
431 }
432
433 #[test]
434 fn test_limit_adjustment() {
435 let mut config = AdaptiveRateLimitConfig::default();
436 config.min_rate_limit = 10;
437 config.max_rate_limit = 1000;
438
439 let limiter = AdaptiveRateLimiter::new(config);
440
441 {
443 let mut profiles = limiter.profiles.write();
444 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
445 profile.data_points = 1500;
446 profile.avg_requests_per_hour = 90.0; profile.current_limit = 100;
448 profiles.insert("tenant1".to_string(), profile);
449 }
450
451 limiter.update_adaptive_limits().unwrap();
453
454 let stats = limiter.get_tenant_stats("tenant1").unwrap();
456 assert!(stats.current_limit > 100); }
458
459 #[test]
460 fn test_load_based_adjustment() {
461 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
462
463 {
465 let mut profiles = limiter.profiles.write();
466 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
467 profile.data_points = 1000;
468 profile.current_limit = 100;
469 profiles.insert("tenant1".to_string(), profile);
470 }
471
472 limiter.record_system_load(SystemLoad {
474 cpu_usage: 0.9,
475 memory_usage: 0.85,
476 active_connections: 1000,
477 queue_depth: 500,
478 });
479
480 limiter.update_adaptive_limits().unwrap();
482
483 let stats = limiter.get_tenant_stats("tenant1").unwrap();
485 assert!(stats.current_limit < 100); }
487
488 #[test]
489 fn test_disabled_adaptive_limiting() {
490 let mut config = AdaptiveRateLimitConfig::default();
491 config.enabled = false;
492
493 let limiter = AdaptiveRateLimiter::new(config);
494 let result = limiter.check_adaptive_limit("tenant1").unwrap();
495
496 assert!(result.allowed);
497 assert_eq!(result.remaining, u32::MAX);
498 }
499
500 #[test]
501 fn test_safety_limits() {
502 let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
503 min_rate_limit: 50,
504 max_rate_limit: 200,
505 ..Default::default()
506 });
507
508 {
510 let mut profiles = limiter.profiles.write();
511 let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
512 profile.data_points = 1500;
513 profile.avg_requests_per_hour = 180.0; profile.current_limit = 190;
515 profiles.insert("tenant1".to_string(), profile);
516 }
517
518 limiter.update_adaptive_limits().unwrap();
519
520 let stats = limiter.get_tenant_stats("tenant1").unwrap();
521 assert!(stats.current_limit <= 200); assert!(stats.current_limit >= 50); }
524}