ipfrs_semantic/
auto_scaling.rs

//! Auto-scaling advisor for production deployments
//!
//! This module provides intelligent recommendations for scaling semantic search
//! systems based on observed metrics and workload patterns.
//!
//! # Features
//!
//! - **Load Analysis**: Analyze query load and resource utilization
//! - **Scaling Recommendations**: Suggest horizontal/vertical scaling
//! - **Cost Estimation**: Estimate infrastructure costs
//! - **Performance Prediction**: Predict performance under different configurations
//!
//! # Example
//!
//! ```rust
//! use ipfrs_semantic::auto_scaling::{AutoScalingAdvisor, WorkloadMetrics};
//! use std::time::Duration;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut advisor = AutoScalingAdvisor::new();
//!
//! // Record workload metrics
//! let metrics = WorkloadMetrics {
//!     queries_per_second: 1500.0,
//!     avg_latency: Duration::from_millis(10),
//!     p99_latency: Duration::from_millis(50),
//!     memory_usage_mb: 4096.0,
//!     cpu_utilization: 0.85,
//!     cache_hit_rate: 0.60,
//!     index_size: 10_000_000,
//! };
//! advisor.record(metrics.clone());
//!
//! // Get scaling recommendations
//! let recommendations = advisor.analyze(&metrics)?;
//! for rec in &recommendations.actions {
//!     println!("📊 {}: {}", rec.action_type, rec.description);
//!     println!("   Impact: {}", rec.expected_impact);
//! }
//! # Ok(())
//! # }
//! ```

use ipfrs_core::Result;
use serde::{Deserialize, Serialize};
use std::time::Duration;

/// Workload metrics for a semantic search system
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadMetrics {
    /// Queries per second
    pub queries_per_second: f64,
    /// Average query latency
    pub avg_latency: Duration,
    /// P99 latency
    pub p99_latency: Duration,
    /// Memory usage in MB
    pub memory_usage_mb: f64,
    /// CPU utilization (0.0 to 1.0)
    pub cpu_utilization: f64,
    /// Cache hit rate (0.0 to 1.0)
    pub cache_hit_rate: f64,
    /// Total index size (number of vectors)
    pub index_size: usize,
}

/// Scaling action type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActionType {
    /// Increase cache size
    IncreaseCache,
    /// Add more replicas
    ScaleHorizontally,
    /// Increase CPU/memory
    ScaleVertically,
    /// Optimize index parameters
    OptimizeParameters,
    /// Enable compression/quantization
    EnableCompression,
    /// Add warmup cache
    AddWarmupCache,
    /// No action needed
    NoAction,
}

impl std::fmt::Display for ActionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ActionType::IncreaseCache => write!(f, "Increase Cache"),
            ActionType::ScaleHorizontally => write!(f, "Scale Horizontally"),
            ActionType::ScaleVertically => write!(f, "Scale Vertically"),
            ActionType::OptimizeParameters => write!(f, "Optimize Parameters"),
            ActionType::EnableCompression => write!(f, "Enable Compression"),
            ActionType::AddWarmupCache => write!(f, "Add Warmup Cache"),
            ActionType::NoAction => write!(f, "No Action"),
        }
    }
}

/// A specific scaling recommendation
#[derive(Debug, Clone)]
pub struct ScalingAction {
    /// Type of action
    pub action_type: ActionType,
    /// Priority (0.0 to 1.0, where 1.0 is highest)
    pub priority: f64,
    /// Description of the action
    pub description: String,
    /// Expected impact
    pub expected_impact: String,
    /// Estimated cost (relative units)
    pub cost_estimate: f64,
}

/// Scaling recommendations report
#[derive(Debug, Clone)]
pub struct ScalingRecommendations {
    /// Current system health score (0.0 to 1.0)
    pub health_score: f64,
    /// Estimated additional load the system can absorb, as a fraction of
    /// current QPS (e.g. 1.0 means room for roughly 2x the current load)
    pub capacity_headroom: f64,
    /// Recommended actions, sorted by descending priority
    pub actions: Vec<ScalingAction>,
    /// Ratio of total expected benefit to total estimated cost of the actions
    pub cost_benefit_ratio: f64,
}

/// Configuration for auto-scaling advisor
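///
/// # Example
///
/// A minimal sketch of overriding one threshold while keeping the remaining
/// defaults (the value here is illustrative, not a tuned recommendation):
///
/// ```rust
/// use ipfrs_semantic::auto_scaling::{AdvisorConfig, AutoScalingAdvisor};
///
/// let config = AdvisorConfig {
///     target_p99_latency_ms: 50, // stricter latency budget
///     ..AdvisorConfig::default()
/// };
/// let advisor = AutoScalingAdvisor::with_config(config);
/// ```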
#[derive(Debug, Clone)]
pub struct AdvisorConfig {
    /// Target P99 latency threshold (ms)
    pub target_p99_latency_ms: u64,
    /// Target CPU utilization (0.0 to 1.0)
    pub target_cpu_utilization: f64,
    /// Minimum cache hit rate
    pub min_cache_hit_rate: f64,
    /// Target queries per second capacity
    pub target_qps_capacity: f64,
}

impl Default for AdvisorConfig {
    fn default() -> Self {
        Self {
            target_p99_latency_ms: 100,   // 100ms P99
            target_cpu_utilization: 0.70, // 70% CPU target
            min_cache_hit_rate: 0.75,     // 75% cache hit rate
            target_qps_capacity: 1000.0,  // 1000 QPS
        }
    }
}

/// Auto-scaling advisor
pub struct AutoScalingAdvisor {
    /// Configuration
    config: AdvisorConfig,
    /// Historical metrics
    history: Vec<WorkloadMetrics>,
}

impl AutoScalingAdvisor {
    /// Create a new advisor with default config
    pub fn new() -> Self {
        Self {
            config: AdvisorConfig::default(),
            history: Vec::new(),
        }
    }

    /// Create an advisor with custom config
    pub fn with_config(config: AdvisorConfig) -> Self {
        Self {
            config,
            history: Vec::new(),
        }
    }

    /// Record workload metrics
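    ///
    /// Only the most recent 1,000 samples are retained; older samples are
    /// dropped. A minimal sketch of feeding the advisor on a fixed interval
    /// (`collect_metrics` is a hypothetical sampling function, not part of
    /// this crate):
    ///
    /// ```rust,ignore
    /// loop {
    ///     let metrics = collect_metrics(); // hypothetical: sample your system
    ///     advisor.record(metrics);
    ///     std::thread::sleep(std::time::Duration::from_secs(60));
    /// }
    /// ```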
    pub fn record(&mut self, metrics: WorkloadMetrics) {
        self.history.push(metrics);

        // Keep only last 1000 samples
        if self.history.len() > 1000 {
            self.history.remove(0);
        }
    }

    /// Analyze current workload and generate recommendations
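    ///
    /// The returned actions are sorted by descending priority. A minimal
    /// sketch of acting on the report (the 0.7 health threshold is an
    /// illustrative assumption, not a crate-defined constant):
    ///
    /// ```rust,ignore
    /// let report = advisor.analyze(&metrics)?;
    /// if report.health_score < 0.7 {
    ///     for action in &report.actions {
    ///         eprintln!("[priority {:.2}] {}", action.priority, action.description);
    ///     }
    /// }
    /// ```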
    pub fn analyze(&self, current: &WorkloadMetrics) -> Result<ScalingRecommendations> {
        let mut actions = Vec::new();

        // Check P99 latency
        let p99_ms = current.p99_latency.as_millis() as u64;
        if p99_ms > self.config.target_p99_latency_ms {
            let latency_ratio = p99_ms as f64 / self.config.target_p99_latency_ms as f64;

            if latency_ratio > 2.0 {
                // Severe latency issues - need horizontal scaling
                actions.push(ScalingAction {
                    action_type: ActionType::ScaleHorizontally,
                    priority: 0.9,
                    description: format!(
                        "Add replicas to handle load. Current P99: {}ms, Target: {}ms",
                        p99_ms, self.config.target_p99_latency_ms
                    ),
                    expected_impact: format!(
                        "Reduce P99 latency by ~{}%",
                        ((latency_ratio - 1.0) * 50.0).min(70.0) as i32
                    ),
                    cost_estimate: latency_ratio * 10.0,
                });
            } else {
                // Moderate latency - optimize parameters
                actions.push(ScalingAction {
                    action_type: ActionType::OptimizeParameters,
                    priority: 0.6,
                    description: format!(
                        "Optimize HNSW parameters (reduce ef_search). Current P99: {}ms",
                        p99_ms
                    ),
                    expected_impact: "Reduce P99 latency by 20-30% with minimal accuracy loss"
                        .to_string(),
                    cost_estimate: 0.5,
                });
            }
        }

        // Check CPU utilization (0.85 is a fixed saturation threshold, stricter
        // than the configured target utilization)
        if current.cpu_utilization > 0.85 {
            actions.push(ScalingAction {
                action_type: ActionType::ScaleVertically,
                priority: 0.8,
                description: format!(
                    "Increase CPU resources. Current utilization: {:.1}% (saturation threshold: 85%)",
                    current.cpu_utilization * 100.0
                ),
                expected_impact: "Increase query throughput by 30-50%".to_string(),
                cost_estimate: current.cpu_utilization * 8.0,
            });
        }

        // Check cache hit rate
        if current.cache_hit_rate < self.config.min_cache_hit_rate {
            actions.push(ScalingAction {
                action_type: ActionType::IncreaseCache,
                priority: 0.7,
                description: format!(
                    "Increase cache size. Current hit rate: {:.1}%, Target: {:.1}%",
                    current.cache_hit_rate * 100.0,
                    self.config.min_cache_hit_rate * 100.0
                ),
                expected_impact: format!(
                    "Improve hit rate by {:.0}%, reduce latency by 15-25%",
                    (self.config.min_cache_hit_rate - current.cache_hit_rate) * 100.0
                ),
                cost_estimate: 3.0,
            });
        }

        // Check memory pressure for large indices
        if current.index_size > 5_000_000 && current.memory_usage_mb > 8192.0 {
            actions.push(ScalingAction {
                action_type: ActionType::EnableCompression,
                priority: 0.65,
                description: format!(
                    "Enable quantization for {} vectors currently using {:.0} MB of memory",
                    current.index_size, current.memory_usage_mb
                ),
                expected_impact: "Reduce memory by 4-8x with <5% accuracy loss".to_string(),
                cost_estimate: 1.0,
            });
        }

        // Sort actions by descending priority (total_cmp avoids a panic on NaN)
        actions.sort_by(|a, b| b.priority.total_cmp(&a.priority));

        // Calculate health score
        let health_score = self.calculate_health_score(current);

        // Calculate capacity headroom
        let capacity_headroom = self.calculate_capacity_headroom(current);

        // Calculate cost-benefit ratio
        let cost_benefit_ratio = if actions.is_empty() {
            0.0
        } else {
            let total_benefit: f64 = actions.iter().map(|a| a.priority).sum();
            let total_cost: f64 = actions.iter().map(|a| a.cost_estimate).sum();
            if total_cost > 0.0 {
                total_benefit / total_cost
            } else {
                0.0
            }
        };

        Ok(ScalingRecommendations {
            health_score,
            capacity_headroom,
            actions,
            cost_benefit_ratio,
        })
    }

    /// Calculate system health score (0.0 to 1.0)
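    ///
    /// Starts from 1.0 and subtracts capped penalties for latency (at most
    /// 0.4), CPU (at most 0.3), and cache misses (at most 0.2). For example,
    /// with P99 at 2x the target and CPU at 0.85 against a 0.70 target, the
    /// penalties are min((2.0 - 1.0) * 0.3, 0.4) = 0.3 and
    /// (0.85 - 0.70) * 0.5 = 0.075, for a score of roughly 0.625.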
    fn calculate_health_score(&self, metrics: &WorkloadMetrics) -> f64 {
        let mut score = 1.0;

        // Penalty for high latency
        let p99_ms = metrics.p99_latency.as_millis() as u64;
        if p99_ms > self.config.target_p99_latency_ms {
            let latency_penalty =
                (p99_ms as f64 / self.config.target_p99_latency_ms as f64 - 1.0) * 0.3;
            score -= latency_penalty.min(0.4);
        }

        // Penalty for high CPU
        if metrics.cpu_utilization > self.config.target_cpu_utilization {
            let cpu_penalty = (metrics.cpu_utilization - self.config.target_cpu_utilization) * 0.5;
            score -= cpu_penalty.min(0.3);
        }

        // Penalty for low cache hit rate
        if metrics.cache_hit_rate < self.config.min_cache_hit_rate {
            let cache_penalty = (self.config.min_cache_hit_rate - metrics.cache_hit_rate) * 0.3;
            score -= cache_penalty.min(0.2);
        }

        score.max(0.0)
    }

    /// Calculate capacity headroom (how much more load can be handled)
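    ///
    /// Assumes throughput scales roughly linearly with CPU utilization. For
    /// example, at 1000 QPS and 50% CPU the estimated maximum is
    /// 1000 / 0.5 = 2000 QPS, so the headroom is (2000 - 1000) / 1000 = 1.0,
    /// i.e. room for roughly 2x the current load (the result is capped at 2.0).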
    fn calculate_capacity_headroom(&self, metrics: &WorkloadMetrics) -> f64 {
        // Guard against division by zero (and meaningless NaN results) when
        // the system is idle: report maximum headroom.
        if metrics.cpu_utilization <= 0.0 || metrics.queries_per_second <= 0.0 {
            return 2.0;
        }

        // Estimate the maximum sustainable QPS from current QPS and CPU usage
        let estimated_max_qps = metrics.queries_per_second / metrics.cpu_utilization;
        let additional_capacity = estimated_max_qps - metrics.queries_per_second;

        (additional_capacity / metrics.queries_per_second).clamp(0.0, 2.0)
    }

    /// Get historical trend analysis
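    ///
    /// Averages the most recent samples (up to 10) and reports the QPS trend
    /// as the percent change from the first to the last of those samples.
    /// A minimal usage sketch (the 20% growth threshold is an illustrative
    /// assumption, and `advisor` is assumed to have recorded metrics already):
    ///
    /// ```rust,ignore
    /// let trend = advisor.trend_analysis();
    /// if trend.qps_trend_percent > 20.0 {
    ///     println!("Load is growing quickly: avg {:.0} QPS", trend.avg_qps);
    /// }
    /// ```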
    pub fn trend_analysis(&self) -> TrendReport {
        if self.history.len() < 2 {
            return TrendReport::default();
        }

        // Analyze at most the 10 most recent samples
        let recent = &self.history[self.history.len().saturating_sub(10)..];

        let avg_qps: f64 =
            recent.iter().map(|m| m.queries_per_second).sum::<f64>() / recent.len() as f64;
        let avg_cpu: f64 =
            recent.iter().map(|m| m.cpu_utilization).sum::<f64>() / recent.len() as f64;
        let avg_cache_hit: f64 =
            recent.iter().map(|m| m.cache_hit_rate).sum::<f64>() / recent.len() as f64;

        // Percent change from the first to the last recent sample; guard
        // against a zero-QPS starting sample
        let first_qps = recent[0].queries_per_second;
        let qps_trend = if first_qps > 0.0 {
            (recent.last().unwrap().queries_per_second - first_qps) / first_qps
        } else {
            0.0
        };

        TrendReport {
            avg_qps,
            avg_cpu_utilization: avg_cpu,
            avg_cache_hit_rate: avg_cache_hit,
            qps_trend_percent: qps_trend * 100.0,
            sample_count: recent.len(),
        }
    }
}

impl Default for AutoScalingAdvisor {
    fn default() -> Self {
        Self::new()
    }
}

/// Trend analysis report
#[derive(Debug, Clone, Default)]
pub struct TrendReport {
    /// Average QPS over recent period
    pub avg_qps: f64,
    /// Average CPU utilization
    pub avg_cpu_utilization: f64,
    /// Average cache hit rate
    pub avg_cache_hit_rate: f64,
    /// QPS trend (percent change)
    pub qps_trend_percent: f64,
    /// Number of samples analyzed
    pub sample_count: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_advisor_creation() {
        let advisor = AutoScalingAdvisor::new();
        assert_eq!(advisor.history.len(), 0);
    }

    #[test]
    fn test_healthy_system() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 500.0,
            avg_latency: Duration::from_millis(5),
            p99_latency: Duration::from_millis(20),
            memory_usage_mb: 2048.0,
            cpu_utilization: 0.50,
            cache_hit_rate: 0.85,
            index_size: 1_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations.health_score > 0.8);
        assert!(recommendations.actions.is_empty() || recommendations.actions[0].priority < 0.5);
    }

    #[test]
    fn test_high_latency_detection() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 1500.0,
            avg_latency: Duration::from_millis(50),
            p99_latency: Duration::from_millis(250), // Very high!
            memory_usage_mb: 4096.0,
            cpu_utilization: 0.85,
            cache_hit_rate: 0.60,
            index_size: 10_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations.health_score < 0.7);
        assert!(!recommendations.actions.is_empty());
        assert!(recommendations
            .actions
            .iter()
            .any(|a| a.action_type == ActionType::ScaleHorizontally));
    }

    #[test]
    fn test_low_cache_hit_rate() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 1000.0,
            avg_latency: Duration::from_millis(10),
            p99_latency: Duration::from_millis(50),
            memory_usage_mb: 2048.0,
            cpu_utilization: 0.60,
            cache_hit_rate: 0.40, // Very low!
            index_size: 5_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations
            .actions
            .iter()
            .any(|a| a.action_type == ActionType::IncreaseCache));
    }

    #[test]
    fn test_high_cpu_utilization() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 2000.0,
            avg_latency: Duration::from_millis(15),
            p99_latency: Duration::from_millis(60),
            memory_usage_mb: 4096.0,
            cpu_utilization: 0.92, // Very high!
            cache_hit_rate: 0.80,
            index_size: 8_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations
            .actions
            .iter()
            .any(|a| a.action_type == ActionType::ScaleVertically));
    }

    #[test]
    fn test_compression_recommendation() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 1000.0,
            avg_latency: Duration::from_millis(10),
            p99_latency: Duration::from_millis(50),
            memory_usage_mb: 10000.0, // High memory usage
            cpu_utilization: 0.60,
            cache_hit_rate: 0.80,
            index_size: 10_000_000, // Large index
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations
            .actions
            .iter()
            .any(|a| a.action_type == ActionType::EnableCompression));
    }

    #[test]
    fn test_record_metrics() {
        let mut advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 1000.0,
            avg_latency: Duration::from_millis(10),
            p99_latency: Duration::from_millis(50),
            memory_usage_mb: 2048.0,
            cpu_utilization: 0.60,
            cache_hit_rate: 0.80,
            index_size: 5_000_000,
        };

        advisor.record(metrics.clone());
        advisor.record(metrics);

        assert_eq!(advisor.history.len(), 2);
    }

    #[test]
    fn test_capacity_headroom() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 1000.0,
            avg_latency: Duration::from_millis(10),
            p99_latency: Duration::from_millis(50),
            memory_usage_mb: 2048.0,
            cpu_utilization: 0.50, // 50% CPU means 100% headroom
            cache_hit_rate: 0.80,
            index_size: 5_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(recommendations.capacity_headroom > 0.5);
    }

    #[test]
    fn test_trend_analysis() {
        let mut advisor = AutoScalingAdvisor::new();

        for i in 0..10 {
            let metrics = WorkloadMetrics {
                queries_per_second: 1000.0 + (i as f64 * 100.0),
                avg_latency: Duration::from_millis(10),
                p99_latency: Duration::from_millis(50),
                memory_usage_mb: 2048.0,
                cpu_utilization: 0.60,
                cache_hit_rate: 0.80,
                index_size: 5_000_000,
            };
            advisor.record(metrics);
        }

        let trend = advisor.trend_analysis();
        assert_eq!(trend.sample_count, 10);
        assert!(trend.qps_trend_percent > 0.0); // Increasing trend
    }

    #[test]
    fn test_custom_config() {
        let config = AdvisorConfig {
            target_p99_latency_ms: 50,
            target_cpu_utilization: 0.80,
            min_cache_hit_rate: 0.90,
            target_qps_capacity: 5000.0,
        };

        let advisor = AutoScalingAdvisor::with_config(config);

        let metrics = WorkloadMetrics {
            queries_per_second: 1000.0,
            avg_latency: Duration::from_millis(10),
            p99_latency: Duration::from_millis(75), // Over custom target
            memory_usage_mb: 2048.0,
            cpu_utilization: 0.70,
            cache_hit_rate: 0.85, // Below custom target
            index_size: 5_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();
        assert!(!recommendations.actions.is_empty());
    }

    #[test]
    fn test_action_priority_ordering() {
        let advisor = AutoScalingAdvisor::new();

        let metrics = WorkloadMetrics {
            queries_per_second: 2000.0,
            avg_latency: Duration::from_millis(50),
            p99_latency: Duration::from_millis(300), // Critical
            memory_usage_mb: 10000.0,
            cpu_utilization: 0.95, // Critical
            cache_hit_rate: 0.40,  // Poor
            index_size: 10_000_000,
        };

        let recommendations = advisor.analyze(&metrics).unwrap();

        // Actions should be sorted by priority
        for i in 1..recommendations.actions.len() {
            assert!(recommendations.actions[i - 1].priority >= recommendations.actions[i].priority);
        }
    }
}