1use ipfrs_core::Result;
44use serde::{Deserialize, Serialize};
45use std::time::Duration;
46
/// A point-in-time snapshot of runtime workload measurements fed to the
/// auto-scaling advisor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadMetrics {
    /// Observed query throughput, in queries per second.
    pub queries_per_second: f64,
    /// Average query latency.
    pub avg_latency: Duration,
    /// 99th-percentile query latency.
    pub p99_latency: Duration,
    /// Memory usage, in megabytes.
    pub memory_usage_mb: f64,
    /// CPU utilization as a fraction in [0.0, 1.0].
    pub cpu_utilization: f64,
    /// Cache hit rate as a fraction in [0.0, 1.0].
    pub cache_hit_rate: f64,
    /// Number of vectors currently in the index.
    pub index_size: usize,
}
65
/// Kinds of scaling/tuning actions the advisor can recommend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActionType {
    /// Grow the cache to raise the hit rate.
    IncreaseCache,
    /// Add replicas to spread query load.
    ScaleHorizontally,
    /// Add CPU resources to the existing node(s).
    ScaleVertically,
    /// Tune index parameters (e.g. HNSW ef_search) rather than add hardware.
    OptimizeParameters,
    /// Enable quantization/compression to reduce memory footprint.
    EnableCompression,
    /// Pre-warm a cache ahead of traffic.
    AddWarmupCache,
    /// No change recommended.
    NoAction,
}
84
85impl std::fmt::Display for ActionType {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 match self {
88 ActionType::IncreaseCache => write!(f, "Increase Cache"),
89 ActionType::ScaleHorizontally => write!(f, "Scale Horizontally"),
90 ActionType::ScaleVertically => write!(f, "Scale Vertically"),
91 ActionType::OptimizeParameters => write!(f, "Optimize Parameters"),
92 ActionType::EnableCompression => write!(f, "Enable Compression"),
93 ActionType::AddWarmupCache => write!(f, "Add Warmup Cache"),
94 ActionType::NoAction => write!(f, "No Action"),
95 }
96 }
97}
98
/// A single recommended action, with priority and rough cost/impact estimates.
#[derive(Debug, Clone)]
pub struct ScalingAction {
    /// What kind of action is being recommended.
    pub action_type: ActionType,
    /// Relative urgency; higher values sort first in the recommendation list.
    pub priority: f64,
    /// Human-readable description of the problem and proposed change.
    pub description: String,
    /// Human-readable estimate of the expected improvement.
    pub expected_impact: String,
    /// Relative (unitless) cost estimate used for the cost/benefit ratio.
    pub cost_estimate: f64,
}
113
/// The advisor's full analysis output for one metrics snapshot.
#[derive(Debug, Clone)]
pub struct ScalingRecommendations {
    /// Overall system health in [0.0, 1.0]; 1.0 means all targets are met.
    pub health_score: f64,
    /// Estimated extra capacity as a fraction of current QPS, in [0.0, 2.0].
    pub capacity_headroom: f64,
    /// Recommended actions, sorted by descending priority.
    pub actions: Vec<ScalingAction>,
    /// Sum of action priorities divided by sum of cost estimates
    /// (0.0 when there are no actions or total cost is zero).
    pub cost_benefit_ratio: f64,
}
126
/// Target thresholds the advisor compares workload metrics against.
#[derive(Debug, Clone)]
pub struct AdvisorConfig {
    /// Target P99 latency in milliseconds; exceeding it triggers actions.
    pub target_p99_latency_ms: u64,
    /// Target CPU utilization fraction used in health scoring.
    pub target_cpu_utilization: f64,
    /// Minimum acceptable cache hit rate fraction.
    pub min_cache_hit_rate: f64,
    /// Target query throughput capacity (queries per second).
    /// NOTE(review): not yet consulted by `analyze` — presumably reserved
    /// for future capacity planning; confirm before removing.
    pub target_qps_capacity: f64,
}
139
140impl Default for AdvisorConfig {
141 fn default() -> Self {
142 Self {
143 target_p99_latency_ms: 100, target_cpu_utilization: 0.70, min_cache_hit_rate: 0.75, target_qps_capacity: 1000.0, }
148 }
149}
150
/// Analyzes workload metrics against configured targets and produces
/// prioritized scaling recommendations and trend reports.
pub struct AutoScalingAdvisor {
    // Thresholds used by `analyze` and the health-score calculation.
    config: AdvisorConfig,
    // Rolling window of recorded samples (capped at 1000 in `record`).
    history: Vec<WorkloadMetrics>,
}
158
159impl AutoScalingAdvisor {
160 pub fn new() -> Self {
162 Self {
163 config: AdvisorConfig::default(),
164 history: Vec::new(),
165 }
166 }
167
168 pub fn with_config(config: AdvisorConfig) -> Self {
170 Self {
171 config,
172 history: Vec::new(),
173 }
174 }
175
176 pub fn record(&mut self, metrics: WorkloadMetrics) {
178 self.history.push(metrics);
179
180 if self.history.len() > 1000 {
182 self.history.remove(0);
183 }
184 }
185
186 pub fn analyze(&self, current: &WorkloadMetrics) -> Result<ScalingRecommendations> {
188 let mut actions = Vec::new();
189
190 let p99_ms = current.p99_latency.as_millis() as u64;
192 if p99_ms > self.config.target_p99_latency_ms {
193 let latency_ratio = p99_ms as f64 / self.config.target_p99_latency_ms as f64;
194
195 if latency_ratio > 2.0 {
196 actions.push(ScalingAction {
198 action_type: ActionType::ScaleHorizontally,
199 priority: 0.9,
200 description: format!(
201 "Add replicas to handle load. Current P99: {}ms, Target: {}ms",
202 p99_ms, self.config.target_p99_latency_ms
203 ),
204 expected_impact: format!(
205 "Reduce P99 latency by ~{}%",
206 ((latency_ratio - 1.0) * 50.0).min(70.0) as i32
207 ),
208 cost_estimate: latency_ratio * 10.0,
209 });
210 } else {
211 actions.push(ScalingAction {
213 action_type: ActionType::OptimizeParameters,
214 priority: 0.6,
215 description: format!(
216 "Optimize HNSW parameters (reduce ef_search). Current P99: {}ms",
217 p99_ms
218 ),
219 expected_impact: "Reduce P99 latency by 20-30% with minimal accuracy loss"
220 .to_string(),
221 cost_estimate: 0.5,
222 });
223 }
224 }
225
226 if current.cpu_utilization > 0.85 {
228 actions.push(ScalingAction {
229 action_type: ActionType::ScaleVertically,
230 priority: 0.8,
231 description: format!(
232 "Increase CPU resources. Current: {:.1}%, Saturated at >85%",
233 current.cpu_utilization * 100.0
234 ),
235 expected_impact: "Increase query throughput by 30-50%".to_string(),
236 cost_estimate: current.cpu_utilization * 8.0,
237 });
238 }
239
240 if current.cache_hit_rate < self.config.min_cache_hit_rate {
242 actions.push(ScalingAction {
243 action_type: ActionType::IncreaseCache,
244 priority: 0.7,
245 description: format!(
246 "Increase cache size. Current hit rate: {:.1}%, Target: {:.1}%",
247 current.cache_hit_rate * 100.0,
248 self.config.min_cache_hit_rate * 100.0
249 ),
250 expected_impact: format!(
251 "Improve hit rate by {:.0}%, reduce latency by 15-25%",
252 (self.config.min_cache_hit_rate - current.cache_hit_rate) * 100.0
253 ),
254 cost_estimate: 3.0,
255 });
256 }
257
258 if current.index_size > 5_000_000 && current.memory_usage_mb > 8192.0 {
260 actions.push(ScalingAction {
261 action_type: ActionType::EnableCompression,
262 priority: 0.65,
263 description: format!(
264 "Enable quantization for {} vectors using {}MB memory",
265 current.index_size, current.memory_usage_mb
266 ),
267 expected_impact: "Reduce memory by 4-8x with <5% accuracy loss".to_string(),
268 cost_estimate: 1.0,
269 });
270 }
271
272 actions.sort_by(|a, b| b.priority.partial_cmp(&a.priority).unwrap());
274
275 let health_score = self.calculate_health_score(current);
277
278 let capacity_headroom = self.calculate_capacity_headroom(current);
280
281 let cost_benefit_ratio = if actions.is_empty() {
283 0.0
284 } else {
285 let total_benefit: f64 = actions.iter().map(|a| a.priority).sum();
286 let total_cost: f64 = actions.iter().map(|a| a.cost_estimate).sum();
287 if total_cost > 0.0 {
288 total_benefit / total_cost
289 } else {
290 0.0
291 }
292 };
293
294 Ok(ScalingRecommendations {
295 health_score,
296 capacity_headroom,
297 actions,
298 cost_benefit_ratio,
299 })
300 }
301
302 fn calculate_health_score(&self, metrics: &WorkloadMetrics) -> f64 {
304 let mut score = 1.0;
305
306 let p99_ms = metrics.p99_latency.as_millis() as u64;
308 if p99_ms > self.config.target_p99_latency_ms {
309 let latency_penalty =
310 (p99_ms as f64 / self.config.target_p99_latency_ms as f64 - 1.0) * 0.3;
311 score -= latency_penalty.min(0.4);
312 }
313
314 if metrics.cpu_utilization > self.config.target_cpu_utilization {
316 let cpu_penalty = (metrics.cpu_utilization - self.config.target_cpu_utilization) * 0.5;
317 score -= cpu_penalty.min(0.3);
318 }
319
320 if metrics.cache_hit_rate < self.config.min_cache_hit_rate {
322 let cache_penalty = (self.config.min_cache_hit_rate - metrics.cache_hit_rate) * 0.3;
323 score -= cache_penalty.min(0.2);
324 }
325
326 score.max(0.0)
327 }
328
329 fn calculate_capacity_headroom(&self, metrics: &WorkloadMetrics) -> f64 {
331 let _cpu_headroom = (1.0 - metrics.cpu_utilization).max(0.0);
333 let estimated_max_qps = metrics.queries_per_second / metrics.cpu_utilization;
334 let additional_capacity = estimated_max_qps - metrics.queries_per_second;
335
336 (additional_capacity / metrics.queries_per_second).clamp(0.0, 2.0)
337 }
338
339 pub fn trend_analysis(&self) -> TrendReport {
341 if self.history.len() < 2 {
342 return TrendReport::default();
343 }
344
345 let recent = &self.history[self.history.len().saturating_sub(10)..];
346
347 let avg_qps: f64 =
348 recent.iter().map(|m| m.queries_per_second).sum::<f64>() / recent.len() as f64;
349 let avg_cpu: f64 =
350 recent.iter().map(|m| m.cpu_utilization).sum::<f64>() / recent.len() as f64;
351 let avg_cache_hit: f64 =
352 recent.iter().map(|m| m.cache_hit_rate).sum::<f64>() / recent.len() as f64;
353
354 let qps_trend = if recent.len() > 1 {
356 (recent.last().unwrap().queries_per_second - recent[0].queries_per_second)
357 / recent[0].queries_per_second
358 } else {
359 0.0
360 };
361
362 TrendReport {
363 avg_qps,
364 avg_cpu_utilization: avg_cpu,
365 avg_cache_hit_rate: avg_cache_hit,
366 qps_trend_percent: qps_trend * 100.0,
367 sample_count: recent.len(),
368 }
369 }
370}
371
372impl Default for AutoScalingAdvisor {
373 fn default() -> Self {
374 Self::new()
375 }
376}
377
/// Aggregated view of recent workload history produced by `trend_analysis`.
#[derive(Debug, Clone, Default)]
pub struct TrendReport {
    /// Average queries per second over the analyzed window.
    pub avg_qps: f64,
    /// Average CPU utilization fraction over the analyzed window.
    pub avg_cpu_utilization: f64,
    /// Average cache hit rate fraction over the analyzed window.
    pub avg_cache_hit_rate: f64,
    /// Relative QPS change from the first to the last sample, in percent.
    pub qps_trend_percent: f64,
    /// Number of samples included in the window (at most 10).
    pub sample_count: usize,
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `WorkloadMetrics` sample; keeps each test case on one line.
    fn sample(
        qps: f64,
        avg_ms: u64,
        p99_ms: u64,
        mem_mb: f64,
        cpu: f64,
        hit_rate: f64,
        index_size: usize,
    ) -> WorkloadMetrics {
        WorkloadMetrics {
            queries_per_second: qps,
            avg_latency: Duration::from_millis(avg_ms),
            p99_latency: Duration::from_millis(p99_ms),
            memory_usage_mb: mem_mb,
            cpu_utilization: cpu,
            cache_hit_rate: hit_rate,
            index_size,
        }
    }

