allsource_core/security/
adaptive_rate_limit.rs

//! Adaptive ML-based rate limiting.
//!
//! Automatically adjusts rate limits based on:
//! - Historical usage patterns
//! - Traffic anomalies
//! - System load
//! - Tenant behavior
//! - Attack detection
9
10use crate::error::Result;
11use crate::rate_limit::RateLimitResult;
12use chrono::{DateTime, Duration, Timelike, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use parking_lot::RwLock;
17
18/// Adaptive rate limiting configuration
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct AdaptiveRateLimitConfig {
21    /// Enable adaptive rate limiting
22    pub enabled: bool,
23
24    /// Minimum rate limit (safety floor)
25    pub min_rate_limit: u32,
26
27    /// Maximum rate limit (safety ceiling)
28    pub max_rate_limit: u32,
29
30    /// Learning window in hours
31    pub learning_window_hours: i64,
32
33    /// Adjustment factor (how aggressive to adjust, 0.0-1.0)
34    pub adjustment_factor: f64,
35
36    /// Enable anomaly-based throttling
37    pub enable_anomaly_throttling: bool,
38
39    /// Enable load-based adjustment
40    pub enable_load_based_adjustment: bool,
41
42    /// Enable pattern-based prediction
43    pub enable_pattern_prediction: bool,
44}
45
46impl Default for AdaptiveRateLimitConfig {
47    fn default() -> Self {
48        Self {
49            enabled: true,
50            min_rate_limit: 10,
51            max_rate_limit: 10_000,
52            learning_window_hours: 24 * 7, // 1 week
53            adjustment_factor: 0.3,
54            enable_anomaly_throttling: true,
55            enable_load_based_adjustment: true,
56            enable_pattern_prediction: true,
57        }
58    }
59}
60
61/// Tenant usage profile for adaptive learning
62#[derive(Debug, Clone, Serialize, Deserialize)]
63struct TenantUsageProfile {
64    tenant_id: String,
65
66    // Historical patterns
67    hourly_averages: Vec<f64>,           // Average requests per hour
68    daily_averages: Vec<f64>,            // Average requests per day
69    peak_times: Vec<u32>,                // Hours with peak usage
70
71    // Statistical metrics
72    avg_requests_per_hour: f64,
73    stddev_requests_per_hour: f64,
74    max_requests_per_hour: f64,
75
76    // Adaptive parameters
77    current_limit: u32,
78    base_limit: u32,
79    adjustment_history: Vec<LimitAdjustment>,
80
81    // Last updated
82    last_updated: DateTime<Utc>,
83    data_points: usize,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87struct LimitAdjustment {
88    timestamp: DateTime<Utc>,
89    old_limit: u32,
90    new_limit: u32,
91    reason: AdjustmentReason,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95enum AdjustmentReason {
96    NormalLearning,
97    AnomalyDetected,
98    HighLoad,
99    AttackMitigation,
100    PatternPrediction,
101}
102
/// A point-in-time sample of system load.
#[derive(Clone, Debug)]
pub struct SystemLoad {
    /// CPU usage as a fraction; values above 0.8 count as high load.
    pub cpu_usage: f64,
    /// Memory usage as a fraction; values above 0.8 count as high load.
    pub memory_usage: f64,
    /// Number of active connections at sampling time.
    pub active_connections: usize,
    /// Depth of the work queue at sampling time.
    pub queue_depth: usize,
}
111
112/// Adaptive rate limiter
113pub struct AdaptiveRateLimiter {
114    config: Arc<RwLock<AdaptiveRateLimitConfig>>,
115
116    // Tenant profiles
117    profiles: Arc<RwLock<HashMap<String, TenantUsageProfile>>>,
118
119    // Recent requests for pattern analysis
120    recent_requests: Arc<RwLock<Vec<RequestRecord>>>,
121
122    // System load history
123    load_history: Arc<RwLock<Vec<(DateTime<Utc>, SystemLoad)>>>,
124}
125
126#[derive(Debug, Clone)]
127struct RequestRecord {
128    tenant_id: String,
129    timestamp: DateTime<Utc>,
130    allowed: bool,
131    cost: f64,
132}
133
134impl AdaptiveRateLimiter {
135    /// Create new adaptive rate limiter
136    pub fn new(config: AdaptiveRateLimitConfig) -> Self {
137        Self {
138            config: Arc::new(RwLock::new(config)),
139            profiles: Arc::new(RwLock::new(HashMap::new())),
140            recent_requests: Arc::new(RwLock::new(Vec::new())),
141            load_history: Arc::new(RwLock::new(Vec::new())),
142        }
143    }
144
145    /// Check rate limit with adaptive adjustment
146    pub fn check_adaptive_limit(&self, tenant_id: &str) -> Result<RateLimitResult> {
147        let config = self.config.read();
148
149        if !config.enabled {
150            return Ok(RateLimitResult {
151                allowed: true,
152                remaining: u32::MAX,
153                retry_after: None,
154                limit: u32::MAX,
155            });
156        }
157
158        // Get or create profile
159        let mut profiles = self.profiles.write();
160        let profile = profiles.entry(tenant_id.to_string())
161            .or_insert_with(|| TenantUsageProfile::new(tenant_id.to_string(), config.max_rate_limit));
162
163        // Record request
164        self.record_request(tenant_id, true, 1.0);
165
166        // Get current hour's request count
167        let recent = self.recent_requests.read();
168        let cutoff = Utc::now() - Duration::hours(1);
169        let recent_count = recent.iter()
170            .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
171            .count();
172
173        // Check against current adaptive limit
174        let allowed = (recent_count as u32) < profile.current_limit;
175
176        let result = RateLimitResult {
177            allowed,
178            remaining: if allowed {
179                profile.current_limit.saturating_sub(recent_count as u32)
180            } else {
181                0
182            },
183            retry_after: if allowed { None } else { Some(std::time::Duration::from_secs(60)) },
184            limit: profile.current_limit,
185        };
186
187        // Update profile statistics
188        profile.data_points += 1;
189        profile.last_updated = Utc::now();
190
191        Ok(result)
192    }
193
194    /// Update adaptive limits based on learned patterns
195    pub fn update_adaptive_limits(&self) -> Result<()> {
196        let config = self.config.read();
197
198        if !config.enabled {
199            return Ok(());
200        }
201
202        let mut profiles = self.profiles.write();
203
204        for (tenant_id, profile) in profiles.iter_mut() {
205            if profile.data_points < 100 {
206                continue; // Not enough data
207            }
208
209            let mut new_limit = profile.current_limit;
210            let mut reason = AdjustmentReason::NormalLearning;
211
212            // Learning-based adjustment
213            if profile.data_points >= 1000 {
214                let usage_factor = profile.avg_requests_per_hour / profile.current_limit as f64;
215
216                if usage_factor > 0.8 {
217                    // High utilization - increase limit
218                    new_limit = ((profile.current_limit as f64) * (1.0 + config.adjustment_factor)) as u32;
219                    reason = AdjustmentReason::NormalLearning;
220                } else if usage_factor < 0.3 {
221                    // Low utilization - decrease limit (save resources)
222                    new_limit = ((profile.current_limit as f64) * (1.0 - config.adjustment_factor * 0.5)) as u32;
223                    reason = AdjustmentReason::NormalLearning;
224                }
225            }
226
227            // Anomaly-based throttling
228            if config.enable_anomaly_throttling {
229                let recent = self.recent_requests.read();
230                let cutoff = Utc::now() - Duration::minutes(5);
231                let very_recent_count = recent.iter()
232                    .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
233                    .count();
234
235                // If current rate is 3x average, throttle aggressively
236                let expected_in_5min = profile.avg_requests_per_hour / 12.0;
237                if very_recent_count as f64 > expected_in_5min * 3.0 {
238                    new_limit = ((profile.current_limit as f64) * 0.5) as u32;
239                    reason = AdjustmentReason::AnomalyDetected;
240                }
241            }
242
243            // Load-based adjustment
244            if config.enable_load_based_adjustment {
245                if let Some(load) = self.get_current_load() {
246                    if load.cpu_usage > 0.8 || load.memory_usage > 0.8 {
247                        // High system load - reduce limits
248                        new_limit = ((profile.current_limit as f64) * 0.7) as u32;
249                        reason = AdjustmentReason::HighLoad;
250                    }
251                }
252            }
253
254            // Apply safety limits
255            new_limit = new_limit.clamp(config.min_rate_limit, config.max_rate_limit);
256
257            // Record adjustment if changed
258            if new_limit != profile.current_limit {
259                profile.adjustment_history.push(LimitAdjustment {
260                    timestamp: Utc::now(),
261                    old_limit: profile.current_limit,
262                    new_limit,
263                    reason,
264                });
265
266                profile.current_limit = new_limit;
267
268                // Keep history limited
269                if profile.adjustment_history.len() > 100 {
270                    profile.adjustment_history.remove(0);
271                }
272            }
273        }
274
275        Ok(())
276    }
277
278    /// Predict future load and adjust proactively
279    pub fn predict_and_adjust(&self, tenant_id: &str) -> Result<u32> {
280        let config = self.config.read();
281
282        if !config.enable_pattern_prediction {
283            return Ok(0);
284        }
285
286        let profiles = self.profiles.read();
287        if let Some(profile) = profiles.get(tenant_id) {
288            if profile.data_points < 1000 {
289                return Ok(profile.current_limit);
290            }
291
292            // Simple pattern: check if we're approaching a known peak time
293            let current_hour = Utc::now().hour();
294
295            if profile.peak_times.contains(&current_hour) {
296                // Increase limit proactively
297                let predicted_limit = ((profile.current_limit as f64) * 1.2) as u32;
298                return Ok(predicted_limit.min(config.max_rate_limit));
299            }
300        }
301
302        Ok(0)
303    }
304
305    /// Record system load for load-based adjustments
306    pub fn record_system_load(&self, load: SystemLoad) {
307        let mut history = self.load_history.write();
308        history.push((Utc::now(), load));
309
310        // Keep only recent history (last hour)
311        let cutoff = Utc::now() - Duration::hours(1);
312        history.retain(|(ts, _)| *ts > cutoff);
313    }
314
315    fn get_current_load(&self) -> Option<SystemLoad> {
316        let history = self.load_history.read();
317        history.last().map(|(_, load)| load.clone())
318    }
319
320    fn record_request(&self, tenant_id: &str, allowed: bool, cost: f64) {
321        let mut requests = self.recent_requests.write();
322        requests.push(RequestRecord {
323            tenant_id: tenant_id.to_string(),
324            timestamp: Utc::now(),
325            allowed,
326            cost,
327        });
328
329        // Clean old records
330        let cutoff = Utc::now() - Duration::hours(self.config.read().learning_window_hours);
331        requests.retain(|r| r.timestamp > cutoff);
332    }
333
334    /// Get statistics for a tenant
335    pub fn get_tenant_stats(&self, tenant_id: &str) -> Option<AdaptiveLimitStats> {
336        let profiles = self.profiles.read();
337        profiles.get(tenant_id).map(|profile| {
338            let recent = self.recent_requests.read();
339            let cutoff = Utc::now() - Duration::hours(1);
340            let requests_last_hour = recent.iter()
341                .filter(|r| r.tenant_id.as_str() == tenant_id && r.timestamp > cutoff)
342                .count();
343
344            AdaptiveLimitStats {
345                current_limit: profile.current_limit,
346                base_limit: profile.base_limit,
347                requests_last_hour: requests_last_hour as u32,
348                avg_requests_per_hour: profile.avg_requests_per_hour,
349                utilization: requests_last_hour as f64 / profile.current_limit as f64,
350                total_adjustments: profile.adjustment_history.len(),
351                last_adjustment: profile.adjustment_history.last().map(|a| a.timestamp),
352            }
353        })
354    }
355
356    /// Get overall statistics
357    pub fn get_stats(&self) -> AdaptiveRateLimiterStats {
358        let profiles = self.profiles.read();
359        let recent = self.recent_requests.read();
360
361        AdaptiveRateLimiterStats {
362            total_tenants: profiles.len(),
363            total_requests: recent.len(),
364            config: self.config.read().clone(),
365        }
366    }
367}
368
369impl TenantUsageProfile {
370    fn new(tenant_id: String, base_limit: u32) -> Self {
371        Self {
372            tenant_id,
373            hourly_averages: vec![0.0; 24],
374            daily_averages: vec![0.0; 7],
375            peak_times: Vec::new(),
376            avg_requests_per_hour: 0.0,
377            stddev_requests_per_hour: 0.0,
378            max_requests_per_hour: 0.0,
379            current_limit: base_limit,
380            base_limit,
381            adjustment_history: Vec::new(),
382            last_updated: Utc::now(),
383            data_points: 0,
384        }
385    }
386}
387
388#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct AdaptiveLimitStats {
390    pub current_limit: u32,
391    pub base_limit: u32,
392    pub requests_last_hour: u32,
393    pub avg_requests_per_hour: f64,
394    pub utilization: f64,
395    pub total_adjustments: usize,
396    pub last_adjustment: Option<DateTime<Utc>>,
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct AdaptiveRateLimiterStats {
401    pub total_tenants: usize,
402    pub total_requests: usize,
403    pub config: AdaptiveRateLimitConfig,
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    const TENANT: &str = "tenant1";

    /// Inserts a hand-built profile (base limit 100) so a test can skip the
    /// learning phase.
    fn seed_profile(
        limiter: &AdaptiveRateLimiter,
        data_points: usize,
        avg_requests_per_hour: f64,
        current_limit: u32,
    ) {
        let mut seeded = TenantUsageProfile::new(TENANT.to_string(), 100);
        seeded.data_points = data_points;
        seeded.avg_requests_per_hour = avg_requests_per_hour;
        seeded.current_limit = current_limit;

        limiter.profiles.write().insert(TENANT.to_string(), seeded);
    }

    /// Current limit of the seeded tenant as reported by the public stats API.
    fn current_limit_of(limiter: &AdaptiveRateLimiter) -> u32 {
        limiter.get_tenant_stats(TENANT).unwrap().current_limit
    }

    #[test]
    fn test_adaptive_limiter_creation() {
        let fresh = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());
        let overview = fresh.get_stats();

        // A new limiter knows no tenants and has logged nothing.
        assert_eq!(overview.total_tenants, 0);
        assert_eq!(overview.total_requests, 0);
    }

    #[test]
    fn test_adaptive_limit_checking() {
        let settings = AdaptiveRateLimitConfig {
            enabled: true,
            min_rate_limit: 10,
            max_rate_limit: 100,
            ..Default::default()
        };
        let limiter = AdaptiveRateLimiter::new(settings);

        // The very first request of a tenant must pass.
        let verdict = limiter.check_adaptive_limit(TENANT).unwrap();
        assert!(verdict.allowed);
    }

    #[test]
    fn test_limit_adjustment() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            min_rate_limit: 10,
            max_rate_limit: 1000,
            ..Default::default()
        });

        // Plenty of data and 90% utilization of a limit of 100.
        seed_profile(&limiter, 1500, 90.0, 100);

        limiter.update_adaptive_limits().unwrap();

        // High utilization must push the limit upward.
        assert!(current_limit_of(&limiter) > 100);
    }

    #[test]
    fn test_load_based_adjustment() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig::default());

        seed_profile(&limiter, 1000, 0.0, 100);

        // Report a heavily loaded system.
        limiter.record_system_load(SystemLoad {
            cpu_usage: 0.9,
            memory_usage: 0.85,
            active_connections: 1000,
            queue_depth: 500,
        });

        limiter.update_adaptive_limits().unwrap();

        // High load must pull the limit down.
        assert!(current_limit_of(&limiter) < 100);
    }

    #[test]
    fn test_disabled_adaptive_limiting() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            enabled: false,
            ..Default::default()
        });

        let verdict = limiter.check_adaptive_limit(TENANT).unwrap();

        // Disabled limiter: everything passes with an unbounded allowance.
        assert!(verdict.allowed);
        assert_eq!(verdict.remaining, u32::MAX);
    }

    #[test]
    fn test_safety_limits() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimitConfig {
            min_rate_limit: 50,
            max_rate_limit: 200,
            ..Default::default()
        });

        // Utilization high enough that an unbounded increase would pass 200.
        seed_profile(&limiter, 1500, 180.0, 190);

        limiter.update_adaptive_limits().unwrap();

        let adjusted = current_limit_of(&limiter);
        assert!(adjusted <= 200); // Must not exceed the ceiling
        assert!(adjusted >= 50); // Must not drop below the floor
    }
}