rust_task_queue/
autoscaler.rs

1use crate::{queue::queue_names, QueueMetrics, RedisBroker, TaskQueueError};
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
/// Enhanced autoscaler configuration with multi-dimensional scaling triggers.
///
/// `AutoScalerConfig::default()` supplies a baseline and
/// `AutoScalerConfig::validate()` checks the worker bounds, step sizes,
/// trigger thresholds and learning rate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScalerConfig {
    /// Lower bound on the worker count; scale-down is never proposed below it.
    pub min_workers: usize,
    /// Upper bound on the worker count; scale-up is never proposed above it.
    pub max_workers: usize,
    /// Largest number of workers added by one scale-up decision.
    pub scale_up_count: usize,
    /// Largest number of workers removed by one scale-down decision.
    pub scale_down_count: usize,

    // Multi-dimensional thresholds
    /// Trigger thresholds; used directly when adaptation is disabled and as
    /// the starting point of the adaptive controller otherwise.
    pub scaling_triggers: ScalingTriggers,

    // Adaptive learning parameters
    /// When `true`, `AutoScaler::with_config` creates an
    /// `AdaptiveThresholdController`.
    pub enable_adaptive_thresholds: bool,
    /// Step factor applied when the adaptive controller adjusts a threshold.
    pub learning_rate: f64,
    /// Length, in minutes, of the performance history kept for adaptation.
    pub adaptation_window_minutes: u32,

    // Hysteresis and stability
    /// Cooldown, in seconds, applied after an executed scale-up.
    pub scale_up_cooldown_seconds: u64,
    /// Cooldown, in seconds, applied after an executed scale-down.
    pub scale_down_cooldown_seconds: u64,
    /// Number of consecutive identical proposals needed before one is executed.
    pub consecutive_signals_required: usize,

    // Performance targets for adaptive learning
    /// Service-level targets the adaptive controller steers towards.
    pub target_sla: SLATargets,
}
30
/// Multi-dimensional scaling triggers.
///
/// Each field is a threshold compared against the metric of the matching name
/// in `AutoScalerMetrics`: exceeding it counts as a scale-up signal, falling
/// well below it counts as a scale-down signal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingTriggers {
    /// Weighted queue depth score.
    pub queue_pressure_threshold: f64,
    /// Target worker utilization, as a fraction.
    pub worker_utilization_threshold: f64,
    /// Complex task overload factor.
    pub task_complexity_threshold: f64,
    /// Maximum acceptable error rate, as a fraction.
    pub error_rate_threshold: f64,
    /// Memory pressure limit in MB.
    // NOTE(review): originally described as "per worker", but the metric it is
    // compared with is computed as workers * 50 MB — confirm the intended unit.
    pub memory_pressure_threshold: f64,
}
40
/// SLA targets for adaptive threshold learning.
///
/// In the code visible here only `min_success_rate` and
/// `target_worker_utilization` are read (by the adaptive controller); the two
/// latency-style targets are stored but not yet consulted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SLATargets {
    /// Upper bound for the 95th-percentile latency, in milliseconds.
    pub max_p95_latency_ms: f64,
    /// Lowest acceptable success rate, as a fraction.
    pub min_success_rate: f64,
    /// Upper bound for the time a task waits in a queue, in milliseconds.
    pub max_queue_wait_time_ms: f64,
    /// Worker utilization the system should settle at, as a fraction.
    pub target_worker_utilization: f64,
}
49
50impl Default for AutoScalerConfig {
51    fn default() -> Self {
52        Self {
53            min_workers: 1,
54            max_workers: 20,
55            scale_up_count: 2,
56            scale_down_count: 1,
57
58            scaling_triggers: ScalingTriggers {
59                queue_pressure_threshold: 0.75,
60                worker_utilization_threshold: 0.80,
61                task_complexity_threshold: 1.5,
62                error_rate_threshold: 0.05,
63                memory_pressure_threshold: 512.0,
64            },
65
66            enable_adaptive_thresholds: true,
67            learning_rate: 0.1,
68            adaptation_window_minutes: 30,
69
70            scale_up_cooldown_seconds: 60,
71            scale_down_cooldown_seconds: 300,
72            consecutive_signals_required: 2,
73
74            target_sla: SLATargets {
75                max_p95_latency_ms: 5000.0,
76                min_success_rate: 0.95,
77                max_queue_wait_time_ms: 10000.0,
78                target_worker_utilization: 0.70,
79            },
80        }
81    }
82}
83
84impl AutoScalerConfig {
85    pub fn validate(&self) -> Result<(), TaskQueueError> {
86        if self.min_workers == 0 {
87            return Err(TaskQueueError::Configuration(
88                "Minimum workers must be greater than 0".to_string(),
89            ));
90        }
91
92        if self.max_workers < self.min_workers {
93            return Err(TaskQueueError::Configuration(
94                "Maximum workers must be greater than or equal to minimum workers".to_string(),
95            ));
96        }
97
98        if self.max_workers > 1000 {
99            return Err(TaskQueueError::Configuration(
100                "Maximum workers cannot exceed 1000".to_string(),
101            ));
102        }
103
104        if self.scale_up_count == 0 || self.scale_up_count > 50 {
105            return Err(TaskQueueError::Configuration(
106                "Scale up count must be between 1 and 50".to_string(),
107            ));
108        }
109
110        if self.scale_down_count == 0 || self.scale_down_count > 50 {
111            return Err(TaskQueueError::Configuration(
112                "Scale down count must be between 1 and 50".to_string(),
113            ));
114        }
115
116        // Validate scaling triggers
117        let triggers = &self.scaling_triggers;
118        if triggers.queue_pressure_threshold <= 0.0 || triggers.queue_pressure_threshold > 2.0 {
119            return Err(TaskQueueError::Configuration(
120                "Queue pressure threshold must be between 0.1 and 2.0".to_string(),
121            ));
122        }
123
124        if triggers.worker_utilization_threshold <= 0.0
125            || triggers.worker_utilization_threshold > 1.0
126        {
127            return Err(TaskQueueError::Configuration(
128                "Worker utilization threshold must be between 0.1 and 1.0".to_string(),
129            ));
130        }
131
132        if triggers.error_rate_threshold < 0.0 || triggers.error_rate_threshold > 1.0 {
133            return Err(TaskQueueError::Configuration(
134                "Error rate threshold must be between 0.0 and 1.0".to_string(),
135            ));
136        }
137
138        if self.learning_rate <= 0.0 || self.learning_rate > 1.0 {
139            return Err(TaskQueueError::Configuration(
140                "Learning rate must be between 0.01 and 1.0".to_string(),
141            ));
142        }
143
144        Ok(())
145    }
146}
147
/// Outcome of a scaling decision.
#[derive(Debug)]
pub enum ScalingAction {
    /// Add the given number of workers.
    ScaleUp(usize),
    /// Remove the given number of workers.
    ScaleDown(usize),
    /// Leave the worker count unchanged.
    NoAction,
}
154
/// Enhanced metrics with multi-dimensional analysis.
///
/// One sample as produced by `AutoScaler::collect_metrics`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AutoScalerMetrics {
    /// Worker count reported by the broker.
    pub active_workers: i64,
    /// Sum of pending tasks over all sampled queues.
    pub total_pending_tasks: i64,
    /// Raw per-queue metrics the derived values were computed from.
    pub queue_metrics: Vec<QueueMetrics>,

    // Enhanced metrics
    /// Priority-weighted pending tasks per worker.
    pub queue_pressure_score: f64,
    /// Estimated worker utilization, capped at 1.0.
    pub worker_utilization: f64,
    /// Task complexity factor (currently a constant placeholder).
    pub task_complexity_factor: f64,
    /// Failed tasks as a fraction of processed plus failed tasks.
    pub error_rate: f64,
    /// Estimated memory usage in MB.
    pub memory_pressure_mb: f64,
    /// Average queue wait time in milliseconds (currently a constant placeholder).
    pub avg_queue_wait_time_ms: f64,
    /// Throughput trend (currently a constant placeholder).
    pub throughput_trend: f64,
}
171
/// Adaptive threshold controller that learns from system performance.
#[derive(Debug)]
pub struct AdaptiveThresholdController {
    /// Thresholds currently in force; adjusted by each adaptation step.
    current_thresholds: ScalingTriggers,
    /// Snapshots recorded within the adaptation window, oldest first.
    performance_history: Vec<PerformanceSnapshot>,
    /// When the thresholds were last adapted (initially: construction time).
    last_adaptation: Instant,
    /// Step factor applied to threshold adjustments.
    learning_rate: f64,
    /// Targets the adaptation steers towards.
    target_sla: SLATargets,
    /// Maximum age of a snapshot kept in `performance_history`.
    adaptation_window: Duration,
}
182
/// Point-in-time performance record kept by the adaptive controller.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields reserved for future Phase 2/3 enhancements
struct PerformanceSnapshot {
    /// When the snapshot was recorded.
    timestamp: Instant,
    /// 95th-percentile latency in milliseconds (recorded as 0.0 for now).
    latency_p95_ms: f64,
    /// `1.0 - error_rate` of the metrics sample.
    success_rate: f64,
    /// Average queue wait time of the metrics sample, in milliseconds.
    queue_wait_time_ms: f64,
    /// Worker utilization of the metrics sample.
    worker_utilization: f64,
    /// The executed scaling action, or `None` when nothing was done.
    scaling_action_taken: Option<ScalingAction>,
}
193
/// Hysteresis controller to prevent oscillations.
///
/// Combines a cooldown after every executed action with a required streak of
/// identical proposals.
#[derive(Debug)]
pub struct HysteresisController {
    /// Length of the current streak of scale-up proposals.
    scale_up_consecutive_signals: usize,
    /// Length of the current streak of scale-down proposals.
    scale_down_consecutive_signals: usize,
    /// Streak length a proposal must reach before it is executed.
    required_consecutive_signals: usize,
    /// Time and kind of the most recently executed action, if any.
    last_scaling_action: Option<(Instant, ScalingAction)>,
    /// Cooldown that follows an executed scale-up.
    scale_up_cooldown: Duration,
    /// Cooldown that follows an executed scale-down.
    scale_down_cooldown: Duration,
}
204
/// Collects queue metrics from the broker and turns them into scaling decisions.
pub struct AutoScaler {
    /// Broker queried for worker counts and queue metrics.
    broker: Arc<RedisBroker>,
    /// Static configuration the scaler was created with.
    config: AutoScalerConfig,
    /// Present only when adaptive thresholds are enabled in `config`.
    adaptive_controller: Option<AdaptiveThresholdController>,
    /// Cooldown and consecutive-signal gate applied to every proposal.
    hysteresis_controller: HysteresisController,
    /// Most recent metric samples, oldest first (bounded by `decide_scaling_action`).
    metrics_history: Vec<AutoScalerMetrics>,
    /// When this scaler was constructed.
    start_time: Instant,
}
213
214impl AutoScaler {
215    pub fn new(broker: Arc<RedisBroker>) -> Self {
216        let config = AutoScalerConfig::default();
217        Self::with_config(broker, config)
218    }
219
220    pub fn with_config(broker: Arc<RedisBroker>, config: AutoScalerConfig) -> Self {
221        let adaptive_controller = if config.enable_adaptive_thresholds {
222            Some(AdaptiveThresholdController::new(
223                config.scaling_triggers.clone(),
224                config.learning_rate,
225                config.target_sla.clone(),
226                Duration::from_secs(config.adaptation_window_minutes as u64 * 60),
227            ))
228        } else {
229            None
230        };
231
232        let hysteresis_controller = HysteresisController::new(
233            config.consecutive_signals_required,
234            Duration::from_secs(config.scale_up_cooldown_seconds),
235            Duration::from_secs(config.scale_down_cooldown_seconds),
236        );
237
238        Self {
239            broker,
240            config,
241            adaptive_controller,
242            hysteresis_controller,
243            metrics_history: Vec::new(),
244            start_time: Instant::now(),
245        }
246    }
247
248    pub async fn collect_metrics(&self) -> Result<AutoScalerMetrics, TaskQueueError> {
249        let active_workers = self.broker.get_active_workers().await?;
250
251        let queues = [
252            queue_names::DEFAULT,
253            queue_names::HIGH_PRIORITY,
254            queue_names::LOW_PRIORITY,
255        ];
256        let mut queue_metrics = Vec::new();
257        let mut total_pending_tasks = 0;
258        let mut total_processed_tasks = 0;
259        let mut total_failed_tasks = 0;
260
261        for queue in &queues {
262            let metrics = self.broker.get_queue_metrics(queue).await?;
263            total_pending_tasks += metrics.pending_tasks;
264            total_processed_tasks += metrics.processed_tasks;
265            total_failed_tasks += metrics.failed_tasks;
266            queue_metrics.push(metrics);
267        }
268
269        // Calculate enhanced metrics
270        let queue_pressure_score =
271            self.calculate_queue_pressure_score(&queue_metrics, active_workers);
272        let worker_utilization = self.calculate_worker_utilization(active_workers, &queue_metrics);
273        let task_complexity_factor = self.calculate_task_complexity_factor(&queue_metrics);
274        let error_rate = self.calculate_error_rate(total_processed_tasks, total_failed_tasks);
275        let memory_pressure_mb = self.estimate_memory_pressure(active_workers);
276        let avg_queue_wait_time_ms = self.calculate_avg_queue_wait_time(&queue_metrics);
277        let throughput_trend = self.calculate_throughput_trend(&queue_metrics);
278
279        Ok(AutoScalerMetrics {
280            active_workers,
281            total_pending_tasks,
282            queue_metrics,
283            queue_pressure_score,
284            worker_utilization,
285            task_complexity_factor,
286            error_rate,
287            memory_pressure_mb,
288            avg_queue_wait_time_ms,
289            throughput_trend,
290        })
291    }
292
293    pub fn decide_scaling_action(
294        &mut self,
295        metrics: &AutoScalerMetrics,
296    ) -> Result<ScalingAction, TaskQueueError> {
297        let current_workers = metrics.active_workers as usize;
298
299        // Get current thresholds (potentially adaptive)
300        let thresholds = if let Some(ref adaptive) = self.adaptive_controller {
301            adaptive.get_current_thresholds().clone()
302        } else {
303            self.config.scaling_triggers.clone()
304        };
305
306        // Multi-dimensional scaling decision
307        let scale_up_signals = Self::count_scale_up_signals_static(metrics, &thresholds);
308        let scale_down_signals = Self::count_scale_down_signals_static(metrics, &thresholds);
309
310        let proposed_action = if scale_up_signals >= 2 && current_workers < self.config.max_workers
311        {
312            let scale_count = std::cmp::min(
313                self.config.scale_up_count,
314                self.config.max_workers - current_workers,
315            );
316            ScalingAction::ScaleUp(scale_count)
317        } else if scale_down_signals >= 2 && current_workers > self.config.min_workers {
318            let scale_count = std::cmp::min(
319                self.config.scale_down_count,
320                current_workers - self.config.min_workers,
321            );
322            ScalingAction::ScaleDown(scale_count)
323        } else {
324            ScalingAction::NoAction
325        };
326
327        // Apply hysteresis control
328        let final_action = if self
329            .hysteresis_controller
330            .should_execute_scaling(&proposed_action)
331        {
332            #[cfg(feature = "tracing")]
333            match &proposed_action {
334                ScalingAction::ScaleUp(count) => {
335                    tracing::info!(
336                        "Enhanced auto-scaling up: queue_pressure={:.2}, utilization={:.2}, complexity={:.2}, adding {} workers",
337                        metrics.queue_pressure_score,
338                        metrics.worker_utilization,
339                        metrics.task_complexity_factor,
340                        count
341                    );
342                }
343                ScalingAction::ScaleDown(count) => {
344                    tracing::info!(
345                        "Enhanced auto-scaling down: queue_pressure={:.2}, utilization={:.2}, removing {} workers",
346                        metrics.queue_pressure_score,
347                        metrics.worker_utilization,
348                        count
349                    );
350                }
351                _ => {}
352            }
353
354            proposed_action
355        } else {
356            ScalingAction::NoAction
357        };
358
359        // Record for adaptive learning
360        if let Some(ref mut adaptive) = self.adaptive_controller {
361            adaptive.record_performance_snapshot(metrics, &final_action);
362        }
363
364        // Store metrics history for trend analysis
365        self.metrics_history.push(metrics.clone());
366        if self.metrics_history.len() > 100 {
367            self.metrics_history.remove(0);
368        }
369
370        Ok(final_action)
371    }
372
373    fn count_scale_up_signals_static(
374        metrics: &AutoScalerMetrics,
375        thresholds: &ScalingTriggers,
376    ) -> usize {
377        let mut signals = 0;
378
379        if metrics.queue_pressure_score > thresholds.queue_pressure_threshold {
380            signals += 1;
381        }
382        if metrics.worker_utilization > thresholds.worker_utilization_threshold {
383            signals += 1;
384        }
385        if metrics.task_complexity_factor > thresholds.task_complexity_threshold {
386            signals += 1;
387        }
388        if metrics.error_rate > thresholds.error_rate_threshold {
389            signals += 1;
390        }
391        if metrics.memory_pressure_mb > thresholds.memory_pressure_threshold {
392            signals += 1;
393        }
394
395        signals
396    }
397
398    fn count_scale_down_signals_static(
399        metrics: &AutoScalerMetrics,
400        thresholds: &ScalingTriggers,
401    ) -> usize {
402        let mut signals = 0;
403
404        // Scale down when metrics are well below thresholds
405        if metrics.queue_pressure_score < thresholds.queue_pressure_threshold * 0.3 {
406            signals += 1;
407        }
408        if metrics.worker_utilization < thresholds.worker_utilization_threshold * 0.4 {
409            signals += 1;
410        }
411        if metrics.task_complexity_factor < thresholds.task_complexity_threshold * 0.5 {
412            signals += 1;
413        }
414        if metrics.error_rate < thresholds.error_rate_threshold * 0.2 {
415            signals += 1;
416        }
417        if metrics.memory_pressure_mb < thresholds.memory_pressure_threshold * 0.5 {
418            signals += 1;
419        }
420
421        signals
422    }
423
424    fn calculate_queue_pressure_score(
425        &self,
426        queue_metrics: &[QueueMetrics],
427        active_workers: i64,
428    ) -> f64 {
429        let mut weighted_pressure = 0.0;
430        let queue_weights = [
431            (queue_names::HIGH_PRIORITY, 3.0),
432            (queue_names::DEFAULT, 1.0),
433            (queue_names::LOW_PRIORITY, 0.3),
434        ];
435
436        for metrics in queue_metrics {
437            let weight = queue_weights
438                .iter()
439                .find(|(name, _)| *name == metrics.queue_name)
440                .map(|(_, w)| *w)
441                .unwrap_or(1.0);
442
443            let queue_pressure = if active_workers > 0 {
444                metrics.pending_tasks as f64 / active_workers as f64
445            } else {
446                metrics.pending_tasks as f64
447            };
448
449            weighted_pressure += queue_pressure * weight;
450        }
451
452        weighted_pressure / queue_weights.len() as f64
453    }
454
455    fn calculate_worker_utilization(
456        &self,
457        active_workers: i64,
458        queue_metrics: &[QueueMetrics],
459    ) -> f64 {
460        if active_workers == 0 {
461            return 0.0;
462        }
463
464        let total_work = queue_metrics
465            .iter()
466            .map(|m| m.pending_tasks + m.processed_tasks)
467            .sum::<i64>();
468
469        let utilization = total_work as f64 / (active_workers as f64 * 100.0);
470        utilization.min(1.0)
471    }
472
    /// Returns the task complexity factor for a metrics sample.
    ///
    /// Placeholder: the queue metrics are ignored and the constant `1.0` is
    /// returned.
    fn calculate_task_complexity_factor(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        // Simplified complexity calculation
        // In a real implementation, this would analyze historical execution times
        1.0
    }
478
479    fn calculate_error_rate(&self, processed: i64, failed: i64) -> f64 {
480        let total = processed + failed;
481        if total == 0 {
482            return 0.0;
483        }
484        failed as f64 / total as f64
485    }
486
487    fn estimate_memory_pressure(&self, active_workers: i64) -> f64 {
488        // Simplified memory estimation
489        // In a real implementation, this would query actual memory usage
490        active_workers as f64 * 50.0 // Assume 50MB per worker baseline
491    }
492
    /// Returns the average queue wait time in milliseconds.
    ///
    /// Placeholder: the queue metrics are ignored and the constant `0.0` is
    /// returned.
    fn calculate_avg_queue_wait_time(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        // Simplified calculation
        // In a real implementation, this would track actual wait times
        0.0
    }
498
    /// Returns the throughput trend.
    ///
    /// Placeholder: the queue metrics are ignored and the constant `0.0` is
    /// returned.
    fn calculate_throughput_trend(&self, _queue_metrics: &[QueueMetrics]) -> f64 {
        // Simplified trend calculation
        // In a real implementation, this would analyze historical throughput
        0.0
    }
504
505    pub async fn get_scaling_recommendations(&self) -> Result<String, TaskQueueError> {
506        let metrics = self.collect_metrics().await?;
507        let mut scaling_decision_copy = self.clone();
508        let action = scaling_decision_copy.decide_scaling_action(&metrics)?;
509
510        let mut report = format!(
511            "Enhanced Auto-scaling Status Report:\n\
512             - Active Workers: {}\n\
513             - Total Pending Tasks: {}\n\
514             - Queue Pressure Score: {:.2}\n\
515             - Worker Utilization: {:.1}%\n\
516             - Task Complexity Factor: {:.2}\n\
517             - Error Rate: {:.1}%\n\
518             - Memory Pressure: {:.1} MB\n\n",
519            metrics.active_workers,
520            metrics.total_pending_tasks,
521            metrics.queue_pressure_score,
522            metrics.worker_utilization * 100.0,
523            metrics.task_complexity_factor,
524            metrics.error_rate * 100.0,
525            metrics.memory_pressure_mb
526        );
527
528        for queue_metric in &metrics.queue_metrics {
529            report.push_str(&format!(
530                "Queue '{}': {} pending, {} processed, {} failed\n",
531                queue_metric.queue_name,
532                queue_metric.pending_tasks,
533                queue_metric.processed_tasks,
534                queue_metric.failed_tasks
535            ));
536        }
537
538        report.push_str(&format!("\nRecommended Action: {:?}\n", action));
539
540        if let Some(ref adaptive) = self.adaptive_controller {
541            report.push_str(&format!(
542                "\nAdaptive Thresholds Enabled: {}\n",
543                adaptive.get_adaptation_status()
544            ));
545        }
546
547        Ok(report)
548    }
549}
550
551// Clone implementation for AutoScaler (needed for decision making without borrowing issues)
552impl Clone for AutoScaler {
553    fn clone(&self) -> Self {
554        let adaptive_controller = self.adaptive_controller.clone();
555        let hysteresis_controller = self.hysteresis_controller.clone();
556
557        Self {
558            broker: self.broker.clone(),
559            config: self.config.clone(),
560            adaptive_controller,
561            hysteresis_controller,
562            metrics_history: self.metrics_history.clone(),
563            start_time: self.start_time,
564        }
565    }
566}
567
568// Clone implementation for AutoScalerMetrics
569impl Clone for AutoScalerMetrics {
570    fn clone(&self) -> Self {
571        Self {
572            active_workers: self.active_workers,
573            total_pending_tasks: self.total_pending_tasks,
574            queue_metrics: self.queue_metrics.clone(),
575            queue_pressure_score: self.queue_pressure_score,
576            worker_utilization: self.worker_utilization,
577            task_complexity_factor: self.task_complexity_factor,
578            error_rate: self.error_rate,
579            memory_pressure_mb: self.memory_pressure_mb,
580            avg_queue_wait_time_ms: self.avg_queue_wait_time_ms,
581            throughput_trend: self.throughput_trend,
582        }
583    }
584}
585
586impl AdaptiveThresholdController {
587    pub fn new(
588        initial_thresholds: ScalingTriggers,
589        learning_rate: f64,
590        target_sla: SLATargets,
591        adaptation_window: Duration,
592    ) -> Self {
593        Self {
594            current_thresholds: initial_thresholds,
595            performance_history: Vec::new(),
596            last_adaptation: Instant::now(),
597            learning_rate,
598            target_sla,
599            adaptation_window,
600        }
601    }
602
    /// Returns the thresholds currently in force, including any adjustments
    /// made by earlier adaptation steps.
    pub fn get_current_thresholds(&self) -> &ScalingTriggers {
        &self.current_thresholds
    }
606
607    pub fn record_performance_snapshot(
608        &mut self,
609        metrics: &AutoScalerMetrics,
610        action: &ScalingAction,
611    ) {
612        let snapshot = PerformanceSnapshot {
613            timestamp: Instant::now(),
614            latency_p95_ms: 0.0, // Would be filled from actual metrics
615            success_rate: 1.0 - metrics.error_rate,
616            queue_wait_time_ms: metrics.avg_queue_wait_time_ms,
617            worker_utilization: metrics.worker_utilization,
618            scaling_action_taken: match action {
619                ScalingAction::NoAction => None,
620                _ => Some(action.clone()),
621            },
622        };
623
624        self.performance_history.push(snapshot);
625
626        // Keep only recent history
627        let cutoff_time = Instant::now() - self.adaptation_window;
628        self.performance_history
629            .retain(|s| s.timestamp > cutoff_time);
630
631        // Adapt thresholds if enough time has passed
632        if self.last_adaptation.elapsed() > Duration::from_secs(300) {
633            self.adapt_thresholds();
634            self.last_adaptation = Instant::now();
635        }
636    }
637
638    fn adapt_thresholds(&mut self) {
639        if self.performance_history.is_empty() {
640            return;
641        }
642
643        let recent_performance =
644            &self.performance_history[self.performance_history.len().saturating_sub(10)..];
645
646        let avg_success_rate = recent_performance
647            .iter()
648            .map(|s| s.success_rate)
649            .sum::<f64>()
650            / recent_performance.len() as f64;
651
652        let avg_utilization = recent_performance
653            .iter()
654            .map(|s| s.worker_utilization)
655            .sum::<f64>()
656            / recent_performance.len() as f64;
657
658        // Adapt queue pressure threshold based on success rate
659        if avg_success_rate < self.target_sla.min_success_rate {
660            // Lower threshold to scale up more aggressively
661            self.current_thresholds.queue_pressure_threshold *= 1.0 - self.learning_rate;
662        } else if avg_success_rate > self.target_sla.min_success_rate + 0.02 {
663            // Raise threshold to scale up less aggressively
664            self.current_thresholds.queue_pressure_threshold *= 1.0 + (self.learning_rate * 0.5);
665        }
666
667        // Adapt utilization threshold based on target utilization
668        let utilization_diff = avg_utilization - self.target_sla.target_worker_utilization;
669        if utilization_diff.abs() > 0.1 {
670            let adjustment = -utilization_diff * self.learning_rate;
671            self.current_thresholds.worker_utilization_threshold *= 1.0 + adjustment;
672        }
673
674        // Clamp thresholds to reasonable ranges
675        self.current_thresholds.queue_pressure_threshold = self
676            .current_thresholds
677            .queue_pressure_threshold
678            .clamp(0.1, 2.0);
679        self.current_thresholds.worker_utilization_threshold = self
680            .current_thresholds
681            .worker_utilization_threshold
682            .clamp(0.1, 0.95);
683    }
684
685    pub fn get_adaptation_status(&self) -> String {
686        format!(
687            "Thresholds adapted {} times, {} performance samples",
688            0,
689            self.performance_history.len()
690        )
691    }
692}
693
694impl Clone for AdaptiveThresholdController {
695    fn clone(&self) -> Self {
696        Self {
697            current_thresholds: self.current_thresholds.clone(),
698            performance_history: self.performance_history.clone(),
699            last_adaptation: self.last_adaptation,
700            learning_rate: self.learning_rate,
701            target_sla: self.target_sla.clone(),
702            adaptation_window: self.adaptation_window,
703        }
704    }
705}
706
707impl HysteresisController {
708    pub fn new(
709        consecutive_signals_required: usize,
710        scale_up_cooldown: Duration,
711        scale_down_cooldown: Duration,
712    ) -> Self {
713        Self {
714            scale_up_consecutive_signals: 0,
715            scale_down_consecutive_signals: 0,
716            required_consecutive_signals: consecutive_signals_required,
717            last_scaling_action: None,
718            scale_up_cooldown,
719            scale_down_cooldown,
720        }
721    }
722
723    pub fn should_execute_scaling(&mut self, proposed_action: &ScalingAction) -> bool {
724        // Check cooldown periods
725        if let Some((last_time, ref last_action)) = self.last_scaling_action {
726            let cooldown_period = match last_action {
727                ScalingAction::ScaleUp(_) => self.scale_up_cooldown,
728                ScalingAction::ScaleDown(_) => self.scale_down_cooldown,
729                ScalingAction::NoAction => Duration::from_secs(0),
730            };
731
732            if last_time.elapsed() < cooldown_period {
733                return false;
734            }
735        }
736
737        // Check consecutive signals
738        let should_execute = match proposed_action {
739            ScalingAction::ScaleUp(_) => {
740                self.scale_up_consecutive_signals += 1;
741                self.scale_down_consecutive_signals = 0;
742                self.scale_up_consecutive_signals >= self.required_consecutive_signals
743            }
744            ScalingAction::ScaleDown(_) => {
745                self.scale_down_consecutive_signals += 1;
746                self.scale_up_consecutive_signals = 0;
747                self.scale_down_consecutive_signals >= self.required_consecutive_signals
748            }
749            ScalingAction::NoAction => {
750                self.scale_up_consecutive_signals = 0;
751                self.scale_down_consecutive_signals = 0;
752                false
753            }
754        };
755
756        if should_execute {
757            self.last_scaling_action = Some((Instant::now(), proposed_action.clone()));
758            self.scale_up_consecutive_signals = 0;
759            self.scale_down_consecutive_signals = 0;
760        }
761
762        should_execute
763    }
764}
765
766impl Clone for HysteresisController {
767    fn clone(&self) -> Self {
768        Self {
769            scale_up_consecutive_signals: self.scale_up_consecutive_signals,
770            scale_down_consecutive_signals: self.scale_down_consecutive_signals,
771            required_consecutive_signals: self.required_consecutive_signals,
772            last_scaling_action: self.last_scaling_action.clone(),
773            scale_up_cooldown: self.scale_up_cooldown,
774            scale_down_cooldown: self.scale_down_cooldown,
775        }
776    }
777}
778
779impl Clone for ScalingAction {
780    fn clone(&self) -> Self {
781        match self {
782            ScalingAction::ScaleUp(count) => ScalingAction::ScaleUp(*count),
783            ScalingAction::ScaleDown(count) => ScalingAction::ScaleDown(*count),
784            ScalingAction::NoAction => ScalingAction::NoAction,
785        }
786    }
787}
788
789#[cfg(test)]
790mod tests {
791    use super::*;
792    use crate::broker::QueueMetrics;
793
794    #[test]
795    fn test_autoscaler_config_default() {
796        let config = AutoScalerConfig::default();
797
798        assert_eq!(config.min_workers, 1);
799        assert_eq!(config.max_workers, 20);
800        assert_eq!(config.scale_up_count, 2);
801        assert_eq!(config.scale_down_count, 1);
802        assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
803        assert_eq!(config.scaling_triggers.worker_utilization_threshold, 0.80);
804        assert_eq!(config.scaling_triggers.task_complexity_threshold, 1.5);
805        assert_eq!(config.scaling_triggers.error_rate_threshold, 0.05);
806        assert_eq!(config.scaling_triggers.memory_pressure_threshold, 512.0);
807        assert!(config.enable_adaptive_thresholds);
808        assert_eq!(config.learning_rate, 0.1);
809        assert_eq!(config.adaptation_window_minutes, 30);
810        assert_eq!(config.scale_up_cooldown_seconds, 60);
811        assert_eq!(config.scale_down_cooldown_seconds, 300);
812        assert_eq!(config.consecutive_signals_required, 2);
813        assert_eq!(config.target_sla.max_p95_latency_ms, 5000.0);
814        assert_eq!(config.target_sla.min_success_rate, 0.95);
815        assert_eq!(config.target_sla.max_queue_wait_time_ms, 10000.0);
816        assert_eq!(config.target_sla.target_worker_utilization, 0.70);
817    }
818
/// Asserts that `validate()` returns `Ok` for a configuration in which every
/// field is spelled out explicitly.
#[test]
fn test_autoscaler_config_validation_valid() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        // Worker bounds and step sizes.
        min_workers: 2,
        max_workers: 10,
        scale_up_count: 3,
        scale_down_count: 2,
        scaling_triggers,
        // Adaptive learning.
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        // Hysteresis.
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_ok());
}
849
/// Asserts that `validate()` fails for `min_workers: 0` and that the error
/// text mentions the minimum-workers rule.
#[test]
fn test_autoscaler_config_validation_min_workers_zero() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 0, // the value under test
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Minimum workers must be greater than 0"));
}
885
/// Asserts that `validate()` fails when `max_workers` (5) is below
/// `min_workers` (10) and that the error text names that rule.
#[test]
fn test_autoscaler_config_validation_max_less_than_min() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        // Inverted bounds: the values under test.
        min_workers: 10,
        max_workers: 5,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Maximum workers must be greater than or equal to minimum workers"));
}
921
/// Asserts that `validate()` fails for `max_workers: 1001` and that the error
/// text states the 1000-worker ceiling.
#[test]
fn test_autoscaler_config_validation_max_workers_too_high() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 1,
        max_workers: 1001, // the value under test
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Maximum workers cannot exceed 1000"));
}
957
/// Asserts that `validate()` fails for `scale_up_count: 0` and that the error
/// text states the allowed scale-up range.
#[test]
fn test_autoscaler_config_validation_invalid_scale_up_count() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 0, // the value under test
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Scale up count must be between 1 and 50"));
}
993
/// Asserts that `validate()` fails for `scale_down_count: 0` and that the
/// error text states the allowed scale-down range.
#[test]
fn test_autoscaler_config_validation_invalid_scale_down_count() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 0, // the value under test
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Scale down count must be between 1 and 50"));
}
1029
/// Asserts that `validate()` fails when every scaling trigger is `0.0` and
/// that the reported error is the queue-pressure range message.
#[test]
fn test_autoscaler_config_validation_invalid_scaling_triggers() {
    // All five thresholds are zeroed; only the queue-pressure message is checked.
    let zeroed_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.0,
        worker_utilization_threshold: 0.0,
        task_complexity_threshold: 0.0,
        error_rate_threshold: 0.0,
        memory_pressure_threshold: 0.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers: zeroed_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let rejection = cfg.validate();
    assert!(rejection.is_err());
    assert!(rejection
        .unwrap_err()
        .to_string()
        .contains("Queue pressure threshold must be between 0.1 and 2.0"));
}
1063
/// Asserts that `validate()` fails for `learning_rate: 0.0` and that the error
/// text states the allowed learning-rate range.
#[test]
fn test_autoscaler_config_validation_invalid_learning_rate() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };

    let cfg = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.0, // the value under test
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let outcome = cfg.validate();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Learning rate must be between 0.01 and 1.0"));
}
1099
/// Round-trips the default configuration through JSON and asserts that every
/// field of the decoded value equals the corresponding field of the original.
#[test]
fn test_autoscaler_config_serialization() {
    let original = AutoScalerConfig::default();

    // Encode to JSON, then decode back into a fresh value.
    let encoded = serde_json::to_string(&original).expect("Failed to serialize to JSON");
    let restored: AutoScalerConfig =
        serde_json::from_str(&encoded).expect("Failed to deserialize from JSON");

    // Worker bounds and step sizes.
    assert_eq!(original.min_workers, restored.min_workers);
    assert_eq!(original.max_workers, restored.max_workers);
    assert_eq!(original.scale_up_count, restored.scale_up_count);
    assert_eq!(original.scale_down_count, restored.scale_down_count);

    // Scaling triggers.
    let (before, after) = (&original.scaling_triggers, &restored.scaling_triggers);
    assert_eq!(before.queue_pressure_threshold, after.queue_pressure_threshold);
    assert_eq!(
        before.worker_utilization_threshold,
        after.worker_utilization_threshold
    );
    assert_eq!(before.task_complexity_threshold, after.task_complexity_threshold);
    assert_eq!(before.error_rate_threshold, after.error_rate_threshold);
    assert_eq!(before.memory_pressure_threshold, after.memory_pressure_threshold);

    // Adaptive learning and hysteresis settings.
    assert_eq!(
        original.enable_adaptive_thresholds,
        restored.enable_adaptive_thresholds
    );
    assert_eq!(original.learning_rate, restored.learning_rate);
    assert_eq!(
        original.adaptation_window_minutes,
        restored.adaptation_window_minutes
    );
    assert_eq!(
        original.scale_up_cooldown_seconds,
        restored.scale_up_cooldown_seconds
    );
    assert_eq!(
        original.scale_down_cooldown_seconds,
        restored.scale_down_cooldown_seconds
    );
    assert_eq!(
        original.consecutive_signals_required,
        restored.consecutive_signals_required
    );

    // SLA targets.
    let (before, after) = (&original.target_sla, &restored.target_sla);
    assert_eq!(before.max_p95_latency_ms, after.max_p95_latency_ms);
    assert_eq!(before.min_success_rate, after.min_success_rate);
    assert_eq!(before.max_queue_wait_time_ms, after.max_queue_wait_time_ms);
    assert_eq!(
        before.target_worker_utilization,
        after.target_worker_utilization
    );
}
1171
1172    fn create_test_autoscaler_metrics(
1173        active_workers: i64,
1174        total_pending_tasks: i64,
1175    ) -> AutoScalerMetrics {
1176        let queue_pressure_score = if active_workers > 0 {
1177            total_pending_tasks as f64 / active_workers as f64
1178        } else {
1179            total_pending_tasks as f64
1180        };
1181
1182        AutoScalerMetrics {
1183            active_workers,
1184            total_pending_tasks,
1185            queue_metrics: vec![QueueMetrics {
1186                queue_name: "default".to_string(),
1187                pending_tasks: total_pending_tasks,
1188                processed_tasks: 100,
1189                failed_tasks: 5,
1190            }],
1191            queue_pressure_score,
1192            worker_utilization: if active_workers > 0 { 0.6 } else { 0.0 },
1193            task_complexity_factor: 1.0,
1194            error_rate: 5.0 / 105.0, // 5 failed out of 105 total
1195            memory_pressure_mb: active_workers as f64 * 50.0,
1196            avg_queue_wait_time_ms: 0.0,
1197            throughput_trend: 0.0,
1198        }
1199    }
1200
1201    fn create_test_autoscaler_metrics_with_high_pressure(
1202        active_workers: i64,
1203        total_pending_tasks: i64,
1204    ) -> AutoScalerMetrics {
1205        let queue_pressure_score = if active_workers > 0 {
1206            total_pending_tasks as f64 / active_workers as f64
1207        } else {
1208            total_pending_tasks as f64
1209        };
1210
1211        AutoScalerMetrics {
1212            active_workers,
1213            total_pending_tasks,
1214            queue_metrics: vec![QueueMetrics {
1215                queue_name: "default".to_string(),
1216                pending_tasks: total_pending_tasks,
1217                processed_tasks: 100,
1218                failed_tasks: 5,
1219            }],
1220            queue_pressure_score,
1221            worker_utilization: 0.95,    // High utilization
1222            task_complexity_factor: 2.0, // High complexity
1223            error_rate: 0.08,            // High error rate
1224            memory_pressure_mb: 600.0,   // High memory pressure
1225            avg_queue_wait_time_ms: 0.0,
1226            throughput_trend: 0.0,
1227        }
1228    }
1229
1230    fn create_test_autoscaler_metrics_with_low_pressure(
1231        active_workers: i64,
1232        total_pending_tasks: i64,
1233    ) -> AutoScalerMetrics {
1234        let queue_pressure_score = if active_workers > 0 {
1235            total_pending_tasks as f64 / active_workers as f64
1236        } else {
1237            total_pending_tasks as f64
1238        };
1239
1240        AutoScalerMetrics {
1241            active_workers,
1242            total_pending_tasks,
1243            queue_metrics: vec![QueueMetrics {
1244                queue_name: "default".to_string(),
1245                pending_tasks: total_pending_tasks,
1246                processed_tasks: 100,
1247                failed_tasks: 5,
1248            }],
1249            queue_pressure_score,
1250            worker_utilization: 0.2,     // Low utilization
1251            task_complexity_factor: 0.5, // Low complexity
1252            error_rate: 0.01,            // Low error rate
1253            memory_pressure_mb: 100.0,   // Low memory pressure
1254            avg_queue_wait_time_ms: 0.0,
1255            throughput_trend: 0.0,
1256        }
1257    }
1258
1259    #[allow(dead_code)]
1260    fn create_mock_autoscaler() -> AutoScaler {
1261        // Create a dummy broker for testing purposes
1262        let redis_url = "redis://localhost:6379";
1263        let mut config = deadpool_redis::Config::from_url(redis_url);
1264        config.pool = Some(deadpool_redis::PoolConfig::new(1));
1265
1266        let pool = config
1267            .create_pool(Some(deadpool_redis::Runtime::Tokio1))
1268            .expect("Failed to create pool");
1269
1270        let broker = Arc::new(crate::broker::RedisBroker { pool });
1271        AutoScaler::new(broker)
1272    }
1273
/// Feeds the same high-pressure metrics (2 workers, 12 pending tasks) to
/// `decide_scaling_action` twice and asserts the second decision is
/// `ScaleUp(2)`.
#[test]
fn test_scaling_action_scale_up() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };
    let config = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });
    let mut scaler = AutoScaler::with_config(broker, config);

    // 2 workers under high-pressure readings.
    let high_pressure = create_test_autoscaler_metrics_with_high_pressure(2, 12);

    // The config requires 2 consecutive signals, so the decision is taken twice.
    let first = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");
    let second = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");

    if let ScalingAction::ScaleUp(count) = &second {
        assert_eq!(*count, 2);
    } else {
        panic!(
            "Expected ScaleUp action, got {:?}. First action was {:?}",
            second, first
        );
    }
}
1330
/// Feeds the same low-pressure metrics (5 workers, 2 pending tasks) to
/// `decide_scaling_action` twice and asserts the second decision is
/// `ScaleDown(1)`.
#[test]
fn test_scaling_action_scale_down() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };
    let config = AutoScalerConfig {
        min_workers: 1,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });
    let mut scaler = AutoScaler::with_config(broker, config);

    // 5 workers under low-pressure readings.
    let low_pressure = create_test_autoscaler_metrics_with_low_pressure(5, 2);

    // The config requires 2 consecutive signals, so the decision is taken twice.
    let first = scaler
        .decide_scaling_action(&low_pressure)
        .expect("Failed to decide scaling action");
    let second = scaler
        .decide_scaling_action(&low_pressure)
        .expect("Failed to decide scaling action");

    if let ScalingAction::ScaleDown(count) = &second {
        assert_eq!(*count, 1);
    } else {
        panic!(
            "Expected ScaleDown action, got {:?}. First action was {:?}",
            second, first
        );
    }
}
1387
/// Takes one scaling decision with the default configuration and the baseline
/// metrics fixture (3 workers, 9 pending tasks) and asserts it is `NoAction`.
#[test]
fn test_scaling_action_no_action() {
    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });

    let mut scaler = AutoScaler::with_config(broker, AutoScalerConfig::default());

    // 3 workers, moderate conditions.
    let moderate = create_test_autoscaler_metrics(3, 9);

    let action = scaler
        .decide_scaling_action(&moderate)
        .expect("Failed to decide scaling action");

    if !matches!(action, ScalingAction::NoAction) {
        panic!("Expected NoAction, got {:?}", action);
    }
}
1412
/// Takes one scaling decision with 5 active workers and `max_workers: 5`, and
/// asserts the result is `NoAction`.
///
/// On failure the unexpected action is included in the panic message, matching
/// the diagnostics of `test_scaling_action_no_action`.
#[test]
fn test_scaling_action_at_max_workers() {
    let config = AutoScalerConfig {
        min_workers: 1,
        max_workers: 5,
        scale_up_count: 3,
        scale_down_count: 1,
        scaling_triggers: ScalingTriggers {
            queue_pressure_threshold: 0.75,
            worker_utilization_threshold: 0.80,
            task_complexity_threshold: 1.5,
            error_rate_threshold: 0.05,
            memory_pressure_threshold: 512.0,
        },
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla: SLATargets {
            max_p95_latency_ms: 5000.0,
            min_success_rate: 0.95,
            max_queue_wait_time_ms: 10000.0,
            target_worker_utilization: 0.70,
        },
    };

    let broker = Arc::new(crate::broker::RedisBroker {
        pool: deadpool_redis::Config::from_url("redis://localhost:6379")
            .create_pool(Some(deadpool_redis::Runtime::Tokio1))
            .expect("Failed to create pool"),
    });

    let mut autoscaler = AutoScaler::with_config(broker, config);

    // 5 workers (at max), 20 tasks = 4 tasks per worker (> threshold but can't scale up)
    // NOTE(review): only one decision is taken while `consecutive_signals_required`
    // is 2, so this may return NoAction regardless of the worker cap; confirm
    // against `decide_scaling_action` whether the cap is actually exercised.
    let metrics = create_test_autoscaler_metrics(5, 20);

    let action = autoscaler
        .decide_scaling_action(&metrics)
        .expect("Failed to decide scaling action");

    match action {
        ScalingAction::NoAction => {}
        other => panic!("Expected NoAction when at max workers, got {:?}", other),
    }
}
1461
/// Takes one scaling decision with 3 active workers and `min_workers: 3`, and
/// asserts the result is `NoAction`.
///
/// On failure the unexpected action is included in the panic message, matching
/// the diagnostics of `test_scaling_action_no_action`.
#[test]
fn test_scaling_action_at_min_workers() {
    let config = AutoScalerConfig {
        min_workers: 3,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 2,
        scaling_triggers: ScalingTriggers {
            queue_pressure_threshold: 0.75,
            worker_utilization_threshold: 0.80,
            task_complexity_threshold: 1.5,
            error_rate_threshold: 0.05,
            memory_pressure_threshold: 512.0,
        },
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla: SLATargets {
            max_p95_latency_ms: 5000.0,
            min_success_rate: 0.95,
            max_queue_wait_time_ms: 10000.0,
            target_worker_utilization: 0.70,
        },
    };

    let broker = Arc::new(crate::broker::RedisBroker {
        pool: deadpool_redis::Config::from_url("redis://localhost:6379")
            .create_pool(Some(deadpool_redis::Runtime::Tokio1))
            .expect("Failed to create pool"),
    });

    let mut autoscaler = AutoScaler::with_config(broker, config);

    // 3 workers (at min), 1 task = 0.33 tasks per worker (< threshold but can't scale down)
    // NOTE(review): only one decision is taken while `consecutive_signals_required`
    // is 2, so this may return NoAction regardless of the worker floor; confirm
    // against `decide_scaling_action` whether the floor is actually exercised.
    let metrics = create_test_autoscaler_metrics(3, 1);

    let action = autoscaler
        .decide_scaling_action(&metrics)
        .expect("Failed to decide scaling action");

    match action {
        ScalingAction::NoAction => {}
        other => panic!("Expected NoAction when at min workers, got {:?}", other),
    }
}
1510
/// With 3 active workers, `max_workers: 5` and `scale_up_count: 10`, feeds
/// high-pressure metrics to `decide_scaling_action` twice and asserts the
/// second decision is `ScaleUp(2)`.
#[test]
fn test_scaling_action_limited_by_max_workers() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };
    let config = AutoScalerConfig {
        min_workers: 1,
        max_workers: 5,
        scale_up_count: 10, // larger than the headroom left below max_workers
        scale_down_count: 1,
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });
    let mut scaler = AutoScaler::with_config(broker, config);

    // 3 workers under high-pressure readings.
    let high_pressure = create_test_autoscaler_metrics_with_high_pressure(3, 15);

    // The config requires 2 consecutive signals, so the decision is taken twice.
    let first = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");
    let second = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");

    if let ScalingAction::ScaleUp(count) = &second {
        // Expected step: 5 (max) - 3 (current) = 2.
        assert_eq!(*count, 2);
    } else {
        panic!(
            "Expected ScaleUp action limited by max workers, got {:?}. First action was {:?}",
            second, first
        );
    }
}
1567
/// With 6 active workers, `min_workers: 3` and `scale_down_count: 10`, feeds
/// low-pressure metrics to `decide_scaling_action` twice and asserts the
/// second decision is `ScaleDown(3)`.
#[test]
fn test_scaling_action_limited_by_min_workers() {
    let scaling_triggers = ScalingTriggers {
        queue_pressure_threshold: 0.75,
        worker_utilization_threshold: 0.80,
        task_complexity_threshold: 1.5,
        error_rate_threshold: 0.05,
        memory_pressure_threshold: 512.0,
    };
    let target_sla = SLATargets {
        max_p95_latency_ms: 5000.0,
        min_success_rate: 0.95,
        max_queue_wait_time_ms: 10000.0,
        target_worker_utilization: 0.70,
    };
    let config = AutoScalerConfig {
        min_workers: 3,
        max_workers: 10,
        scale_up_count: 2,
        scale_down_count: 10, // larger than the room left above min_workers
        scaling_triggers,
        enable_adaptive_thresholds: true,
        learning_rate: 0.1,
        adaptation_window_minutes: 30,
        scale_up_cooldown_seconds: 60,
        scale_down_cooldown_seconds: 300,
        consecutive_signals_required: 2,
        target_sla,
    };

    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });
    let mut scaler = AutoScaler::with_config(broker, config);

    // 6 workers under low-pressure readings.
    let low_pressure = create_test_autoscaler_metrics_with_low_pressure(6, 3);

    // The config requires 2 consecutive signals, so the decision is taken twice.
    let first = scaler
        .decide_scaling_action(&low_pressure)
        .expect("Failed to decide scaling action");
    let second = scaler
        .decide_scaling_action(&low_pressure)
        .expect("Failed to decide scaling action");

    if let ScalingAction::ScaleDown(count) = &second {
        // Expected step: 6 (current) - 3 (min) = 3.
        assert_eq!(*count, 3);
    } else {
        panic!(
            "Expected ScaleDown action limited by min workers, got {:?}. First action was {:?}",
            second, first
        );
    }
}
1624
/// With the default configuration and zero active workers, feeds high-pressure
/// metrics to `decide_scaling_action` twice and asserts the second decision is
/// `ScaleUp(2)`.
#[test]
fn test_scaling_action_zero_workers() {
    let pool = deadpool_redis::Config::from_url("redis://localhost:6379")
        .create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create pool");
    let broker = Arc::new(crate::broker::RedisBroker { pool });

    let mut scaler = AutoScaler::with_config(broker, AutoScalerConfig::default());

    // 0 workers, 10 pending tasks, high-pressure readings.
    let high_pressure = create_test_autoscaler_metrics_with_high_pressure(0, 10);

    // Two identical evaluations; only the second is asserted on.
    let first = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");
    let second = scaler
        .decide_scaling_action(&high_pressure)
        .expect("Failed to decide scaling action");

    if let ScalingAction::ScaleUp(count) = &second {
        assert_eq!(*count, 2);
    } else {
        panic!(
            "Expected ScaleUp action with zero workers, got {:?}. First action was {:?}",
            second, first
        );
    }
}
1657
1658    #[test]
1659    fn test_autoscaler_metrics_debug() {
1660        let metrics = create_test_autoscaler_metrics(5, 25);
1661        let debug_str = format!("{:?}", metrics);
1662
1663        assert!(debug_str.contains("AutoScalerMetrics"));
1664        assert!(debug_str.contains("active_workers: 5"));
1665        assert!(debug_str.contains("total_pending_tasks: 25"));
1666        assert!(debug_str.contains("queue_pressure_score: 5"));
1667        assert!(debug_str.contains("worker_utilization: 0.6"));
1668        assert!(debug_str.contains("task_complexity_factor: 1"));
1669        assert!(debug_str.contains("error_rate:"));
1670        assert!(debug_str.contains("memory_pressure_mb: 250"));
1671        assert!(debug_str.contains("avg_queue_wait_time_ms: 0"));
1672        assert!(debug_str.contains("throughput_trend: 0"));
1673    }
1674
1675    #[test]
1676    fn test_scaling_action_debug() {
1677        let scale_up = ScalingAction::ScaleUp(3);
1678        let scale_down = ScalingAction::ScaleDown(2);
1679        let no_action = ScalingAction::NoAction;
1680
1681        assert!(format!("{:?}", scale_up).contains("ScaleUp(3)"));
1682        assert!(format!("{:?}", scale_down).contains("ScaleDown(2)"));
1683        assert!(format!("{:?}", no_action).contains("NoAction"));
1684    }
1685
1686    #[test]
1687    fn test_autoscaler_config_clone() {
1688        let original = AutoScalerConfig::default();
1689        let cloned = original.clone();
1690
1691        assert_eq!(original.min_workers, cloned.min_workers);
1692        assert_eq!(original.max_workers, cloned.max_workers);
1693        assert_eq!(original.scale_up_count, cloned.scale_up_count);
1694        assert_eq!(original.scale_down_count, cloned.scale_down_count);
1695        assert_eq!(
1696            original.scaling_triggers.queue_pressure_threshold,
1697            cloned.scaling_triggers.queue_pressure_threshold
1698        );
1699        assert_eq!(
1700            original.scaling_triggers.worker_utilization_threshold,
1701            cloned.scaling_triggers.worker_utilization_threshold
1702        );
1703        assert_eq!(
1704            original.scaling_triggers.task_complexity_threshold,
1705            cloned.scaling_triggers.task_complexity_threshold
1706        );
1707        assert_eq!(
1708            original.scaling_triggers.error_rate_threshold,
1709            cloned.scaling_triggers.error_rate_threshold
1710        );
1711        assert_eq!(
1712            original.scaling_triggers.memory_pressure_threshold,
1713            cloned.scaling_triggers.memory_pressure_threshold
1714        );
1715        assert_eq!(
1716            original.enable_adaptive_thresholds,
1717            cloned.enable_adaptive_thresholds
1718        );
1719        assert_eq!(original.learning_rate, cloned.learning_rate);
1720        assert_eq!(
1721            original.adaptation_window_minutes,
1722            cloned.adaptation_window_minutes
1723        );
1724        assert_eq!(
1725            original.scale_up_cooldown_seconds,
1726            cloned.scale_up_cooldown_seconds
1727        );
1728        assert_eq!(
1729            original.scale_down_cooldown_seconds,
1730            cloned.scale_down_cooldown_seconds
1731        );
1732        assert_eq!(
1733            original.consecutive_signals_required,
1734            cloned.consecutive_signals_required
1735        );
1736        assert_eq!(
1737            original.target_sla.max_p95_latency_ms,
1738            cloned.target_sla.max_p95_latency_ms
1739        );
1740        assert_eq!(
1741            original.target_sla.min_success_rate,
1742            cloned.target_sla.min_success_rate
1743        );
1744        assert_eq!(
1745            original.target_sla.max_queue_wait_time_ms,
1746            cloned.target_sla.max_queue_wait_time_ms
1747        );
1748        assert_eq!(
1749            original.target_sla.target_worker_utilization,
1750            cloned.target_sla.target_worker_utilization
1751        );
1752    }
1753}