Skip to main content

allsource_core/security/
adaptive_rate_limit.rs

1/// Adaptive ML-Based Rate Limiting
2///
3/// Automatically adjusts rate limits based on:
4/// - Historical usage patterns
5/// - Traffic anomalies
6/// - System load
7/// - Tenant behavior
8/// - Attack detection
9use crate::error::Result;
10use crate::infrastructure::security::rate_limit::RateLimitResult;
11use chrono::{DateTime, Duration, Timelike, Utc};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16
17/// Adaptive rate limiting configuration
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct AdaptiveRateLimitConfig {
20    /// Enable adaptive rate limiting
21    pub enabled: bool,
22
23    /// Minimum rate limit (safety floor)
24    pub min_rate_limit: u32,
25
26    /// Maximum rate limit (safety ceiling)
27    pub max_rate_limit: u32,
28
29    /// Learning window in hours
30    pub learning_window_hours: i64,
31
32    /// Adjustment factor (how aggressive to adjust, 0.0-1.0)
33    pub adjustment_factor: f64,
34
35    /// Enable anomaly-based throttling
36    pub enable_anomaly_throttling: bool,
37
38    /// Enable load-based adjustment
39    pub enable_load_based_adjustment: bool,
40
41    /// Enable pattern-based prediction
42    pub enable_pattern_prediction: bool,
43}
44
45impl Default for AdaptiveRateLimitConfig {
46    fn default() -> Self {
47        Self {
48            enabled: true,
49            min_rate_limit: 10,
50            max_rate_limit: 10_000,
51            learning_window_hours: 24 * 7, // 1 week
52            adjustment_factor: 0.3,
53            enable_anomaly_throttling: true,
54            enable_load_based_adjustment: true,
55            enable_pattern_prediction: true,
56        }
57    }
58}
59
60/// Tenant usage profile for adaptive learning
61#[derive(Debug, Clone, Serialize, Deserialize)]
62struct TenantUsageProfile {
63    tenant_id: String,
64
65    // Historical patterns
66    hourly_averages: Vec<f64>, // Average requests per hour
67    daily_averages: Vec<f64>,  // Average requests per day
68    peak_times: Vec<u32>,      // Hours with peak usage
69
70    // Statistical metrics
71    avg_requests_per_hour: f64,
72    stddev_requests_per_hour: f64,
73    max_requests_per_hour: f64,
74
75    // Adaptive parameters
76    current_limit: u32,
77    base_limit: u32,
78    adjustment_history: Vec<LimitAdjustment>,
79
80    // Last updated
81    last_updated: DateTime<Utc>,
82    data_points: usize,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86struct LimitAdjustment {
87    timestamp: DateTime<Utc>,
88    old_limit: u32,
89    new_limit: u32,
90    reason: AdjustmentReason,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94enum AdjustmentReason {
95    NormalLearning,
96    AnomalyDetected,
97    HighLoad,
98    AttackMitigation,
99    PatternPrediction,
100}
101
102/// System load metrics
103#[derive(Debug, Clone)]
104pub struct SystemLoad {
105    pub cpu_usage: f64,
106    pub memory_usage: f64,
107    pub active_connections: usize,
108    pub queue_depth: usize,
109}
110
111/// Adaptive rate limiter
112pub struct AdaptiveRateLimiter {
113    config: Arc<RwLock<AdaptiveRateLimitConfig>>,
114
115    // Tenant profiles - using DashMap for lock-free concurrent access
116    profiles: Arc<DashMap<String, TenantUsageProfile>>,
117
118    // Recent requests for pattern analysis
119    recent_requests: Arc<RwLock<Vec<RequestRecord>>>,
120
121    // System load history
122    load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
123}
124
125#[derive(Debug, Clone)]
126struct RequestRecord {
127    tenant_id: String,
128    timestamp: DateTime<Utc>,
129    allowed: bool,
130    cost: f64,
131}
132
133impl AdaptiveRateLimiter {
134    /// Create new adaptive rate limiter
135    pub fn new(config: AdaptiveRateLimitConfig) -> Self {
136        Self {
137            config: Arc::new(RwLock::new(config)),
138            profiles: Arc::new(DashMap::new()),
139            recent_requests: Arc::new(RwLock::new(Vec::new())),
140            load_history: Arc::new(RwLock::new(Vec::new())),
141        }
142    }
143
144    /// Check rate limit with adaptive adjustment
145    pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
146        let config = self.config.read();
147
148        if !config.enabled {
149            return Ok(RateLimitResult {
150                allowed: true,
151                remaining: u32::MAX,
152                retry_after: None,
153                limit: u32::MAX,
154            });
155        }
156
157        // Get or create profile
158        let mut profile = self
159            .profiles
160            .entry(tenant_id.to_string())
161            .or_insert_with(|| {
162                TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit)
163            });
164
165        // Record request
166        self.record_request(tenant_id, true, 1.0);
167
168        // Get current hour's request count
169        let recent = self.recent_requests.read();
170        let cutoff = Utc::now() - Duration::hours(1);
171        let recent_count = recent
172            .iter()
173            .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
174            .count();
175
176        // Check against current adaptive limit
177        let allowed = (recent_count as u32) < profile.current_limit;
178
179        let result = RateLimitResult {
180            allowed,
181            remaining: if allowed {
182                profile.current_limit.saturating_sub(recent_count as u32)
183            } else {
184                0
185            },
186            retry_after: if allowed {
187                None
188            } else {
189                Some(std::time::Duration::from_secs(60))
190            },
191            limit: profile.current_limit,
192        };
193
194        // Update profile statistics
195        profile.data_points += 1;
196        profile.last_updated = Utc::now();
197
198        Ok(result)
199    }
200
201    /// Update adaptive limits based on learned patterns
202    pub fn update_adaptive_limits(&self) -> Result<()> {
203        let config = self.config.read();
204
205        if !config.enabled {
206            return Ok(());
207        }
208
209        for mut entry in self.profiles.iter_mut() {
210            let tenant_id = entry.key().clone();
211            let profile = entry.value_mut();
212
213            if profile.data_points < 100 {
214                continue; // Not enough data
215            }
216
217            let mut new_limit = profile.current_limit;
218            let mut reason = AdjustmentReason::NormalLearning;
219
220            // Learning-based adjustment
221            if profile.data_points >= 1000 {
222                let usage_factor = profile.avg_requests_per_hour / f64::from(profile.current_limit);
223
224                if usage_factor > 0.8 {
225                    // High utilization - increase limit
226                    new_limit = (f64::from(profile.current_limit)
227                        * (1.0 + config.adjustment_factor)) as u32;
228                    reason = AdjustmentReason::NormalLearning;
229                } else if usage_factor < 0.3 {
230                    // Low utilization - decrease limit (save resources)
231                    new_limit = (f64::from(profile.current_limit)
232                        * (1.0 - config.adjustment_factor * 0.5))
233                        as u32;
234                    reason = AdjustmentReason::NormalLearning;
235                }
236            }
237
238            // Anomaly-based throttling
239            if config.enable_anomaly_throttling {
240                let recent = self.recent_requests.read();
241                let cutoff = Utc::now() - Duration::minutes(5);
242                let very_recent_count = recent
243                    .iter()
244                    .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
245                    .count();
246
247                // If current rate is 3x average, throttle aggressively
248                let expected_in_5min = profile.avg_requests_per_hour / 12.0;
249                if very_recent_count as f64 > expected_in_5min * 3.0 {
250                    new_limit = (f64::from(profile.current_limit) * 0.5) as u32;
251                    reason = AdjustmentReason::AnomalyDetected;
252                }
253            }
254
255            // Load-based adjustment
256            if config.enable_load_based_adjustment
257                && let Some(load) = self.get_current_load()
258                && (load.cpu_usage > 0.8 || load.memory_usage > 0.8)
259            {
260                // High system load - reduce limits
261                new_limit = (f64::from(profile.current_limit) * 0.7) as u32;
262                reason = AdjustmentReason::HighLoad;
263            }
264
265            // Apply safety limits
266            new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
267
268            // Record adjustment if changed
269            if new_limit != profile.current_limit {
270                profile.adjustment_history.push(LimitAdjustment {
271                    timestamp: Utc::now(),
272                    old_limit: profile.current_limit,
273                    new_limit,
274                    reason,
275                });
276
277                profile.current_limit = new_limit;
278
279                // Keep history limited
280                if profile.adjustment_history.len() > 100 {
281                    profile.adjustment_history.remove(0);
282                }
283            }
284        }
285
286        Ok(())
287    }
288
289    /// Predict future load and adjust proactively
290    pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
291        let config = self.config.read();
292
293        if !config.enable_pattern_prediction {
294            return Ok(0);
295        }
296
297        if let Some(profile_ref) = self.profiles.get(tenant_id) {
298            let profile = profile_ref.value();
299            if profile.data_points < 1000 {
300                return Ok(profile.current_limit);
301            }
302
303            // Simple pattern: check if we're approaching a known peak time
304            let current_hour = Utc::now().hour();
305
306            if profile.peak_times.contains(&current_hour) {
307                // Increase limit proactively
308                let predicted_limit = (f64::from(profile.current_limit) * 1.2) as u32;
309                return Ok(predicted_limit.min(config.max_rate_limit));
310            }
311        }
312
313        Ok(0)
314    }
315
316    /// Record system load for load-based adjustments
317    pub fn record_system_load(&self, load: SystemLoad) {
318        let mut history = self.load_history.write();
319        history.push((Utc::now(), load));
320
321        // Keep only recent history (last hour)
322        let cutoff = Utc::now() - Duration::hours(1);
323        history.retain(|(ts, _)| *ts > cutoff);
324    }
325
326    fn get_current_load(&self) -> Option<SystemLoad> {
327        let history = self.load_history.read();
328        history.last().map(|(_, load)| load.clone())
329    }
330
331    fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
332        let mut requests = self.recent_requests.write();
333        requests.push(RequestRecord {
334            tenant_id: tenant_id.to_string(),
335            timestamp: Utc::now(),
336            allowed,
337            cost,
338        });
339
340        // Clean old records
341        let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
342        requests.retain(|r| r.timestamp > cutoff);
343    }
344
345    /// Get statistics for a tenant
346    pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
347        self.profiles.get(tenant_id).map(|profile_ref| {
348            let profile = profile_ref.value();
349            let recent = self.recent_requests.read();
350            let cutoff = Utc::now() - Duration::hours(1);
351            let requests_last_hour = recent
352                .iter()
353                .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
354                .count();
355
356            AdaptiveLimitStats {
357                current_limit: profile.current_limit,
358                base_limit: profile.base_limit,
359                requests_last_hour: requests_last_hour as u32,
360                avg_requests_per_hour: profile.avg_requests_per_hour,
361                utilization: requests_last_hour as f64 / f64::from(profile.current_limit),
362                total_adjustments: profile.adjustment_history.len(),
363                last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
364            }
365        })
366    }
367
368    /// Get overall statistics
369    pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
370        let recent = self.recent_requests.read();
371
372        AdaptiveRateLimiterStats {
373            total_tenants: self.profiles.len(),
374            total_requests: recent.len(),
375            config: self.config.read().clone(),
376        }
377    }
378}
379
380impl TenantUsageProfile {
381    fn new(tenant_id: String, base_limit: u32) -> Self {
382        Self {
383            tenant_id,
384            hourly_averages: vec![0.0; 24],
385            daily_averages: vec![0.0; 7],
386            peak_times: Vec::new(),
387            avg_requests_per_hour: 0.0,
388            stddev_requests_per_hour: 0.0,
389            max_requests_per_hour: 0.0,
390            current_limit: base_limit,
391            base_limit,
392            adjustment_history: Vec::new(),
393            last_updated: Utc::now(),
394            data_points: 0,
395        }
396    }
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct AdaptiveLimitStats {
401    pub current_limit: u32,
402    pub base_limit: u32,
403    pub requests_last_hour: u32,
404    pub avg_requests_per_hour: f64,
405    pub utilization: f64,
406    pub total_adjustments: usize,
407    pub last_adjustment: Option<DateTime<Utc>>,
408}
409
410#[derive(Debug, Clone, Serialize, Deserialize)]
411pub struct AdaptiveRateLimiterStats {
412    pub total_tenants: usize,
413    pub total_requests: usize,
414    pub config: AdaptiveRateLimitConfig,
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420
421    #[test]
422    fn test_adaptive_limiter_creation() {
423        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
424        let stats = limiter.get_stats();
425
426        assert_eq!(stats.total_tenants, 0);
427        assert_eq!(stats.total_requests, 0);
428    }
429
430    #[test]
431    fn test_adaptive_limit_checking() {
432        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
433            enabled: true,
434            min_rate_limit: 10,
435            max_rate_limit: 100,
436            ..Default::default()
437        });
438
439        // First request should be allowed
440        let result = limiter.check_adaptive_limit("tenant1").unwrap();
441        assert!(result.allowed);
442    }
443
444    #[test]
445    fn test_limit_adjustment() {
446        let config = AdaptiveRateLimitConfig {
447            min_rate_limit: 10,
448            max_rate_limit: 1000,
449            ..Default::default()
450        };
451
452        let limiter = AdaptiveRateLimiter::new(config);
453
454        // Create profile with sufficient data
455        {
456            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
457            profile.data_points = 1500;
458            profile.avg_requests_per_hour = 90.0; // High utilization
459            profile.current_limit = 100;
460            limiter.profiles.insert("tenant1".to_string(), profile);
461        }
462
463        // Update limits
464        limiter.update_adaptive_limits().unwrap();
465
466        // Check if limit was adjusted upward
467        let stats = limiter.get_tenant_stats("tenant1").unwrap();
468        assert!(stats.current_limit > 100); // Should have increased
469    }
470
471    #[test]
472    fn test_load_based_adjustment() {
473        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
474
475        // Create profile
476        {
477            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
478            profile.data_points = 1000;
479            profile.current_limit = 100;
480            limiter.profiles.insert("tenant1".to_string(), profile);
481        }
482
483        // Record high system load
484        limiter.record_system_load(SystemLoad {
485            cpu_usage: 0.9,
486            memory_usage: 0.85,
487            active_connections: 1000,
488            queue_depth: 500,
489        });
490
491        // Update limits
492        limiter.update_adaptive_limits().unwrap();
493
494        // Check if limit was reduced due to high load
495        let stats = limiter.get_tenant_stats("tenant1").unwrap();
496        assert!(stats.current_limit < 100); // Should have decreased
497    }
498
499    #[test]
500    fn test_disabled_adaptive_limiting() {
501        let config = AdaptiveRateLimitConfig {
502            enabled: false,
503            ..Default::default()
504        };
505
506        let limiter = AdaptiveRateLimiter::new(config);
507        let result = limiter.check_adaptive_limit("tenant1").unwrap();
508
509        assert!(result.allowed);
510        assert_eq!(result.remaining, u32::MAX);
511    }
512
513    #[test]
514    fn test_safety_limits() {
515        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
516            min_rate_limit: 50,
517            max_rate_limit: 200,
518            ..Default::default()
519        });
520
521        // Create profile that would exceed max
522        {
523            let mut profile = TenantUsageProfile::new("tenant1".to_string(), 100);
524            profile.data_points = 1500;
525            profile.avg_requests_per_hour = 180.0; // Very high
526            profile.current_limit = 190;
527            limiter.profiles.insert("tenant1".to_string(), profile);
528        }
529
530        limiter.update_adaptive_limits().unwrap();
531
532        let stats = limiter.get_tenant_stats("tenant1").unwrap();
533        assert!(stats.current_limit <= 200); // Should not exceed max
534        assert!(stats.current_limit >= 50); // Should not go below min
535    }
536
537    #[test]
538    fn test_default_config() {
539        let config = AdaptiveRateLimitConfig::default();
540        assert!(config.enabled);
541        assert!(config.min_rate_limit > 0);
542        assert!(config.max_rate_limit > config.min_rate_limit);
543    }
544
545    #[test]
546    fn test_config_serde() {
547        let config = AdaptiveRateLimitConfig::default();
548        let json = serde_json::to_string(&config).unwrap();
549        let parsed: AdaptiveRateLimitConfig = serde_json::from_str(&json).unwrap();
550        assert_eq!(parsed.enabled, config.enabled);
551        assert_eq!(parsed.min_rate_limit, config.min_rate_limit);
552    }
553
554    #[test]
555    fn test_system_load_clone() {
556        let load = SystemLoad {
557            cpu_usage: 0.5,
558            memory_usage: 0.6,
559            active_connections: 100,
560            queue_depth: 50,
561        };
562
563        let cloned = load.clone();
564        assert_eq!(cloned.cpu_usage, load.cpu_usage);
565        assert_eq!(cloned.active_connections, load.active_connections);
566    }
567
568    #[test]
569    fn test_get_tenant_stats_none() {
570        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
571        let stats = limiter.get_tenant_stats("nonexistent");
572        assert!(stats.is_none());
573    }
574
575    #[test]
576    fn test_adaptive_limit_stats_serde() {
577        let stats = AdaptiveLimitStats {
578            current_limit: 100,
579            base_limit: 50,
580            requests_last_hour: 25,
581            avg_requests_per_hour: 30.0,
582            utilization: 0.25,
583            total_adjustments: 5,
584            last_adjustment: Some(Utc::now()),
585        };
586
587        let json = serde_json::to_string(&stats).unwrap();
588        let parsed: AdaptiveLimitStats = serde_json::from_str(&json).unwrap();
589        assert_eq!(parsed.current_limit, stats.current_limit);
590        assert_eq!(parsed.utilization, stats.utilization);
591    }
592
593    #[test]
594    fn test_record_system_load() {
595        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
596
597        // Record multiple loads
598        for i in 0..5 {
599            limiter.record_system_load(SystemLoad {
600                cpu_usage: i as f64 * 0.1,
601                memory_usage: 0.5,
602                active_connections: i * 10,
603                queue_depth: i,
604            });
605        }
606
607        let stats = limiter.get_stats();
608        assert_eq!(stats.total_tenants, 0); // No tenants yet
609    }
610
611    #[test]
612    fn test_multiple_tenants() {
613        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
614            enabled: true,
615            ..Default::default()
616        });
617
618        // Check limits for multiple tenants
619        limiter.check_adaptive_limit("tenant1").unwrap();
620        limiter.check_adaptive_limit("tenant2").unwrap();
621        limiter.check_adaptive_limit("tenant3").unwrap();
622
623        let stats = limiter.get_stats();
624        assert_eq!(stats.total_tenants, 3);
625    }
626
627    #[test]
628    fn test_adaptive_limiter_stats_serde() {
629        let stats = AdaptiveRateLimiterStats {
630            total_tenants: 10,
631            total_requests: 1000,
632            config: AdaptiveRateLimitConfig::default(),
633        };
634
635        let json = serde_json::to_string(&stats).unwrap();
636        let parsed: AdaptiveRateLimiterStats = serde_json::from_str(&json).unwrap();
637        assert_eq!(parsed.total_tenants, stats.total_tenants);
638        assert_eq!(parsed.total_requests, stats.total_requests);
639    }
640
641    #[test]
642    fn test_tenant_profile_initialization() {
643        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
644            enabled: true,
645            ..Default::default()
646        });
647
648        // First request creates a profile
649        limiter.check_adaptive_limit("new_tenant").unwrap();
650
651        let stats = limiter.get_tenant_stats("new_tenant");
652        assert!(stats.is_some());
653    }
654}