Skip to main content

allsource_core/security/
adaptive_rate_limit.rs

//! Adaptive ML-Based Rate Limiting
//!
//! Automatically adjusts per-tenant rate limits based on:
//! - Historical usage patterns
//! - Traffic anomalies
//! - System load
//! - Tenant behavior
//! - Attack detection
9use crate::error::Result;
10use crate::infrastructure::security::rate_limit::RateLimitResult;
11use chrono::{DateTime, Duration, Timelike, Utc};
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16
/// Tuning knobs for [`AdaptiveRateLimiter`].
///
/// The configuration is serializable so it can be loaded from, or reported
/// in, external formats; [`Default`] provides a ready-to-use baseline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRateLimitConfig {
    /// Master switch. When `false`, `check_adaptive_limit` admits every
    /// request with an unbounded limit and `update_adaptive_limits` is a no-op.
    pub enabled: bool,

    /// Safety floor: limits computed by `update_adaptive_limits` are never
    /// set below this value (expected to be `<= max_rate_limit`).
    pub min_rate_limit: u32,

    /// Safety ceiling: limits computed by `update_adaptive_limits` and the
    /// value proposed by `predict_and_adjust` never exceed this. It is also
    /// the starting limit given to a tenant seen for the first time.
    pub max_rate_limit: u32,

    /// How many hours of request records are retained; older records are
    /// discarded whenever a new request is recorded.
    pub learning_window_hours: i64,

    /// How aggressively learned adjustments move a limit (intended range
    /// 0.0-1.0). A raise multiplies the limit by `1 + factor`; a reduction
    /// multiplies it by `1 - factor / 2`.
    pub adjustment_factor: f64,

    /// When set, a tenant whose request count over the last five minutes
    /// exceeds three times the five-minute share of its hourly average has
    /// its limit cut.
    pub enable_anomaly_throttling: bool,

    /// When set, limits are reduced while the most recent [`SystemLoad`]
    /// sample reports CPU or memory usage above 0.8.
    pub enable_load_based_adjustment: bool,

    /// When set, `predict_and_adjust` proposes a higher limit during a
    /// tenant's known peak hours; when unset it returns 0.
    pub enable_pattern_prediction: bool,
}
44
45impl Default for AdaptiveRateLimitConfig {
46    fn default() -> Self {
47        Self {
48            enabled: true,
49            min_rate_limit: 10,
50            max_rate_limit: 10_000,
51            learning_window_hours: 24 * 7, // 1 week
52            adjustment_factor: 0.3,
53            enable_anomaly_throttling: true,
54            enable_load_based_adjustment: true,
55            enable_pattern_prediction: true,
56        }
57    }
58}
59
/// Per-tenant state used by [`AdaptiveRateLimiter`]: the limit currently in
/// force, the usage statistics that drive adjustments, and a log of past
/// adjustments.
///
/// NOTE(review): apart from `current_limit`, `adjustment_history`,
/// `last_updated` and `data_points`, no code in this file updates these
/// fields after construction (only the tests set the statistics directly).
/// Confirm where the historical patterns and statistics are meant to be
/// learned.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TenantUsageProfile {
    /// Identifier of the tenant this profile belongs to.
    tenant_id: String,

    // Historical patterns
    /// Average requests per hour; created with 24 zeroed slots
    /// (presumably one per hour of the day - confirm).
    hourly_averages: Vec<f64>,
    /// Average requests per day; created with 7 zeroed slots
    /// (presumably one per day of the week - confirm).
    daily_averages: Vec<f64>,
    /// Hours with peak usage. Compared against the current UTC hour
    /// (`Utc::now().hour()`) by `predict_and_adjust`.
    peak_times: Vec<u32>,

    // Statistical metrics
    /// Mean hourly request rate; the baseline for utilisation and anomaly
    /// checks in `update_adaptive_limits`.
    avg_requests_per_hour: f64,
    /// Standard deviation of the hourly request rate (not read in this file).
    stddev_requests_per_hour: f64,
    /// Highest hourly request rate (not read in this file).
    max_requests_per_hour: f64,

    // Adaptive parameters
    /// Limit currently enforced by `check_adaptive_limit`.
    current_limit: u32,
    /// Limit the profile was created with; reported in statistics.
    base_limit: u32,
    /// Log of limit changes, oldest first; capped by
    /// `update_adaptive_limits`.
    adjustment_history: Vec<LimitAdjustment>,

    // Last updated
    /// Time of the most recent `check_adaptive_limit` call for this tenant.
    last_updated: DateTime<Utc>,
    /// Number of `check_adaptive_limit` calls seen for this tenant; gates
    /// when adjustments and predictions start to apply.
    data_points: usize,
}
84
/// One entry in a tenant's adjustment history: a single change of the
/// enforced limit made by `update_adaptive_limits`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LimitAdjustment {
    /// When the change was applied.
    timestamp: DateTime<Utc>,
    /// Limit in force before the change.
    old_limit: u32,
    /// Limit in force after the change (already bounded by the configured
    /// floor and ceiling).
    new_limit: u32,
    /// Why the limit was changed.
    reason: AdjustmentReason,
}
92
/// Cause recorded alongside each [`LimitAdjustment`].
#[derive(Debug, Clone, Serialize, Deserialize)]
enum AdjustmentReason {
    /// Limit raised or lowered from the tenant's learned utilisation
    /// (also the default when no other rule determined the change).
    NormalLearning,
    /// Limit cut because recent traffic far exceeded the learned average.
    AnomalyDetected,
    /// Limit cut because the system reported high CPU or memory usage.
    HighLoad,
    /// Not constructed anywhere in this file.
    AttackMitigation,
    /// Not constructed anywhere in this file.
    PatternPrediction,
}
101
/// Snapshot of system load supplied by the host application through
/// `AdaptiveRateLimiter::record_system_load`.
#[derive(Debug, Clone)]
pub struct SystemLoad {
    /// CPU usage; values above 0.8 count as high load
    /// (presumably a 0.0-1.0 fraction - confirm with the producer).
    pub cpu_usage: f64,
    /// Memory usage; values above 0.8 count as high load
    /// (presumably a 0.0-1.0 fraction - confirm with the producer).
    pub memory_usage: f64,
    /// Number of active connections (stored but not read in this file).
    pub active_connections: usize,
    /// Depth of the pending-work queue (stored but not read in this file).
    pub queue_depth: usize,
}
110
/// Rate limiter whose per-tenant limits move with observed traffic and
/// reported system load.
///
/// Every piece of state sits behind its own `Arc<RwLock<..>>`, so all
/// methods take `&self`.
pub struct AdaptiveRateLimiter {
    /// Active configuration.
    config: Arc<RwLock<AdaptiveRateLimitConfig>>,

