1use crate::{queue::queue_names, QueueMetrics, RedisBroker, TaskQueueError};
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
/// Configuration for the auto-scaler: worker-count bounds, scaling step
/// sizes, trigger thresholds, adaptive-threshold tuning, hysteresis
/// settings, and target SLAs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScalerConfig {
    // Lower bound on workers; `validate()` requires it to be > 0.
    pub min_workers: usize,
    // Upper bound on workers; `validate()` requires min_workers <= max_workers <= 1000.
    pub max_workers: usize,
    // Workers added per scale-up step (validated to 1..=50).
    pub scale_up_count: usize,
    // Workers removed per scale-down step (validated to 1..=50).
    pub scale_down_count: usize,

    // Thresholds that generate scale-up / scale-down signals.
    pub scaling_triggers: ScalingTriggers,

    // When true, an AdaptiveThresholdController adjusts the trigger
    // thresholds over time based on observed performance.
    pub enable_adaptive_thresholds: bool,
    // Step size used when adapting thresholds.
    pub learning_rate: f64,
    // Sliding window (minutes) of performance history used for adaptation.
    pub adaptation_window_minutes: u32,

    // Minimum seconds between executed scale-up actions (hysteresis cooldown).
    pub scale_up_cooldown_seconds: u64,
    // Minimum seconds between executed scale-down actions.
    pub scale_down_cooldown_seconds: u64,
    // Consecutive identical proposals required before an action executes.
    pub consecutive_signals_required: usize,

    // Service-level targets that guide adaptive threshold tuning.
    pub target_sla: SLATargets,
}
30
/// Threshold set compared against an `AutoScalerMetrics` snapshot to produce
/// scale-up signals (metric above threshold) and scale-down signals (metric
/// well below threshold).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingTriggers {
    // Weighted pending-tasks-per-worker score above which a scale-up signal fires.
    pub queue_pressure_threshold: f64,
    // Worker utilization (0.0..=1.0) above which a scale-up signal fires.
    pub worker_utilization_threshold: f64,
    // Task complexity factor above which a scale-up signal fires.
    pub task_complexity_threshold: f64,
    // Failure ratio (failed / (processed + failed)) above which a signal fires.
    pub error_rate_threshold: f64,
    // Estimated memory pressure (MB, per estimate_memory_pressure) above which
    // a scale-up signal fires.
    pub memory_pressure_threshold: f64,
}
40
/// Service-level targets used by the adaptive controller to decide whether
/// thresholds should be tightened or relaxed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SLATargets {
    // Target p95 latency ceiling in milliseconds.
    // NOTE(review): latency is not currently collected (snapshots record 0.0),
    // so this target is not yet acted upon — confirm before relying on it.
    pub max_p95_latency_ms: f64,
    // Minimum acceptable success rate (1.0 - error_rate).
    pub min_success_rate: f64,
    // Target ceiling for average queue wait time in milliseconds.
    pub max_queue_wait_time_ms: f64,
    // Desired steady-state worker utilization (0.0..=1.0).
    pub target_worker_utilization: f64,
}
49
50impl Default for AutoScalerConfig {
51 fn default() -> Self {
52 Self {
53 min_workers: 1,
54 max_workers: 20,
55 scale_up_count: 2,
56 scale_down_count: 1,
57
58 scaling_triggers: ScalingTriggers {
59 queue_pressure_threshold: 0.75,
60 worker_utilization_threshold: 0.80,
61 task_complexity_threshold: 1.5,
62 error_rate_threshold: 0.05,
63 memory_pressure_threshold: 512.0,
64 },
65
66 enable_adaptive_thresholds: true,
67 learning_rate: 0.1,
68 adaptation_window_minutes: 30,
69
70 scale_up_cooldown_seconds: 60,
71 scale_down_cooldown_seconds: 300,
72 consecutive_signals_required: 2,
73
74 target_sla: SLATargets {
75 max_p95_latency_ms: 5000.0,
76 min_success_rate: 0.95,
77 max_queue_wait_time_ms: 10000.0,
78 target_worker_utilization: 0.70,
79 },
80 }
81 }
82}
83
84impl AutoScalerConfig {
85 pub fn validate(&self) -> Result<(), TaskQueueError> {
86 if self.min_workers == 0 {
87 return Err(TaskQueueError::Configuration(
88 "Minimum workers must be greater than 0".to_string(),
89 ));
90 }
91
92 if self.max_workers < self.min_workers {
93 return Err(TaskQueueError::Configuration(
94 "Maximum workers must be greater than or equal to minimum workers".to_string(),
95 ));
96 }
97
98 if self.max_workers > 1000 {
99 return Err(TaskQueueError::Configuration(
100 "Maximum workers cannot exceed 1000".to_string(),
101 ));
102 }
103
104 if self.scale_up_count == 0 || self.scale_up_count > 50 {
105 return Err(TaskQueueError::Configuration(
106 "Scale up count must be between 1 and 50".to_string(),
107 ));
108 }
109
110 if self.scale_down_count == 0 || self.scale_down_count > 50 {
111 return Err(TaskQueueError::Configuration(
112 "Scale down count must be between 1 and 50".to_string(),
113 ));
114 }
115
116 let triggers = &self.scaling_triggers;
118 if triggers.queue_pressure_threshold <= 0.0 || triggers.queue_pressure_threshold > 2.0 {
119 return Err(TaskQueueError::Configuration(
120 "Queue pressure threshold must be between 0.1 and 2.0".to_string(),
121 ));
122 }
123
124 if triggers.worker_utilization_threshold <= 0.0
125 || triggers.worker_utilization_threshold > 1.0
126 {
127 return Err(TaskQueueError::Configuration(
128 "Worker utilization threshold must be between 0.1 and 1.0".to_string(),
129 ));
130 }
131
132 if triggers.error_rate_threshold < 0.0 || triggers.error_rate_threshold > 1.0 {
133 return Err(TaskQueueError::Configuration(
134 "Error rate threshold must be between 0.0 and 1.0".to_string(),
135 ));
136 }
137
138 if self.learning_rate <= 0.0 || self.learning_rate > 1.0 {
139 return Err(TaskQueueError::Configuration(
140 "Learning rate must be between 0.01 and 1.0".to_string(),
141 ));
142 }
143
144 Ok(())
145 }
146}
147
/// Outcome of a single scaling decision.
#[derive(Debug)]
pub enum ScalingAction {
    // Add this many workers.
    ScaleUp(usize),
    // Remove this many workers.
    ScaleDown(usize),
    // Leave the worker count unchanged.
    NoAction,
}
154
/// Point-in-time snapshot of broker state plus the derived signals the
/// scaler bases its decisions on.
#[derive(Debug, Serialize, Deserialize)]
pub struct AutoScalerMetrics {
    // Number of workers currently reported active by the broker.
    pub active_workers: i64,
    // Sum of pending tasks across the monitored queues.
    pub total_pending_tasks: i64,
    // Raw per-queue metrics the derived values were computed from.
    pub queue_metrics: Vec<QueueMetrics>,