    /// True when any recommended action has the given type.
    fn recommends(recs: &ScalingRecommendations, action: ActionType) -> bool {
        recs.actions.iter().any(|a| a.action_type == action)
    }

    #[test]
    fn test_advisor_creation() {
        // A fresh advisor starts with no recorded history.
        assert_eq!(AutoScalingAdvisor::new().history.len(), 0);
    }

    #[test]
    fn test_healthy_system() {
        // All metrics comfortably within default targets.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(500.0, 5, 20, 2048.0, 0.50, 0.85, 1_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recs.health_score > 0.8);
        assert!(recs.actions.is_empty() || recs.actions[0].priority < 0.5);
    }

    #[test]
    fn test_high_latency_detection() {
        // P99 at 250ms is >2x the 100ms default target.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(1500.0, 50, 250, 4096.0, 0.85, 0.60, 10_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recs.health_score < 0.7);
        assert!(!recs.actions.is_empty());
        assert!(recommends(&recs, ActionType::ScaleHorizontally));
    }

    #[test]
    fn test_low_cache_hit_rate() {
        // 40% hit rate is below the 75% default minimum.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(1000.0, 10, 50, 2048.0, 0.60, 0.40, 5_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recommends(&recs, ActionType::IncreaseCache));
    }

    #[test]
    fn test_high_cpu_utilization() {
        // 92% CPU exceeds the 85% saturation threshold.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(2000.0, 15, 60, 4096.0, 0.92, 0.80, 8_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recommends(&recs, ActionType::ScaleVertically));
    }