    // Tenant profiles
    /// Profiles keyed by tenant id; an entry is created on a tenant's first
    /// `check_adaptive_limit` call.
    profiles: Arc<RwLock<HashMap<String, TenantUsageProfile>>>,

    // Recent requests for pattern analysis
    /// Request log for all tenants in insertion order, pruned to the
    /// configured learning window each time a request is recorded.
    recent_requests: Arc<RwLock<Vec<RequestRecord>>>,

    // System load history
    /// Timestamped load samples, pruned to the last hour each time a sample
    /// is recorded.
    load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
}
124
/// One entry in the limiter's request log.
#[derive(Debug, Clone)]
struct RequestRecord {
    /// Tenant that issued the request.
    tenant_id: String,
    /// When the request was recorded.
    timestamp: DateTime<Utc>,
    /// Admission flag exactly as passed to `record_request`.
    allowed: bool,
    /// Cost weight exactly as passed to `record_request`
    /// (stored but not read in this file).
    cost: f64,
}
132
133impl AdaptiveRateLimiter {
134    /// Create new adaptive rate limiter
135    pub fn new(config: AdaptiveRateLimitConfig) -> Self {
136        Self {
137            config: Arc::new(RwLock::new(config)),
138            profiles: Arc::new(RwLock::new(HashMap::new())),
139            recent_requests: Arc::new(RwLock::new(Vec::new())),
140            load_history: Arc::new(RwLock::new(Vec::new())),
141        }
142    }
143
144    /// Check rate limit with adaptive adjustment
145    pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
146        let config = self.config.read();
147
148        if !config.enabled {
149            return Ok(RateLimitResult {
150                allowed: true,
151                remaining: u32::MAX,
152                retry_after: None,
153                limit: u32::MAX,
154            });
155        }
156
157        // Get or create profile
158        let mut profiles = self.profiles.write();
159        let profile = profiles.entry(tenant_id.to_string()).or_insert_with(|| {
160            TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit)
161        });
162
163        // Record request
164        self.record_request(tenant_id, true, 1.0);
165
166        // Get current hour's request count
167        let recent = self.recent_requests.read();
168        let cutoff = Utc::now() - Duration::hours(1);
169        let recent_count = recent
170            .iter()
171            .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
172            .count();
173
174        // Check against current adaptive limit
175        let allowed = (recent_count as u32) < profile.current_limit;
176
177        let result = RateLimitResult {
178            allowed,
179            remaining: if allowed {
180                profile.current_limit.saturating_sub(recent_count as u32)
181            } else {
182                0
183            },
184            retry_after: if allowed {
185                None
186            } else {
187                Some(std::time::Duration::from_secs(60))
188            },
189            limit: profile.current_limit,
190        };
191
192        // Update profile statistics
193        profile.data_points += 1;
194        profile.last_updated = Utc::now();
195
196        Ok(result)
197    }
198
199    /// Update adaptive limits based on learned patterns
200    pub fn update_adaptive_limits(&self) -> Result<()> {
201        let config = self.config.read();
202
203        if !config.enabled {
204            return Ok(());
205        }
206
207        let mut profiles = self.profiles.write();
208
209        for (tenant_id, profile) in profiles.iter_mut() {
210            if profile.data_points < 100 {
211                continue; // Not enough data
212            }
213
214            let mut new_limit = profile.current_limit;
215            let mut reason = AdjustmentReason::NormalLearning;
216
217            // Learning-based adjustment
218            if profile.data_points >= 1000 {
219                let usage_factor = profile.avg_requests_per_hour / profile.current_limit as f64;
220
221                if usage_factor > 0.8 {
222                    // High utilization - increase limit
223                    new_limit =
224                        ((profile.current_limit as f64) * (1.0 + config.adjustment_factor)) as u32;
225                    reason = AdjustmentReason::NormalLearning;
226                } else if usage_factor < 0.3 {
227                    // Low utilization - decrease limit (save resources)
228                    new_limit = ((profile.current_limit as f64)
229                        * (1.0 - config.adjustment_factor * 0.5))
230                        as u32;
231                    reason = AdjustmentReason::NormalLearning;
232                }
233            }
234
235            // Anomaly-based throttling
236            if config.enable_anomaly_throttling {
237                let recent = self.recent_requests.read();
238                let cutoff = Utc::now() - Duration::minutes(5);
239                let very_recent_count = recent
240                    .iter()
241                    .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
242                    .count();
243
244                // If current rate is 3x average, throttle aggressively
245                let expected_in_5min = profile.avg_requests_per_hour / 12.0;
246                if very_recent_count as f64 > expected_in_5min * 3.0 {
247                    new_limit = ((profile.current_limit as f64) * 0.5) as u32;
248                    reason = AdjustmentReason::AnomalyDetected;
249                }
250            }
251
252            // Load-based adjustment
253            if config.enable_load_based_adjustment {
254                if let Some(load) = self.get_current_load() {
255                    if load.cpu_usage > 0.8 || load.memory_usage > 0.8 {
256                        // High system load - reduce limits
257                        new_limit = ((profile.current_limit as f64) * 0.7) as u32;
258                        reason = AdjustmentReason::HighLoad;
259                    }
260                }
261            }
262
263            // Apply safety limits
264            new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
265
266            // Record adjustment if changed
267            if new_limit != profile.current_limit {
268                profile.adjustment_history.push(LimitAdjustment {
269                    timestamp: Utc::now(),
270                    old_limit: profile.current_limit,
271                    new_limit,
272                    reason,
273                });
274
275                profile.current_limit = new_limit;
276
277                // Keep history limited
278                if profile.adjustment_history.len() > 100 {
279                    profile.adjustment_history.remove(0);
280                }
281            }
282        }
283
284        Ok(())
285    }
286
287    /// Predict future load and adjust proactively
288    pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
289        let config = self.config.read();
290
291        if !config.enable_pattern_prediction {
292            return Ok(0);
293        }
294
295        let profiles = self.profiles.read();
296        if let Some(profile) = profiles.get(tenant_id) {
297            if profile.data_points < 1000 {
298                return Ok(profile.current_limit);
299            }
300
301            // Simple pattern: check if we're approaching a known peak time
302            let current_hour = Utc::now().hour();
303
304            if profile.peak_times.contains(&current_hour) {
305                // Increase limit proactively
306                let predicted_limit = ((profile.current_limit as f64) * 1.2) as u32;
307                return Ok(predicted_limit.min(config.max_rate_limit));
308            }
309        }
310
311        Ok(0)
312    }
313
314    /// Record system load for load-based adjustments
315    pub fn record_system_load(&self, load: SystemLoad) {
316        let mut history = self.load_history.write();
317        history.push((Utc::now(), load));
318
319        // Keep only recent history (last hour)
320        let cutoff = Utc::now() - Duration::hours(1);
321        history.retain(|(ts, _)| *ts > cutoff);
322    }
323
324    fn get_current_load(&self) -> Option<SystemLoad> {
325        let history = self.load_history.read();
326        history.last().map(|(_, load)| load.clone())
327    }
328
329    fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
330        let mut requests = self.recent_requests.write();
331        requests.push(RequestRecord {
332            tenant_id: tenant_id.to_string(),
333            timestamp: Utc::now(),
334            allowed,
335            cost,
336        });
337
338        // Clean old records
339        let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
340        requests.retain(|r| r.timestamp > cutoff);
341    }
342
343    /// Get statistics for a tenant
344    pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
345        let profiles = self.profiles.read();
346        profiles.get(tenant_id).map(|profile| {
347            let recent = self.recent_requests.read();
348            let cutoff = Utc::now() - Duration::hours(1);
349            let requests_last_hour = recent
350                .iter()
351                .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
352                .count();
353
354            AdaptiveLimitStats {
355                current_limit: profile.current_limit,
356                base_limit: profile.base_limit,
357                requests_last_hour: requests_last_hour as u32,
358                avg_requests_per_hour: profile.avg_requests_per_hour,
359                utilization: requests_last_hour as f64 / profile.current_limit as f64,
360                total_adjustments: profile.adjustment_history.len(),
361                last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
362            }
363        })
364    }
365
366    /// Get overall statistics
367    pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
368        let profiles = self.profiles.read();
369        let recent = self.recent_requests.read();
370
371        AdaptiveRateLimiterStats {
372            total_tenants: profiles.len(),
373            total_requests: recent.len(),
374            config: self.config.read().clone(),
375        }
376    }
377}
378
379impl TenantUsageProfile {
380    fn new(tenant_id: String, base_limit: u32) -> Self {
381        Self {
382            tenant_id,
383            hourly_averages: vec![0.0; 24],
384            daily_averages: vec![0.0; 7],
385            peak_times: Vec::new(),
386            avg_requests_per_hour: 0.0,
387            stddev_requests_per_hour: 0.0,
388            max_requests_per_hour: 0.0,
389            current_limit: base_limit,
390            base_limit,
391            adjustment_history: Vec::new(),
392            last_updated: Utc::now(),
393            data_points: 0,
394        }
395    }
396}
397
/// Point-in-time statistics for a single tenant, as returned by
/// `AdaptiveRateLimiter::get_tenant_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveLimitStats {
    /// Limit currently enforced for the tenant.
    pub current_limit: u32,
    /// Limit the tenant's profile was created with.
    pub base_limit: u32,
    /// Number of the tenant's logged requests in the last hour.
    pub requests_last_hour: u32,
    /// Average hourly request rate stored in the tenant's profile.
    pub avg_requests_per_hour: f64,
    /// `requests_last_hour` relative to `current_limit`.
    pub utilization: f64,
    /// Number of adjustments currently held in the tenant's history.
    pub total_adjustments: usize,
    /// Time of the most recent adjustment, if there has been one.
    pub last_adjustment: Option<DateTime<Utc>>,
}
408
/// Limiter-wide statistics, as returned by `AdaptiveRateLimiter::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveRateLimiterStats {
    /// Number of tenants that have a profile.
    pub total_tenants: usize,
    /// Number of request records currently retained in the request log.
    pub total_requests: usize,
    /// Copy of the configuration in force when the statistics were taken.
    pub config: AdaptiveRateLimitConfig,
}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// A new limiter knows no tenants and has logged no requests.
    #[test]
    fn test_adaptive_limiter_creation() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
        let stats = limiter.get_stats();

        assert_eq!(stats.total_tenants, 0);
        assert_eq!(stats.total_requests, 0);
    }

    /// The first request of a tenant is admitted against a limit that starts
    /// at the configured ceiling.
    #[test]
    fn test_adaptive_limit_checking() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: true,
            min_rate_limit: 10,
            max_rate_limit: 100,
            ..Default::default()
        });

        // First request should be allowed
        let result = limiter.check_adaptive_limit("tenant1").unwrap();
        assert!(result.allowed);
        assert_eq!(result.limit, 100);
        assert_eq!(result.remaining, 99);
        assert!(result.retry_after.is_none());
    }

    /// High utilisation with enough data raises the limit and logs the change.
    #[test]
    fn test_limit_adjustment() {
        let config = AdaptiveRateLimitConfig {
            min_rate_limit: 10,
            max_rate_limit: 1000,
            ..Default::default()
        };

        let limiter = AdaptiveRateLimiter::new(config);

        // Create profile with sufficient data
        {
            let mut profiles = limiter.profiles.write();
            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
            profile.data_points = 1500;
            profile.avg_requests_per_hour = 90.0; // High utilization
            profile.current_limit = 100;
            profiles.insert("tenant1".to_string(), profile);
        }

        // Update limits
        limiter.update_adaptive_limits().unwrap();

        // Check if limit was adjusted upward
        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit > 100); // Should have increased
        assert_eq!(stats.base_limit, 100); // Base limit is untouched
        assert_eq!(stats.total_adjustments, 1);
        assert!(stats.last_adjustment.is_some());
    }

    /// A high-load sample lowers the limit.
    #[test]
    fn test_load_based_adjustment() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());

        // Create profile
        {
            let mut profiles = limiter.profiles.write();
            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
            profile.data_points = 1000;
            profile.current_limit = 100;
            profiles.insert("tenant1".to_string(), profile);
        }

        // Record high system load
        limiter.record_system_load(SystemLoad {
            cpu_usage: 0.9,
            memory_usage: 0.85,
            active_connections: 1000,
            queue_depth: 500,
        });

        // Update limits
        limiter.update_adaptive_limits().unwrap();

        // Check if limit was reduced due to high load
        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit < 100); // Should have decreased
        assert_eq!(stats.total_adjustments, 1);
    }

    /// A disabled limiter admits everything and keeps no per-tenant state.
    #[test]
    fn test_disabled_adaptive_limiting() {
        let config = AdaptiveRateLimitConfig {
            enabled: false,
            ..Default::default()
        };

        let limiter = AdaptiveRateLimiter::new(config);
        let result = limiter.check_adaptive_limit("tenant1").unwrap();

        assert!(result.allowed);
        assert_eq!(result.remaining, u32::MAX);
        assert_eq!(result.limit, u32::MAX);
        assert!(result.retry_after.is_none());
        assert_eq!(limiter.get_stats().total_tenants, 0);
    }

    /// Adjusted limits stay within the configured floor and ceiling.
    #[test]
    fn test_safety_limits() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            min_rate_limit: 50,
            max_rate_limit: 200,
            ..Default::default()
        });

        // Create profile that would exceed max
        {
            let mut profiles = limiter.profiles.write();
            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
            profile.data_points = 1500;
            profile.avg_requests_per_hour = 180.0; // Very high
            profile.current_limit = 190;
            profiles.insert("tenant1".to_string(), profile);
        }

        limiter.update_adaptive_limits().unwrap();

        let stats = limiter.get_tenant_stats("tenant1").unwrap();
        assert!(stats.current_limit <= 200); // Should not exceed max
        assert!(stats.current_limit >= 50); // Should not go below min
    }

    /// The default configuration is enabled and has sane bounds.
    #[test]
    fn test_default_config() {
        let config = AdaptiveRateLimitConfig::default();
        assert!(config.enabled);
        assert!(config.min_rate_limit > 0);
        assert!(config.max_rate_limit > config.min_rate_limit);
    }

    /// The configuration survives a JSON round trip.
    #[test]
    fn test_config_serde() {
        let config = AdaptiveRateLimitConfig::default();
        let json = serde_json::to_string(&config).unwrap();
        let parsed: AdaptiveRateLimitConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.enabled, config.enabled);
        assert_eq!(parsed.min_rate_limit, config.min_rate_limit);
        assert_eq!(parsed.max_rate_limit, config.max_rate_limit);
        assert_eq!(parsed.learning_window_hours, config.learning_window_hours);
    }

    /// Cloning a load sample copies its fields.
    #[test]
    fn test_system_load_clone() {
        let load = SystemLoad {
            cpu_usage: 0.5,
            memory_usage: 0.6,
            active_connections: 100,
            queue_depth: 50,
        };

        let cloned = load.clone();
        assert_eq!(cloned.cpu_usage, load.cpu_usage);
        assert_eq!(cloned.active_connections, load.active_connections);
    }

    /// Unknown tenants have no statistics.
    #[test]
    fn test_get_tenant_stats_none() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
        let stats = limiter.get_tenant_stats("nonexistent");
        assert!(stats.is_none());
    }

    /// Tenant statistics survive a JSON round trip.
    #[test]
    fn test_adaptive_limit_stats_serde() {
        let stats = AdaptiveLimitStats {
            current_limit: 100,
            base_limit: 50,
            requests_last_hour: 25,
            avg_requests_per_hour: 30.0,
            utilization: 0.25,
            total_adjustments: 5,
            last_adjustment: Some(Utc::now()),
        };

        let json = serde_json::to_string(&stats).unwrap();
        let parsed: AdaptiveLimitStats = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.current_limit, stats.current_limit);
        assert_eq!(parsed.utilization, stats.utilization);
    }

    /// Recorded load samples are retained, and the latest one is reported as
    /// the current load.
    #[test]
    fn test_record_system_load() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());

        // Record multiple loads
        for i in 0..5 {
            limiter.record_system_load(SystemLoad {
                cpu_usage: i as f64 * 0.1,
                memory_usage: 0.5,
                active_connections: i * 10,
                queue_depth: i,
            });
        }

        // All five fresh samples are kept.
        assert_eq!(limiter.load_history.read().len(), 5);

        // The current load is the last sample recorded.
        let latest = limiter
            .get_current_load()
            .expect("a sample was just recorded");
        assert_eq!(latest.active_connections, 40);
        assert_eq!(latest.queue_depth, 4);

        // Recording load does not create tenants or request records.
        let stats = limiter.get_stats();
        assert_eq!(stats.total_tenants, 0);
        assert_eq!(stats.total_requests, 0);
    }

    /// Each tenant gets its own profile and every request is logged.
    #[test]
    fn test_multiple_tenants() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: true,
            ..Default::default()
        });

        // Check limits for multiple tenants
        limiter.check_adaptive_limit("tenant1").unwrap();
        limiter.check_adaptive_limit("tenant2").unwrap();
        limiter.check_adaptive_limit("tenant3").unwrap();

        let stats = limiter.get_stats();
        assert_eq!(stats.total_tenants, 3);
        assert_eq!(stats.total_requests, 3);
    }

    /// Limiter-wide statistics survive a JSON round trip.
    #[test]
    fn test_adaptive_limiter_stats_serde() {
        let stats = AdaptiveRateLimiterStats {
            total_tenants: 10,
            total_requests: 1000,
            config: AdaptiveRateLimitConfig::default(),
        };

        let json = serde_json::to_string(&stats).unwrap();
        let parsed: AdaptiveRateLimiterStats = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.total_tenants, stats.total_tenants);
        assert_eq!(parsed.total_requests, stats.total_requests);
    }

    /// The first request of a tenant creates its profile, starting at the
    /// configured ceiling.
    #[test]
    fn test_tenant_profile_initialization() {
        let config = AdaptiveRateLimitConfig {
            enabled: true,
            ..Default::default()
        };
        let ceiling = config.max_rate_limit;
        let limiter = AdaptiveRateLimiter::new(config);

        // First request creates a profile
        limiter.check_adaptive_limit("new_tenant").unwrap();

        let stats = limiter
            .get_tenant_stats("new_tenant")
            .expect("profile is created by the first request");
        assert_eq!(stats.current_limit, ceiling);
        assert_eq!(stats.base_limit, ceiling);
        assert_eq!(stats.requests_last_hour, 1);
        assert_eq!(stats.total_adjustments, 0);
        assert!(stats.last_adjustment.is_none());
    }
}