    // Weighted pending-tasks-per-worker score (high-priority queues weigh more).
    pub queue_pressure_score: f64,
    // Heuristic utilization in 0.0..=1.0.
    pub worker_utilization: f64,
    // Task complexity multiplier (currently a fixed 1.0 placeholder).
    pub task_complexity_factor: f64,
    // failed / (processed + failed), 0.0 when no tasks were seen.
    pub error_rate: f64,
    // Estimated memory use in MB (heuristic per-worker estimate).
    pub memory_pressure_mb: f64,
    // Average queue wait time in ms (currently a 0.0 placeholder).
    pub avg_queue_wait_time_ms: f64,
    // Throughput trend indicator (currently a 0.0 placeholder).
    pub throughput_trend: f64,
}
171
/// Slowly adjusts the scaling trigger thresholds toward the configured SLA
/// targets based on a sliding window of observed performance snapshots.
#[derive(Debug)]
pub struct AdaptiveThresholdController {
    // Thresholds as adapted so far; starts from the configured triggers.
    current_thresholds: ScalingTriggers,
    // Snapshots within the adaptation window (older entries are pruned).
    performance_history: Vec<PerformanceSnapshot>,
    // When thresholds were last adapted; adaptation runs at most every 5 min.
    last_adaptation: Instant,
    // Step size for threshold adjustments.
    learning_rate: f64,
    // SLA targets the adaptation steers toward.
    target_sla: SLATargets,
    // Age limit for retained performance snapshots.
    adaptation_window: Duration,
}
182
/// One observed performance sample used by the adaptive controller.
#[derive(Debug, Clone)]
// Some fields (e.g. latency_p95_ms) are recorded but not yet consumed.
#[allow(dead_code)]
struct PerformanceSnapshot {
    // When the sample was taken.
    timestamp: Instant,
    // p95 latency in ms; currently always recorded as 0.0 (not collected yet).
    latency_p95_ms: f64,
    // 1.0 - error_rate at sample time.
    success_rate: f64,
    // Average queue wait in ms at sample time.
    queue_wait_time_ms: f64,
    // Worker utilization at sample time.
    worker_utilization: f64,
    // The scaling action decided alongside this sample; None for NoAction.
    scaling_action_taken: Option<ScalingAction>,
}
193
/// Prevents scaling flapping: requires several consecutive identical
/// proposals before acting, and enforces per-direction cooldowns after an
/// action executes.
#[derive(Debug)]
pub struct HysteresisController {
    // Consecutive scale-up proposals seen since the last reset.
    scale_up_consecutive_signals: usize,
    // Consecutive scale-down proposals seen since the last reset.
    scale_down_consecutive_signals: usize,
    // How many consecutive proposals are needed before approval.
    required_consecutive_signals: usize,
    // Timestamp and kind of the last executed action, if any.
    last_scaling_action: Option<(Instant, ScalingAction)>,
    // Cooldown applied after an executed scale-up.
    scale_up_cooldown: Duration,
    // Cooldown applied after an executed scale-down.
    scale_down_cooldown: Duration,
}
204
/// Orchestrates metric collection from the Redis broker and turns the
/// derived signals into scaling decisions, filtered through optional
/// adaptive thresholds and mandatory hysteresis.
pub struct AutoScaler {
    // Broker queried for worker counts and per-queue metrics.
    broker: Arc<RedisBroker>,
    // Static configuration (bounds, steps, cooldowns, SLA targets).
    config: AutoScalerConfig,
    // Present only when adaptive thresholds are enabled in the config.
    adaptive_controller: Option<AdaptiveThresholdController>,
    // Flap protection for executed actions.
    hysteresis_controller: HysteresisController,
    // Rolling history of snapshots, capped at the 100 most recent.
    metrics_history: Vec<AutoScalerMetrics>,
    // Creation time; currently only carried through clones — TODO confirm
    // intended use before removing.
    start_time: Instant,
}
213
impl AutoScaler {
    /// Creates an auto-scaler over `broker` using `AutoScalerConfig::default()`.
    pub fn new(broker: Arc<RedisBroker>) -> Self {
        let config = AutoScalerConfig::default();
        Self::with_config(broker, config)
    }

    /// Creates an auto-scaler with an explicit configuration.
    ///
    /// The adaptive-threshold controller is only constructed when
    /// `config.enable_adaptive_thresholds` is true; the hysteresis controller
    /// is always built from the cooldown / consecutive-signal settings.
    pub fn with_config(broker: Arc<RedisBroker>, config: AutoScalerConfig) -> Self {
        let adaptive_controller = if config.enable_adaptive_thresholds {
            Some(AdaptiveThresholdController::new(
                config.scaling_triggers.clone(),
                config.learning_rate,
                config.target_sla.clone(),
                // Config stores minutes; the controller wants a Duration.
                Duration::from_secs(config.adaptation_window_minutes as u64 * 60),
            ))
        } else {
            None
        };

        let hysteresis_controller = HysteresisController::new(
            config.consecutive_signals_required,
            Duration::from_secs(config.scale_up_cooldown_seconds),
            Duration::from_secs(config.scale_down_cooldown_seconds),
        );

        Self {
            broker,
            config,
            adaptive_controller,
            hysteresis_controller,
            metrics_history: Vec::new(),
            start_time: Instant::now(),
        }
    }

    /// Gathers a point-in-time snapshot from the broker: the active worker
    /// count, per-queue metrics for the default / high-priority /
    /// low-priority queues, and the derived scaling signals.
    ///
    /// # Errors
    ///
    /// Propagates any `TaskQueueError` from the broker calls.
    pub async fn collect_metrics(&self) -> Result<AutoScalerMetrics, TaskQueueError> {
        let active_workers = self.broker.get_active_workers().await?;

        let queues = [
            queue_names::DEFAULT,
            queue_names::HIGH_PRIORITY,
            queue_names::LOW_PRIORITY,
        ];
        let mut queue_metrics = Vec::new();
        let mut total_pending_tasks = 0;
        let mut total_processed_tasks = 0;
        let mut total_failed_tasks = 0;

        for queue in &queues {
            let metrics = self.broker.get_queue_metrics(queue).await?;
            total_pending_tasks += metrics.pending_tasks;
            total_processed_tasks += metrics.processed_tasks;
            total_failed_tasks += metrics.failed_tasks;
            queue_metrics.push(metrics);
        }

        // Derived signals consumed by decide_scaling_action.
        let queue_pressure_score =
            self.calculate_queue_pressure_score(&queue_metrics, active_workers);
        let worker_utilization = self.calculate_worker_utilization(active_workers, &queue_metrics);
        let task_complexity_factor = self.calculate_task_complexity_factor(&queue_metrics);
        let error_rate = self.calculate_error_rate(total_processed_tasks, total_failed_tasks);
        let memory_pressure_mb = self.estimate_memory_pressure(active_workers);
        let avg_queue_wait_time_ms = self.calculate_avg_queue_wait_time(&queue_metrics);
        let throughput_trend = self.calculate_throughput_trend(&queue_metrics);

        Ok(AutoScalerMetrics {
            active_workers,
            total_pending_tasks,
            queue_metrics,
            queue_pressure_score,
            worker_utilization,
            task_complexity_factor,
            error_rate,
            memory_pressure_mb,
            avg_queue_wait_time_ms,
            throughput_trend,
        })
    }

    /// Decides what scaling action to take for the given metrics snapshot.
    ///
    /// Pipeline: pick thresholds (adaptive if enabled, otherwise static) →
    /// count scale-up/down signals → propose an action bounded by
    /// min/max workers → let the hysteresis controller approve or veto →
    /// record the snapshot for adaptation and history.
    ///
    /// Mutates internal state (hysteresis counters, adaptive history,
    /// metrics history), hence `&mut self`.
    pub fn decide_scaling_action(
        &mut self,
        metrics: &AutoScalerMetrics,
    ) -> Result<ScalingAction, TaskQueueError> {
        let current_workers = metrics.active_workers as usize;

        let thresholds = if let Some(ref adaptive) = self.adaptive_controller {
            adaptive.get_current_thresholds().clone()
        } else {
            self.config.scaling_triggers.clone()
        };

        let scale_up_signals = Self::count_scale_up_signals_static(metrics, &thresholds);
        let scale_down_signals = Self::count_scale_down_signals_static(metrics, &thresholds);

        // NOTE(review): the signal quorum here is a hard-coded `>= 2` (out of
        // 5 possible signals); `config.consecutive_signals_required` instead
        // gates consecutive ROUNDS inside the hysteresis controller.
        let proposed_action = if scale_up_signals >= 2 && current_workers < self.config.max_workers
        {
            // Never exceed max_workers.
            let scale_count = std::cmp::min(
                self.config.scale_up_count,
                self.config.max_workers - current_workers,
            );
            ScalingAction::ScaleUp(scale_count)
        } else if scale_down_signals >= 2 && current_workers > self.config.min_workers {
            // Never drop below min_workers.
            let scale_count = std::cmp::min(
                self.config.scale_down_count,
                current_workers - self.config.min_workers,
            );
            ScalingAction::ScaleDown(scale_count)
        } else {
            ScalingAction::NoAction
        };

        // Hysteresis may veto the proposal (cooldown or not enough
        // consecutive rounds), in which case we take no action this round.
        let final_action = if self
            .hysteresis_controller
            .should_execute_scaling(&proposed_action)
        {
            #[cfg(feature = "tracing")]
            match &proposed_action {
                ScalingAction::ScaleUp(count) => {
                    tracing::info!(
                        "Enhanced auto-scaling up: queue_pressure={:.2}, utilization={:.2}, complexity={:.2}, adding {} workers",
                        metrics.queue_pressure_score,
                        metrics.worker_utilization,
                        metrics.task_complexity_factor,
                        count
                    );
                }
                ScalingAction::ScaleDown(count) => {
                    tracing::info!(
                        "Enhanced auto-scaling down: queue_pressure={:.2}, utilization={:.2}, removing {} workers",
                        metrics.queue_pressure_score,
                        metrics.worker_utilization,
                        count
                    );
                }
                _ => {}
            }

            proposed_action
        } else {
            ScalingAction::NoAction
        };

        // Feed the adaptive controller so thresholds can drift toward SLA.
        if let Some(ref mut adaptive) = self.adaptive_controller {
            adaptive.record_performance_snapshot(metrics, &final_action);
        }

        // Keep a bounded history of the last 100 snapshots.
        self.metrics_history.push(metrics.clone());
        if self.metrics_history.len() > 100 {
            self.metrics_history.remove(0);
        }

        Ok(final_action)
    }