    #[test]
    fn test_compression_recommendation() {
        // >5M vectors and >8192 MB memory trigger the quantization advice.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(1000.0, 10, 50, 10000.0, 0.60, 0.80, 10_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recommends(&recs, ActionType::EnableCompression));
    }

    #[test]
    fn test_record_metrics() {
        let mut advisor = AutoScalingAdvisor::new();
        let metrics = sample(1000.0, 10, 50, 2048.0, 0.60, 0.80, 5_000_000);

        advisor.record(metrics.clone());
        advisor.record(metrics);

        assert_eq!(advisor.history.len(), 2);
    }

    #[test]
    fn test_capacity_headroom() {
        // At 50% CPU, estimated max QPS is double the current rate.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(1000.0, 10, 50, 2048.0, 0.50, 0.80, 5_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(recs.capacity_headroom > 0.5);
    }

    #[test]
    fn test_trend_analysis() {
        // Strictly increasing QPS should yield a positive trend.
        let mut advisor = AutoScalingAdvisor::new();
        for i in 0..10 {
            advisor.record(sample(
                1000.0 + (i as f64 * 100.0),
                10,
                50,
                2048.0,
                0.60,
                0.80,
                5_000_000,
            ));
        }

        let trend = advisor.trend_analysis();
        assert_eq!(trend.sample_count, 10);
        assert!(trend.qps_trend_percent > 0.0);
    }

    #[test]
    fn test_custom_config() {
        // With a stricter 50ms target, a 75ms P99 must produce actions.
        let config = AdvisorConfig {
            target_p99_latency_ms: 50,
            target_cpu_utilization: 0.80,
            min_cache_hit_rate: 0.90,
            target_qps_capacity: 5000.0,
        };
        let advisor = AutoScalingAdvisor::with_config(config);
        let metrics = sample(1000.0, 10, 75, 2048.0, 0.70, 0.85, 5_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        assert!(!recs.actions.is_empty());
    }

    #[test]
    fn test_action_priority_ordering() {
        // Everything is wrong at once; actions must come back sorted
        // by non-increasing priority.
        let advisor = AutoScalingAdvisor::new();
        let metrics = sample(2000.0, 50, 300, 10000.0, 0.95, 0.40, 10_000_000);

        let recs = advisor.analyze(&metrics).unwrap();
        for pair in recs.actions.windows(2) {
            assert!(pair[0].priority >= pair[1].priority);
        }
    }
}