    /// Counts how many metrics exceed their scale-up thresholds (0..=5).
    fn count_scale_up_signals_static(
        metrics: &AutoScalerMetrics,
        thresholds: &ScalingTriggers,
    ) -> usize {
        let mut signals = 0;

        if metrics.queue_pressure_score > thresholds.queue_pressure_threshold {
            signals += 1;
        }
        if metrics.worker_utilization > thresholds.worker_utilization_threshold {
            signals += 1;
        }
        if metrics.task_complexity_factor > thresholds.task_complexity_threshold {
            signals += 1;
        }
        if metrics.error_rate > thresholds.error_rate_threshold {
            signals += 1;
        }
        if metrics.memory_pressure_mb > thresholds.memory_pressure_threshold {
            signals += 1;
        }

        signals
    }

    /// Counts how many metrics sit well BELOW their thresholds (0..=5).
    /// Each comparison uses a fraction of the scale-up threshold so the
    /// scale-down band does not overlap the scale-up band (hysteresis gap).
    fn count_scale_down_signals_static(
        metrics: &AutoScalerMetrics,
        thresholds: &ScalingTriggers,
    ) -> usize {
        let mut signals = 0;

        if metrics.queue_pressure_score < thresholds.queue_pressure_threshold * 0.3 {
            signals += 1;
        }
        if metrics.worker_utilization < thresholds.worker_utilization_threshold * 0.4 {
            signals += 1;
        }
        if metrics.task_complexity_factor < thresholds.task_complexity_threshold * 0.5 {
            signals += 1;
        }
        if metrics.error_rate < thresholds.error_rate_threshold * 0.2 {
            signals += 1;
        }
        if metrics.memory_pressure_mb < thresholds.memory_pressure_threshold * 0.5 {
            signals += 1;
        }

        signals
    }

    /// Pending-tasks-per-worker averaged over the known queues, with
    /// high-priority backlog weighted 3x and low-priority 0.3x. With zero
    /// workers the raw pending count is used as the pressure term.
    fn calculate_queue_pressure_score(
        &self,
        queue_metrics: &[QueueMetrics],
        active_workers: i64,
    ) -> f64 {
        let mut weighted_pressure = 0.0;
        let queue_weights = [
            (queue_names::HIGH_PRIORITY, 3.0),
            (queue_names::DEFAULT, 1.0),
            (queue_names::LOW_PRIORITY, 0.3),
        ];

        for metrics in queue_metrics {
            // Unknown queue names fall back to weight 1.0.
            let weight = queue_weights
                .iter()
                .find(|(name, _)| *name == metrics.queue_name)
                .map(|(_, w)| *w)
                .unwrap_or(1.0);

            let queue_pressure = if active_workers > 0 {
                metrics.pending_tasks as f64 / active_workers as f64
            } else {
                metrics.pending_tasks as f64
            };

            weighted_pressure += queue_pressure * weight;
        }

        weighted_pressure / queue_weights.len() as f64
    }

    /// Heuristic utilization in 0.0..=1.0.
    /// Assumes a nominal capacity of ~100 work units per worker — TODO
    /// confirm this constant against real worker throughput.
    fn calculate_worker_utilization(
        &self,
        active_workers: i64,
        queue_metrics: &[QueueMetrics],
    ) -> f64 {
        if active_workers == 0 {
            return 0.0;
        }

        let total_work = queue_metrics
            .iter()
            .map(|m| m.pending_tasks + m.processed_tasks)
            .sum::<i64>();

        let utilization = total_work as f64 / (active_workers as f64 * 100.0);
        utilization.min(1.0)
    }

    /// Placeholder: complexity tracking is not implemented yet, so every
    /// workload is treated as baseline complexity 1.0.
    fn calculate_task_complexity_factor(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        1.0
    }

    /// failed / (processed + failed); 0.0 when no tasks have been seen.
    fn calculate_error_rate(&self, processed: i64, failed: i64) -> f64 {
        let total = processed + failed;
        if total == 0 {
            return 0.0;
        }
        failed as f64 / total as f64
    }

    /// Rough estimate: assumes ~50 MB per active worker — TODO replace with
    /// real memory telemetry when available.
    fn estimate_memory_pressure(&self, active_workers: i64) -> f64 {
        active_workers as f64 * 50.0
    }

    /// Placeholder: queue wait times are not measured yet.
    fn calculate_avg_queue_wait_time(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        0.0
    }

    /// Placeholder: throughput trend is not measured yet.
    fn calculate_throughput_trend(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        0.0
    }

    /// Builds a human-readable status report with current metrics, per-queue
    /// stats, and the action the scaler WOULD take.
    ///
    /// NOTE(review): the decision runs on a clone of `self`, so hysteresis
    /// counters and adaptive history mutated while deciding are discarded —
    /// generating a report does not perturb live scaling state. Cloning the
    /// whole scaler (incl. metrics history) is somewhat heavy, though.
    pub async fn get_scaling_recommendations(&self) -> Result<String, TaskQueueError> {
        let metrics = self.collect_metrics().await?;
        let mut scaling_decision_copy = self.clone();
        let action = scaling_decision_copy.decide_scaling_action(&metrics)?;

        let mut report = format!(
            "Enhanced Auto-scaling Status Report:\n\
             - Active Workers: {}\n\
             - Total Pending Tasks: {}\n\
             - Queue Pressure Score: {:.2}\n\
             - Worker Utilization: {:.1}%\n\
             - Task Complexity Factor: {:.2}\n\
             - Error Rate: {:.1}%\n\
             - Memory Pressure: {:.1} MB\n\n",
            metrics.active_workers,
            metrics.total_pending_tasks,
            metrics.queue_pressure_score,
            metrics.worker_utilization * 100.0,
            metrics.task_complexity_factor,
            metrics.error_rate * 100.0,
            metrics.memory_pressure_mb
        );

        for queue_metric in &metrics.queue_metrics {
            report.push_str(&format!(
                "Queue '{}': {} pending, {} processed, {} failed\n",
                queue_metric.queue_name,
                queue_metric.pending_tasks,
                queue_metric.processed_tasks,
                queue_metric.failed_tasks
            ));
        }

        report.push_str(&format!("\nRecommended Action: {:?}\n", action));

        if let Some(ref adaptive) = self.adaptive_controller {
            report.push_str(&format!(
                "\nAdaptive Thresholds Enabled: {}\n",
                adaptive.get_adaptation_status()
            ));
        }

        Ok(report)
    }
}
550
551impl Clone for AutoScaler {
553 fn clone(&self) -> Self {
554 let adaptive_controller = self.adaptive_controller.clone();
555 let hysteresis_controller = self.hysteresis_controller.clone();
556
557 Self {
558 broker: self.broker.clone(),
559 config: self.config.clone(),
560 adaptive_controller,
561 hysteresis_controller,
562 metrics_history: self.metrics_history.clone(),
563 start_time: self.start_time,
564 }
565 }
566}
567
568impl Clone for AutoScalerMetrics {
570 fn clone(&self) -> Self {
571 Self {
572 active_workers: self.active_workers,
573 total_pending_tasks: self.total_pending_tasks,
574 queue_metrics: self.queue_metrics.clone(),
575 queue_pressure_score: self.queue_pressure_score,
576 worker_utilization: self.worker_utilization,
577 task_complexity_factor: self.task_complexity_factor,
578 error_rate: self.error_rate,
579 memory_pressure_mb: self.memory_pressure_mb,
580 avg_queue_wait_time_ms: self.avg_queue_wait_time_ms,
581 throughput_trend: self.throughput_trend,
582 }
583 }
584}
585
586impl AdaptiveThresholdController {
587 pub fn new(
588 initial_thresholds: ScalingTriggers,
589 learning_rate: f64,
590 target_sla: SLATargets,
591 adaptation_window: Duration,
592 ) -> Self {
593 Self {
594 current_thresholds: initial_thresholds,
595 performance_history: Vec::new(),
596 last_adaptation: Instant::now(),
597 learning_rate,
598 target_sla,
599 adaptation_window,
600 }
601 }
602
603 pub fn get_current_thresholds(&self) -> &ScalingTriggers {
604 &self.current_thresholds
605 }
606
607 pub fn record_performance_snapshot(
608 &mut self,
609 metrics: &AutoScalerMetrics,
610 action: &ScalingAction,
611 ) {
612 let snapshot = PerformanceSnapshot {
613 timestamp: Instant::now(),
614 latency_p95_ms: 0.0, success_rate: 1.0 - metrics.error_rate,
616 queue_wait_time_ms: metrics.avg_queue_wait_time_ms,
617 worker_utilization: metrics.worker_utilization,
618 scaling_action_taken: match action {
619 ScalingAction::NoAction => None,
620 _ => Some(action.clone()),
621 },
622 };
623
624 self.performance_history.push(snapshot);
625
626 let cutoff_time = Instant::now() - self.adaptation_window;
628 self.performance_history
629 .retain(|s| s.timestamp > cutoff_time);
630
631 if self.last_adaptation.elapsed() > Duration::from_secs(300) {
633 self.adapt_thresholds();
634 self.last_adaptation = Instant::now();
635 }
636 }
637
638 fn adapt_thresholds(&mut self) {
639 if self.performance_history.is_empty() {
640 return;
641 }
642
643 let recent_performance =
644 &self.performance_history[self.performance_history.len().saturating_sub(10)..];
645
646 let avg_success_rate = recent_performance
647 .iter()
648 .map(|s| s.success_rate)
649 .sum::<f64>()
650 / recent_performance.len() as f64;
651
652 let avg_utilization = recent_performance
653 .iter()
654 .map(|s| s.worker_utilization)
655 .sum::<f64>()
656 / recent_performance.len() as f64;
657
658 if avg_success_rate < self.target_sla.min_success_rate {
660 self.current_thresholds.queue_pressure_threshold *= 1.0 - self.learning_rate;
662 } else if avg_success_rate > self.target_sla.min_success_rate + 0.02 {
663 self.current_thresholds.queue_pressure_threshold *= 1.0 + (self.learning_rate * 0.5);
665 }
666
667 let utilization_diff = avg_utilization - self.target_sla.target_worker_utilization;
669 if utilization_diff.abs() > 0.1 {
670 let adjustment = -utilization_diff * self.learning_rate;
671 self.current_thresholds.worker_utilization_threshold *= 1.0 + adjustment;
672 }
673
674 self.current_thresholds.queue_pressure_threshold = self
676 .current_thresholds
677 .queue_pressure_threshold
678 .clamp(0.1, 2.0);
679 self.current_thresholds.worker_utilization_threshold = self
680 .current_thresholds
681 .worker_utilization_threshold
682 .clamp(0.1, 0.95);
683 }
684
685 pub fn get_adaptation_status(&self) -> String {
686 format!(
687 "Thresholds adapted {} times, {} performance samples",
688 0,
689 self.performance_history.len()
690 )
691 }
692}
693
694impl Clone for AdaptiveThresholdController {
695 fn clone(&self) -> Self {
696 Self {
697 current_thresholds: self.current_thresholds.clone(),
698 performance_history: self.performance_history.clone(),
699 last_adaptation: self.last_adaptation,
700 learning_rate: self.learning_rate,
701 target_sla: self.target_sla.clone(),
702 adaptation_window: self.adaptation_window,
703 }
704 }
705}
706
impl HysteresisController {
    /// Creates a controller that requires `consecutive_signals_required`
    /// identical proposals in a row before approving an action, with
    /// separate cooldowns after scale-ups and scale-downs.
    pub fn new(
        consecutive_signals_required: usize,
        scale_up_cooldown: Duration,
        scale_down_cooldown: Duration,
    ) -> Self {
        Self {
            scale_up_consecutive_signals: 0,
            scale_down_consecutive_signals: 0,
            required_consecutive_signals: consecutive_signals_required,
            last_scaling_action: None,
            scale_up_cooldown,
            scale_down_cooldown,
        }
    }

    /// Returns true when `proposed_action` should actually execute.
    ///
    /// Behavior:
    /// - While the cooldown from the last EXECUTED action is still running,
    ///   returns false immediately. NOTE(review): the consecutive-signal
    ///   counters are not updated during cooldown, so signal streaks do not
    ///   accumulate while cooling down — confirm this is intended.
    /// - Otherwise a scale proposal increments its own counter, resets the
    ///   opposing one, and is approved once the counter reaches the required
    ///   count; a NoAction proposal resets both counters.
    /// - On approval, the action and timestamp are recorded (starting the
    ///   next cooldown) and both counters reset.
    pub fn should_execute_scaling(&mut self, proposed_action: &ScalingAction) -> bool {
        // Cooldown length depends on the direction of the LAST executed action.
        if let Some((last_time, ref last_action)) = self.last_scaling_action {
            let cooldown_period = match last_action {
                ScalingAction::ScaleUp(_) => self.scale_up_cooldown,
                ScalingAction::ScaleDown(_) => self.scale_down_cooldown,
                ScalingAction::NoAction => Duration::from_secs(0),
            };

            if last_time.elapsed() < cooldown_period {
                return false;
            }
        }

        let should_execute = match proposed_action {
            ScalingAction::ScaleUp(_) => {
                self.scale_up_consecutive_signals += 1;
                self.scale_down_consecutive_signals = 0;
                self.scale_up_consecutive_signals >= self.required_consecutive_signals
            }
            ScalingAction::ScaleDown(_) => {
                self.scale_down_consecutive_signals += 1;
                self.scale_up_consecutive_signals = 0;
                self.scale_down_consecutive_signals >= self.required_consecutive_signals
            }
            ScalingAction::NoAction => {
                // A quiet round breaks any streak in either direction.
                self.scale_up_consecutive_signals = 0;
                self.scale_down_consecutive_signals = 0;
                false
            }
        };

        if should_execute {
            // Record the executed action (starts its cooldown) and reset streaks.
            self.last_scaling_action = Some((Instant::now(), proposed_action.clone()));
            self.scale_up_consecutive_signals = 0;
            self.scale_down_consecutive_signals = 0;
        }

        should_execute
    }
}
765
766impl Clone for HysteresisController {
767 fn clone(&self) -> Self {
768 Self {
769 scale_up_consecutive_signals: self.scale_up_consecutive_signals,
770 scale_down_consecutive_signals: self.scale_down_consecutive_signals,
771 required_consecutive_signals: self.required_consecutive_signals,
772 last_scaling_action: self.last_scaling_action.clone(),
773 scale_up_cooldown: self.scale_up_cooldown,
774 scale_down_cooldown: self.scale_down_cooldown,
775 }
776 }
777}
778
779impl Clone for ScalingAction {
780 fn clone(&self) -> Self {
781 match self {
782 ScalingAction::ScaleUp(count) => ScalingAction::ScaleUp(*count),
783 ScalingAction::ScaleDown(count) => ScalingAction::ScaleDown(*count),
784 ScalingAction::NoAction => ScalingAction::NoAction,
785 }
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use crate::broker::QueueMetrics;
793
    /// Pins every field of `AutoScalerConfig::default()` so the documented
    /// defaults cannot drift silently.
    #[test]
    fn test_autoscaler_config_default() {
        let config = AutoScalerConfig::default();

        assert_eq!(config.min_workers, 1);
        assert_eq!(config.max_workers, 20);
        assert_eq!(config.scale_up_count, 2);
        assert_eq!(config.scale_down_count, 1);
        assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
        assert_eq!(config.scaling_triggers.worker_utilization_threshold, 0.80);
        assert_eq!(config.scaling_triggers.task_complexity_threshold, 1.5);
        assert_eq!(config.scaling_triggers.error_rate_threshold, 0.05);
        assert_eq!(config.scaling_triggers.memory_pressure_threshold, 512.0);
        assert!(config.enable_adaptive_thresholds);
        assert_eq!(config.learning_rate, 0.1);
        assert_eq!(config.adaptation_window_minutes, 30);
        assert_eq!(config.scale_up_cooldown_seconds, 60);
        assert_eq!(config.scale_down_cooldown_seconds, 300);
        assert_eq!(config.consecutive_signals_required, 2);
        assert_eq!(config.target_sla.max_p95_latency_ms, 5000.0);
        assert_eq!(config.target_sla.min_success_rate, 0.95);
        assert_eq!(config.target_sla.max_queue_wait_time_ms, 10000.0);
        assert_eq!(config.target_sla.target_worker_utilization, 0.70);
    }
818
819 #[test]
820 fn test_autoscaler_config_validation_valid() {
821 let config = AutoScalerConfig {
822 min_workers: 2,
823 max_workers: 10,
824 scale_up_count: 3,
825 scale_down_count: 2,
826 scaling_triggers: ScalingTriggers {
827 queue_pressure_threshold: 0.75,
828 worker_utilization_threshold: 0.80,
829 task_complexity_threshold: 1.5,
830 error_rate_threshold: 0.05,
831 memory_pressure_threshold: 512.0,
832 },
833 enable_adaptive_thresholds: true,
834 learning_rate: 0.1,
835 adaptation_window_minutes: 30,
836 scale_up_cooldown_seconds: 60,
837 scale_down_cooldown_seconds: 300,
838 consecutive_signals_required: 2,
839 target_sla: SLATargets {
840 max_p95_latency_ms: 5000.0,
841 min_success_rate: 0.95,
842 max_queue_wait_time_ms: 10000.0,
843 target_worker_utilization: 0.70,
844 },
845 };
846
847 assert!(config.validate().is_ok());
848 }
849
850 #[test]
851 fn test_autoscaler_config_validation_min_workers_zero() {
852 let config = AutoScalerConfig {
853 min_workers: 0,
854 max_workers: 10,
855 scale_up_count: 2,
856 scale_down_count: 1,
857 scaling_triggers: ScalingTriggers {
858 queue_pressure_threshold: 0.75,
859 worker_utilization_threshold: 0.80,
860 task_complexity_threshold: 1.5,
861 error_rate_threshold: 0.05,
862 memory_pressure_threshold: 512.0,
863 },
864 enable_adaptive_thresholds: true,
865 learning_rate: 0.1,
866 adaptation_window_minutes: 30,
867 scale_up_cooldown_seconds: 60,
868 scale_down_cooldown_seconds: 300,
869 consecutive_signals_required: 2,
870 target_sla: SLATargets {
871 max_p95_latency_ms: 5000.0,
872 min_success_rate: 0.95,
873 max_queue_wait_time_ms: 10000.0,
874 target_worker_utilization: 0.70,
875 },
876 };
877
878 let result = config.validate();
879 assert!(result.is_err());
880 assert!(result
881 .unwrap_err()
882 .to_string()
883 .contains("Minimum workers must be greater than 0"));
884 }
885
886 #[test]
887 fn test_autoscaler_config_validation_max_less_than_min() {
888 let config = AutoScalerConfig {
889 min_workers: 10,
890 max_workers: 5,
891 scale_up_count: 2,
892 scale_down_count: 1,
893 scaling_triggers: ScalingTriggers {
894 queue_pressure_threshold: 0.75,
895 worker_utilization_threshold: 0.80,
896 task_complexity_threshold: 1.5,
897 error_rate_threshold: 0.05,
898 memory_pressure_threshold: 512.0,
899 },
900 enable_adaptive_thresholds: true,
901 learning_rate: 0.1,
902 adaptation_window_minutes: 30,
903 scale_up_cooldown_seconds: 60,
904 scale_down_cooldown_seconds: 300,
905 consecutive_signals_required: 2,
906 target_sla: SLATargets {
907 max_p95_latency_ms: 5000.0,
908 min_success_rate: 0.95,
909 max_queue_wait_time_ms: 10000.0,
910 target_worker_utilization: 0.70,
911 },
912 };
913
914 let result = config.validate();
915 assert!(result.is_err());
916 assert!(result
917 .unwrap_err()
918 .to_string()
919 .contains("Maximum workers must be greater than or equal to minimum workers"));
920 }
921
922 #[test]
923 fn test_autoscaler_config_validation_max_workers_too_high() {
924 let config = AutoScalerConfig {
925 min_workers: 1,
926 max_workers: 1001,
927 scale_up_count: 2,
928 scale_down_count: 1,
929 scaling_triggers: ScalingTriggers {
930 queue_pressure_threshold: 0.75,
931 worker_utilization_threshold: 0.80,
932 task_complexity_threshold: 1.5,
933 error_rate_threshold: 0.05,
934 memory_pressure_threshold: 512.0,
935 },
936 enable_adaptive_thresholds: true,
937 learning_rate: 0.1,
938 adaptation_window_minutes: 30,
939 scale_up_cooldown_seconds: 60,
940 scale_down_cooldown_seconds: 300,
941 consecutive_signals_required: 2,
942 target_sla: SLATargets {
943 max_p95_latency_ms: 5000.0,
944 min_success_rate: 0.95,
945 max_queue_wait_time_ms: 10000.0,
946 target_worker_utilization: 0.70,
947 },
948 };
949
950 let result = config.validate();
951 assert!(result.is_err());
952 assert!(result
953 .unwrap_err()
954 .to_string()
955 .contains("Maximum workers cannot exceed 1000"));
956 }
957
958 #[test]
959 fn test_autoscaler_config_validation_invalid_scale_up_count() {
960 let config = AutoScalerConfig {
961 min_workers: 1,
962 max_workers: 10,
963 scale_up_count: 0,
964 scale_down_count: 1,
965 scaling_triggers: ScalingTriggers {
966 queue_pressure_threshold: 0.75,
967 worker_utilization_threshold: 0.80,
968 task_complexity_threshold: 1.5,
969 error_rate_threshold: 0.05,
970 memory_pressure_threshold: 512.0,
971 },
972 enable_adaptive_thresholds: true,
973 learning_rate: 0.1,
974 adaptation_window_minutes: 30,
975 scale_up_cooldown_seconds: 60,
976 scale_down_cooldown_seconds: 300,
977 consecutive_signals_required: 2,
978 target_sla: SLATargets {
979 max_p95_latency_ms: 5000.0,
980 min_success_rate: 0.95,
981 max_queue_wait_time_ms: 10000.0,
982 target_worker_utilization: 0.70,
983 },
984 };
985
986 let result = config.validate();
987 assert!(result.is_err());
988 assert!(result
989 .unwrap_err()
990 .to_string()
991 .contains("Scale up count must be between 1 and 50"));
992 }
993
994 #[test]
995 fn test_autoscaler_config_validation_invalid_scale_down_count() {
996 let config = AutoScalerConfig {
997 min_workers: 1,
998 max_workers: 10,
999 scale_up_count: 2,
1000 scale_down_count: 0,
1001 scaling_triggers: ScalingTriggers {
1002 queue_pressure_threshold: 0.75,
1003 worker_utilization_threshold: 0.80,
1004 task_complexity_threshold: 1.5,
1005 error_rate_threshold: 0.05,
1006 memory_pressure_threshold: 512.0,
1007 },
1008 enable_adaptive_thresholds: true,
1009 learning_rate: 0.1,
1010 adaptation_window_minutes: 30,
1011 scale_up_cooldown_seconds: 60,
1012 scale_down_cooldown_seconds: 300,
1013 consecutive_signals_required: 2,
1014 target_sla: SLATargets {
1015 max_p95_latency_ms: 5000.0,
1016 min_success_rate: 0.95,
1017 max_queue_wait_time_ms: 10000.0,
1018 target_worker_utilization: 0.70,
1019 },
1020 };
1021
1022 let result = config.validate();
1023 assert!(result.is_err());
1024 assert!(result
1025 .unwrap_err()
1026 .to_string()
1027 .contains("Scale down count must be between 1 and 50"));
1028 }
1029
1030 #[test]
1031 fn test_autoscaler_config_validation_invalid_scaling_triggers() {
1032 let config = AutoScalerConfig {
1033 min_workers: 1,
1034 max_workers: 10,
1035 scale_up_count: 2,
1036 scale_down_count: 1,
1037 scaling_triggers: ScalingTriggers {
1038 queue_pressure_threshold: 0.0,
1039 worker_utilization_threshold: 0.0,
1040 task_complexity_threshold: 0.0,
1041 error_rate_threshold: 0.0,
1042 memory_pressure_threshold: 0.0,
1043 },
1044 enable_adaptive_thresholds: true,
1045 learning_rate: 0.1,
1046 adaptation_window_minutes: 30,
1047 scale_up_cooldown_seconds: 60,
1048 scale_down_cooldown_seconds: 300,
1049 consecutive_signals_required: 2,
1050 target_sla: SLATargets {
1051 max_p95_latency_ms: 5000.0,
1052 min_success_rate: 0.95,
1053 max_queue_wait_time_ms: 10000.0,
1054 target_worker_utilization: 0.70,
1055 },
1056 };
1057
1058 let result = config.validate();
1059 assert!(result.is_err());
1060 let error_msg = result.unwrap_err().to_string();
1061 assert!(error_msg.contains("Queue pressure threshold must be between 0.1 and 2.0"));
1062 }
1063
1064 #[test]
1065 fn test_autoscaler_config_validation_invalid_learning_rate() {
1066 let config = AutoScalerConfig {
1067 min_workers: 1,
1068 max_workers: 10,
1069 scale_up_count: 2,
1070 scale_down_count: 1,
1071 scaling_triggers: ScalingTriggers {
1072 queue_pressure_threshold: 0.75,
1073 worker_utilization_threshold: 0.80,
1074 task_complexity_threshold: 1.5,
1075 error_rate_threshold: 0.05,
1076 memory_pressure_threshold: 512.0,
1077 },
1078 enable_adaptive_thresholds: true,
1079 learning_rate: 0.0,
1080 adaptation_window_minutes: 30,
1081 scale_up_cooldown_seconds: 60,
1082 scale_down_cooldown_seconds: 300,
1083 consecutive_signals_required: 2,
1084 target_sla: SLATargets {
1085 max_p95_latency_ms: 5000.0,
1086 min_success_rate: 0.95,
1087 max_queue_wait_time_ms: 10000.0,
1088 target_worker_utilization: 0.70,
1089 },
1090 };
1091
1092 let result = config.validate();
1093 assert!(result.is_err());
1094 assert!(result
1095 .unwrap_err()
1096 .to_string()
1097 .contains("Learning rate must be between 0.01 and 1.0"));
1098 }
1099
    /// Round-trips the default config through serde_json and compares every
    /// field individually (the config types do not implement PartialEq).
    #[test]
    fn test_autoscaler_config_serialization() {
        let config = AutoScalerConfig::default();

        let json = serde_json::to_string(&config).expect("Failed to serialize to JSON");
        let deserialized: AutoScalerConfig =
            serde_json::from_str(&json).expect("Failed to deserialize from JSON");

        assert_eq!(config.min_workers, deserialized.min_workers);
        assert_eq!(config.max_workers, deserialized.max_workers);
        assert_eq!(config.scale_up_count, deserialized.scale_up_count);
        assert_eq!(config.scale_down_count, deserialized.scale_down_count);
        assert_eq!(
            config.scaling_triggers.queue_pressure_threshold,
            deserialized.scaling_triggers.queue_pressure_threshold
        );
        assert_eq!(
            config.scaling_triggers.worker_utilization_threshold,
            deserialized.scaling_triggers.worker_utilization_threshold
        );
        assert_eq!(
            config.scaling_triggers.task_complexity_threshold,
            deserialized.scaling_triggers.task_complexity_threshold
        );
        assert_eq!(
            config.scaling_triggers.error_rate_threshold,
            deserialized.scaling_triggers.error_rate_threshold
        );
        assert_eq!(
            config.scaling_triggers.memory_pressure_threshold,
            deserialized.scaling_triggers.memory_pressure_threshold
        );
        assert_eq!(
            config.enable_adaptive_thresholds,
            deserialized.enable_adaptive_thresholds
        );
        assert_eq!(config.learning_rate, deserialized.learning_rate);
        assert_eq!(
            config.adaptation_window_minutes,
            deserialized.adaptation_window_minutes
        );
        assert_eq!(
            config.scale_up_cooldown_seconds,
            deserialized.scale_up_cooldown_seconds
        );
        assert_eq!(
            config.scale_down_cooldown_seconds,
            deserialized.scale_down_cooldown_seconds
        );
        assert_eq!(
            config.consecutive_signals_required,
            deserialized.consecutive_signals_required
        );
        assert_eq!(
            config.target_sla.max_p95_latency_ms,
            deserialized.target_sla.max_p95_latency_ms
        );
        assert_eq!(
            config.target_sla.min_success_rate,
            deserialized.target_sla.min_success_rate
        );
        assert_eq!(
            config.target_sla.max_queue_wait_time_ms,
            deserialized.target_sla.max_queue_wait_time_ms
        );
        assert_eq!(
            config.target_sla.target_worker_utilization,
            deserialized.target_sla.target_worker_utilization
        );
    }
1171
1172 fn create_test_autoscaler_metrics(
1173 active_workers: i64,
1174 total_pending_tasks: i64,
1175 ) -> AutoScalerMetrics {
1176 let queue_pressure_score = if active_workers > 0 {
1177 total_pending_tasks as f64 / active_workers as f64
1178 } else {
1179 total_pending_tasks as f64
1180 };
1181
1182 AutoScalerMetrics {
1183 active_workers,
1184 total_pending_tasks,
1185 queue_metrics: vec![QueueMetrics {
1186 queue_name: "default".to_string(),
1187 pending_tasks: total_pending_tasks,
1188 processed_tasks: 100,
1189 failed_tasks: 5,
1190 }],
1191 queue_pressure_score,
1192 worker_utilization: if active_workers > 0 { 0.6 } else { 0.0 },
1193 task_complexity_factor: 1.0,
1194 error_rate: 5.0 / 105.0, memory_pressure_mb: active_workers as f64 * 50.0,
1196 avg_queue_wait_time_ms: 0.0,
1197 throughput_trend: 0.0,
1198 }
1199 }
1200
1201 fn create_test_autoscaler_metrics_with_high_pressure(
1202 active_workers: i64,
1203 total_pending_tasks: i64,
1204 ) -> AutoScalerMetrics {
1205 let queue_pressure_score = if active_workers > 0 {
1206 total_pending_tasks as f64 / active_workers as f64
1207 } else {
1208 total_pending_tasks as f64
1209 };
1210
1211 AutoScalerMetrics {
1212 active_workers,
1213 total_pending_tasks,
1214 queue_metrics: vec![QueueMetrics {
1215 queue_name: "default".to_string(),
1216 pending_tasks: total_pending_tasks,
1217 processed_tasks: 100,
1218 failed_tasks: 5,
1219 }],
1220 queue_pressure_score,
1221 worker_utilization: 0.95, task_complexity_factor: 2.0, error_rate: 0.08, memory_pressure_mb: 600.0, avg_queue_wait_time_ms: 0.0,
1226 throughput_trend: 0.0,
1227 }
1228 }
1229
1230 fn create_test_autoscaler_metrics_with_low_pressure(
1231 active_workers: i64,
1232 total_pending_tasks: i64,
1233 ) -> AutoScalerMetrics {
1234 let queue_pressure_score = if active_workers > 0 {
1235 total_pending_tasks as f64 / active_workers as f64
1236 } else {
1237 total_pending_tasks as f64
1238 };
1239
1240 AutoScalerMetrics {
1241 active_workers,
1242 total_pending_tasks,
1243 queue_metrics: vec![QueueMetrics {
1244 queue_name: "default".to_string(),
1245 pending_tasks: total_pending_tasks,
1246 processed_tasks: 100,
1247 failed_tasks: 5,
1248 }],
1249 queue_pressure_score,
1250 worker_utilization: 0.2, task_complexity_factor: 0.5, error_rate: 0.01, memory_pressure_mb: 100.0, avg_queue_wait_time_ms: 0.0,
1255 throughput_trend: 0.0,
1256 }
1257 }
1258
1259 #[allow(dead_code)]
1260 fn create_mock_autoscaler() -> AutoScaler {
1261 let redis_url = "redis://localhost:6379";
1263 let mut config = deadpool_redis::Config::from_url(redis_url);
1264 config.pool = Some(deadpool_redis::PoolConfig::new(1));
1265
1266 let pool = config
1267 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1268 .expect("Failed to create pool");
1269
1270 let broker = Arc::new(crate::broker::RedisBroker { pool });
1271 AutoScaler::new(broker)
1272 }
1273
1274 #[test]
1275 fn test_scaling_action_scale_up() {
1276 let config = AutoScalerConfig {
1277 min_workers: 1,
1278 max_workers: 10,
1279 scale_up_count: 2,
1280 scale_down_count: 1,
1281 scaling_triggers: ScalingTriggers {
1282 queue_pressure_threshold: 0.75,
1283 worker_utilization_threshold: 0.80,
1284 task_complexity_threshold: 1.5,
1285 error_rate_threshold: 0.05,
1286 memory_pressure_threshold: 512.0,
1287 },
1288 enable_adaptive_thresholds: true,
1289 learning_rate: 0.1,
1290 adaptation_window_minutes: 30,
1291 scale_up_cooldown_seconds: 60,
1292 scale_down_cooldown_seconds: 300,
1293 consecutive_signals_required: 2,
1294 target_sla: SLATargets {
1295 max_p95_latency_ms: 5000.0,
1296 min_success_rate: 0.95,
1297 max_queue_wait_time_ms: 10000.0,
1298 target_worker_utilization: 0.70,
1299 },
1300 };
1301
1302 let broker = Arc::new(crate::broker::RedisBroker {
1303 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1304 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1305 .expect("Failed to create pool"),
1306 });
1307
1308 let mut autoscaler = AutoScaler::with_config(broker, config);
1309
1310 let metrics = create_test_autoscaler_metrics_with_high_pressure(2, 12);
1312
1313 let action1 = autoscaler
1315 .decide_scaling_action(&metrics)
1316 .expect("Failed to decide scaling action");
1317 let action2 = autoscaler
1318 .decide_scaling_action(&metrics)
1319 .expect("Failed to decide scaling action");
1320
1321 match action2 {
1323 ScalingAction::ScaleUp(count) => assert_eq!(count, 2),
1324 _ => panic!(
1325 "Expected ScaleUp action, got {:?}. First action was {:?}",
1326 action2, action1
1327 ),
1328 }
1329 }
1330
1331 #[test]
1332 fn test_scaling_action_scale_down() {
1333 let config = AutoScalerConfig {
1334 min_workers: 1,
1335 max_workers: 10,
1336 scale_up_count: 2,
1337 scale_down_count: 1,
1338 scaling_triggers: ScalingTriggers {
1339 queue_pressure_threshold: 0.75,
1340 worker_utilization_threshold: 0.80,
1341 task_complexity_threshold: 1.5,
1342 error_rate_threshold: 0.05,
1343 memory_pressure_threshold: 512.0,
1344 },
1345 enable_adaptive_thresholds: true,
1346 learning_rate: 0.1,
1347 adaptation_window_minutes: 30,
1348 scale_up_cooldown_seconds: 60,
1349 scale_down_cooldown_seconds: 300,
1350 consecutive_signals_required: 2,
1351 target_sla: SLATargets {
1352 max_p95_latency_ms: 5000.0,
1353 min_success_rate: 0.95,
1354 max_queue_wait_time_ms: 10000.0,
1355 target_worker_utilization: 0.70,
1356 },
1357 };
1358
1359 let broker = Arc::new(crate::broker::RedisBroker {
1360 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1361 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1362 .expect("Failed to create pool"),
1363 });
1364
1365 let mut autoscaler = AutoScaler::with_config(broker, config);
1366
1367 let metrics = create_test_autoscaler_metrics_with_low_pressure(5, 2);
1369
1370 let action1 = autoscaler
1372 .decide_scaling_action(&metrics)
1373 .expect("Failed to decide scaling action");
1374 let action2 = autoscaler
1375 .decide_scaling_action(&metrics)
1376 .expect("Failed to decide scaling action");
1377
1378 match action2 {
1380 ScalingAction::ScaleDown(count) => assert_eq!(count, 1),
1381 _ => panic!(
1382 "Expected ScaleDown action, got {:?}. First action was {:?}",
1383 action2, action1
1384 ),
1385 }
1386 }
1387
1388 #[test]
1389 fn test_scaling_action_no_action() {
1390 let config = AutoScalerConfig::default();
1391
1392 let broker = Arc::new(crate::broker::RedisBroker {
1393 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1394 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1395 .expect("Failed to create pool"),
1396 });
1397
1398 let mut autoscaler = AutoScaler::with_config(broker, config);
1399
1400 let metrics = create_test_autoscaler_metrics(3, 9);
1402
1403 let action = autoscaler
1404 .decide_scaling_action(&metrics)
1405 .expect("Failed to decide scaling action");
1406
1407 match action {
1408 ScalingAction::NoAction => {}
1409 _ => panic!("Expected NoAction, got {:?}", action),
1410 }
1411 }
1412
1413 #[test]
1414 fn test_scaling_action_at_max_workers() {
1415 let config = AutoScalerConfig {
1416 min_workers: 1,
1417 max_workers: 5,
1418 scale_up_count: 3,
1419 scale_down_count: 1,
1420 scaling_triggers: ScalingTriggers {
1421 queue_pressure_threshold: 0.75,
1422 worker_utilization_threshold: 0.80,
1423 task_complexity_threshold: 1.5,
1424 error_rate_threshold: 0.05,
1425 memory_pressure_threshold: 512.0,
1426 },
1427 enable_adaptive_thresholds: true,
1428 learning_rate: 0.1,
1429 adaptation_window_minutes: 30,
1430 scale_up_cooldown_seconds: 60,
1431 scale_down_cooldown_seconds: 300,
1432 consecutive_signals_required: 2,
1433 target_sla: SLATargets {
1434 max_p95_latency_ms: 5000.0,
1435 min_success_rate: 0.95,
1436 max_queue_wait_time_ms: 10000.0,
1437 target_worker_utilization: 0.70,
1438 },
1439 };
1440
1441 let broker = Arc::new(crate::broker::RedisBroker {
1442 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1443 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1444 .expect("Failed to create pool"),
1445 });
1446
1447 let mut autoscaler = AutoScaler::with_config(broker, config);
1448
1449 let metrics = create_test_autoscaler_metrics(5, 20);
1451
1452 let action = autoscaler
1453 .decide_scaling_action(&metrics)
1454 .expect("Failed to decide scaling action");
1455
1456 match action {
1457 ScalingAction::NoAction => {}
1458 _ => panic!("Expected NoAction when at max workers"),
1459 }
1460 }
1461
1462 #[test]
1463 fn test_scaling_action_at_min_workers() {
1464 let config = AutoScalerConfig {
1465 min_workers: 3,
1466 max_workers: 10,
1467 scale_up_count: 2,
1468 scale_down_count: 2,
1469 scaling_triggers: ScalingTriggers {
1470 queue_pressure_threshold: 0.75,
1471 worker_utilization_threshold: 0.80,
1472 task_complexity_threshold: 1.5,
1473 error_rate_threshold: 0.05,
1474 memory_pressure_threshold: 512.0,
1475 },
1476 enable_adaptive_thresholds: true,
1477 learning_rate: 0.1,
1478 adaptation_window_minutes: 30,
1479 scale_up_cooldown_seconds: 60,
1480 scale_down_cooldown_seconds: 300,
1481 consecutive_signals_required: 2,
1482 target_sla: SLATargets {
1483 max_p95_latency_ms: 5000.0,
1484 min_success_rate: 0.95,
1485 max_queue_wait_time_ms: 10000.0,
1486 target_worker_utilization: 0.70,
1487 },
1488 };
1489
1490 let broker = Arc::new(crate::broker::RedisBroker {
1491 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1492 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1493 .expect("Failed to create pool"),
1494 });
1495
1496 let mut autoscaler = AutoScaler::with_config(broker, config);
1497
1498 let metrics = create_test_autoscaler_metrics(3, 1);
1500
1501 let action = autoscaler
1502 .decide_scaling_action(&metrics)
1503 .expect("Failed to decide scaling action");
1504
1505 match action {
1506 ScalingAction::NoAction => {}
1507 _ => panic!("Expected NoAction when at min workers"),
1508 }
1509 }
1510
1511 #[test]
1512 fn test_scaling_action_limited_by_max_workers() {
1513 let config = AutoScalerConfig {
1514 min_workers: 1,
1515 max_workers: 5,
1516 scale_up_count: 10,
1517 scale_down_count: 1,
1518 scaling_triggers: ScalingTriggers {
1519 queue_pressure_threshold: 0.75,
1520 worker_utilization_threshold: 0.80,
1521 task_complexity_threshold: 1.5,
1522 error_rate_threshold: 0.05,
1523 memory_pressure_threshold: 512.0,
1524 },
1525 enable_adaptive_thresholds: true,
1526 learning_rate: 0.1,
1527 adaptation_window_minutes: 30,
1528 scale_up_cooldown_seconds: 60,
1529 scale_down_cooldown_seconds: 300,
1530 consecutive_signals_required: 2,
1531 target_sla: SLATargets {
1532 max_p95_latency_ms: 5000.0,
1533 min_success_rate: 0.95,
1534 max_queue_wait_time_ms: 10000.0,
1535 target_worker_utilization: 0.70,
1536 },
1537 };
1538
1539 let broker = Arc::new(crate::broker::RedisBroker {
1540 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1541 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1542 .expect("Failed to create pool"),
1543 });
1544
1545 let mut autoscaler = AutoScaler::with_config(broker, config);
1546
1547 let metrics = create_test_autoscaler_metrics_with_high_pressure(3, 15);
1549
1550 let action1 = autoscaler
1552 .decide_scaling_action(&metrics)
1553 .expect("Failed to decide scaling action");
1554 let action2 = autoscaler
1555 .decide_scaling_action(&metrics)
1556 .expect("Failed to decide scaling action");
1557
1558 match action2 {
1560 ScalingAction::ScaleUp(count) => assert_eq!(count, 2), _ => panic!(
1562 "Expected ScaleUp action limited by max workers, got {:?}. First action was {:?}",
1563 action2, action1
1564 ),
1565 }
1566 }
1567
1568 #[test]
1569 fn test_scaling_action_limited_by_min_workers() {
1570 let config = AutoScalerConfig {
1571 min_workers: 3,
1572 max_workers: 10,
1573 scale_up_count: 2,
1574 scale_down_count: 10,
1575 scaling_triggers: ScalingTriggers {
1576 queue_pressure_threshold: 0.75,
1577 worker_utilization_threshold: 0.80,
1578 task_complexity_threshold: 1.5,
1579 error_rate_threshold: 0.05,
1580 memory_pressure_threshold: 512.0,
1581 },
1582 enable_adaptive_thresholds: true,
1583 learning_rate: 0.1,
1584 adaptation_window_minutes: 30,
1585 scale_up_cooldown_seconds: 60,
1586 scale_down_cooldown_seconds: 300,
1587 consecutive_signals_required: 2,
1588 target_sla: SLATargets {
1589 max_p95_latency_ms: 5000.0,
1590 min_success_rate: 0.95,
1591 max_queue_wait_time_ms: 10000.0,
1592 target_worker_utilization: 0.70,
1593 },
1594 };
1595
1596 let broker = Arc::new(crate::broker::RedisBroker {
1597 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1598 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1599 .expect("Failed to create pool"),
1600 });
1601
1602 let mut autoscaler = AutoScaler::with_config(broker, config);
1603
1604 let metrics = create_test_autoscaler_metrics_with_low_pressure(6, 3);
1606
1607 let action1 = autoscaler
1609 .decide_scaling_action(&metrics)
1610 .expect("Failed to decide scaling action");
1611 let action2 = autoscaler
1612 .decide_scaling_action(&metrics)
1613 .expect("Failed to decide scaling action");
1614
1615 match action2 {
1617 ScalingAction::ScaleDown(count) => assert_eq!(count, 3), _ => panic!(
1619 "Expected ScaleDown action limited by min workers, got {:?}. First action was {:?}",
1620 action2, action1
1621 ),
1622 }
1623 }
1624
1625 #[test]
1626 fn test_scaling_action_zero_workers() {
1627 let config = AutoScalerConfig::default();
1628
1629 let broker = Arc::new(crate::broker::RedisBroker {
1630 pool: deadpool_redis::Config::from_url("redis://localhost:6379")
1631 .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1632 .expect("Failed to create pool"),
1633 });
1634
1635 let mut autoscaler = AutoScaler::with_config(broker, config);
1636
1637 let metrics = create_test_autoscaler_metrics_with_high_pressure(0, 10);
1639
1640 let action1 = autoscaler
1642 .decide_scaling_action(&metrics)
1643 .expect("Failed to decide scaling action");
1644 let action2 = autoscaler
1645 .decide_scaling_action(&metrics)
1646 .expect("Failed to decide scaling action");
1647
1648 match action2 {
1650 ScalingAction::ScaleUp(count) => assert_eq!(count, 2),
1651 _ => panic!(
1652 "Expected ScaleUp action with zero workers, got {:?}. First action was {:?}",
1653 action2, action1
1654 ),
1655 }
1656 }
1657
1658 #[test]
1659 fn test_autoscaler_metrics_debug() {
1660 let metrics = create_test_autoscaler_metrics(5, 25);
1661 let debug_str = format!("{:?}", metrics);
1662
1663 assert!(debug_str.contains("AutoScalerMetrics"));
1664 assert!(debug_str.contains("active_workers: 5"));
1665 assert!(debug_str.contains("total_pending_tasks: 25"));
1666 assert!(debug_str.contains("queue_pressure_score: 5"));
1667 assert!(debug_str.contains("worker_utilization: 0.6"));
1668 assert!(debug_str.contains("task_complexity_factor: 1"));
1669 assert!(debug_str.contains("error_rate:"));
1670 assert!(debug_str.contains("memory_pressure_mb: 250"));
1671 assert!(debug_str.contains("avg_queue_wait_time_ms: 0"));
1672 assert!(debug_str.contains("throughput_trend: 0"));
1673 }
1674
1675 #[test]
1676 fn test_scaling_action_debug() {
1677 let scale_up = ScalingAction::ScaleUp(3);
1678 let scale_down = ScalingAction::ScaleDown(2);
1679 let no_action = ScalingAction::NoAction;
1680
1681 assert!(format!("{:?}", scale_up).contains("ScaleUp(3)"));
1682 assert!(format!("{:?}", scale_down).contains("ScaleDown(2)"));
1683 assert!(format!("{:?}", no_action).contains("NoAction"));
1684 }
1685
1686 #[test]
1687 fn test_autoscaler_config_clone() {
1688 let original = AutoScalerConfig::default();
1689 let cloned = original.clone();
1690
1691 assert_eq!(original.min_workers, cloned.min_workers);
1692 assert_eq!(original.max_workers, cloned.max_workers);
1693 assert_eq!(original.scale_up_count, cloned.scale_up_count);
1694 assert_eq!(original.scale_down_count, cloned.scale_down_count);
1695 assert_eq!(
1696 original.scaling_triggers.queue_pressure_threshold,
1697 cloned.scaling_triggers.queue_pressure_threshold
1698 );
1699 assert_eq!(
1700 original.scaling_triggers.worker_utilization_threshold,
1701 cloned.scaling_triggers.worker_utilization_threshold
1702 );
1703 assert_eq!(
1704 original.scaling_triggers.task_complexity_threshold,
1705 cloned.scaling_triggers.task_complexity_threshold
1706 );
1707 assert_eq!(
1708 original.scaling_triggers.error_rate_threshold,
1709 cloned.scaling_triggers.error_rate_threshold
1710 );
1711 assert_eq!(
1712 original.scaling_triggers.memory_pressure_threshold,
1713 cloned.scaling_triggers.memory_pressure_threshold
1714 );
1715 assert_eq!(
1716 original.enable_adaptive_thresholds,
1717 cloned.enable_adaptive_thresholds
1718 );
1719 assert_eq!(original.learning_rate, cloned.learning_rate);
1720 assert_eq!(
1721 original.adaptation_window_minutes,
1722 cloned.adaptation_window_minutes
1723 );
1724 assert_eq!(
1725 original.scale_up_cooldown_seconds,
1726 cloned.scale_up_cooldown_seconds
1727 );
1728 assert_eq!(
1729 original.scale_down_cooldown_seconds,
1730 cloned.scale_down_cooldown_seconds
1731 );
1732 assert_eq!(
1733 original.consecutive_signals_required,
1734 cloned.consecutive_signals_required
1735 );
1736 assert_eq!(
1737 original.target_sla.max_p95_latency_ms,
1738 cloned.target_sla.max_p95_latency_ms
1739 );
1740 assert_eq!(
1741 original.target_sla.min_success_rate,
1742 cloned.target_sla.min_success_rate
1743 );
1744 assert_eq!(
1745 original.target_sla.max_queue_wait_time_ms,
1746 cloned.target_sla.max_queue_wait_time_ms
1747 );
1748 assert_eq!(
1749 original.target_sla.target_worker_utilization,
1750 cloned.target_sla.target_worker_utilization
1751 );
1752 }
1753}