forge_orchestration/
autoscaler.rs

//! Autoscaling module for Forge
//!
//! ## Table of Contents
//! - **AutoscalerConfig**: Configuration for scaling behavior
//! - **Autoscaler**: Main autoscaling engine
//! - **ScalingDecision**: Result of scaling evaluation
//! - **ScalingPolicy**: Custom scaling policies

9use crate::error::{ForgeError, Result};
10use crate::job::Job;
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18
/// Configuration for the autoscaler
///
/// All fields are public and (de)serializable via serde. The builder methods
/// clamp their inputs, but direct construction or deserialization performs
/// no range checks — call [`AutoscalerConfig::validate`] before use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoscalerConfig {
    /// Utilization threshold to trigger upscaling (0.0 - 1.0)
    pub upscale_threshold: f64,
    /// Utilization threshold to trigger downscaling (0.0 - 1.0);
    /// must be strictly below `upscale_threshold` (enforced by `validate`)
    pub downscale_threshold: f64,
    /// Hysteresis period in seconds (cooldown between scaling actions)
    pub hysteresis_secs: u64,
    /// Evaluation interval in seconds
    pub eval_interval_secs: u64,
    /// Minimum instances (floor)
    pub min_instances: u32,
    /// Maximum instances (ceiling); must be >= `min_instances`
    pub max_instances: u32,
    /// Scale up increment (instances added per upscale action)
    pub scale_up_step: u32,
    /// Scale down increment (instances removed per downscale action)
    pub scale_down_step: u32,
    /// Enable predictive scaling
    /// NOTE(review): no predictive logic is visible in this module —
    /// presumably consumed elsewhere; confirm before relying on it.
    pub predictive_enabled: bool,
    /// Lookback window for metrics (seconds)
    pub metrics_window_secs: u64,
}
43
44impl Default for AutoscalerConfig {
45    fn default() -> Self {
46        Self {
47            upscale_threshold: 0.8,
48            downscale_threshold: 0.3,
49            hysteresis_secs: 300,
50            eval_interval_secs: 30,
51            min_instances: 1,
52            max_instances: 100,
53            scale_up_step: 1,
54            scale_down_step: 1,
55            predictive_enabled: false,
56            metrics_window_secs: 300,
57        }
58    }
59}
60
61impl AutoscalerConfig {
62    /// Create a new autoscaler config
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    /// Set upscale threshold
68    pub fn upscale_threshold(mut self, threshold: f64) -> Self {
69        self.upscale_threshold = threshold.clamp(0.0, 1.0);
70        self
71    }
72
73    /// Set downscale threshold
74    pub fn downscale_threshold(mut self, threshold: f64) -> Self {
75        self.downscale_threshold = threshold.clamp(0.0, 1.0);
76        self
77    }
78
79    /// Set hysteresis period
80    pub fn hysteresis_secs(mut self, secs: u64) -> Self {
81        self.hysteresis_secs = secs;
82        self
83    }
84
85    /// Set instance bounds
86    pub fn bounds(mut self, min: u32, max: u32) -> Self {
87        self.min_instances = min;
88        self.max_instances = max.max(min);
89        self
90    }
91
92    /// Enable predictive scaling
93    pub fn predictive(mut self, enabled: bool) -> Self {
94        self.predictive_enabled = enabled;
95        self
96    }
97
98    /// Validate configuration
99    pub fn validate(&self) -> Result<()> {
100        if self.upscale_threshold <= self.downscale_threshold {
101            return Err(ForgeError::config(
102                "upscale_threshold must be greater than downscale_threshold",
103            ));
104        }
105        if self.min_instances > self.max_instances {
106            return Err(ForgeError::config(
107                "min_instances cannot exceed max_instances",
108            ));
109        }
110        Ok(())
111    }
112}
113
/// Scaling decision result
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingDecision {
    /// No scaling needed
    NoChange,
    /// Scale up by N instances
    ScaleUp(u32),
    /// Scale down by N instances
    ScaleDown(u32),
    /// Scale to exact count
    ScaleTo(u32),
}

impl ScalingDecision {
    /// Check if this is a scaling action (anything other than `NoChange`).
    pub fn is_scaling(&self) -> bool {
        !matches!(self, Self::NoChange)
    }

    /// Get the target delta (positive = up, negative = down).
    ///
    /// Counts that do not fit in `i32` saturate (`i32::MAX` for up,
    /// `i32::MIN + 1` for down). The previous `as` cast wrapped: a large
    /// `ScaleUp` could report a *negative* delta, and negating the wrapped
    /// `i32::MIN` panics in debug builds.
    pub fn delta(&self) -> i32 {
        match self {
            Self::NoChange => 0,
            Self::ScaleUp(n) => i32::try_from(*n).unwrap_or(i32::MAX),
            // -i32::MAX == i32::MIN + 1, so the negation can never overflow.
            Self::ScaleDown(n) => i32::try_from(*n).map(|v| -v).unwrap_or(i32::MIN + 1),
            Self::ScaleTo(_) => 0, // Absolute target, not a delta
        }
    }
}
143
/// Metrics snapshot for scaling decisions
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Average CPU utilization (0.0 - 1.0)
    pub cpu_utilization: f64,
    /// Average memory utilization (0.0 - 1.0)
    pub memory_utilization: f64,
    /// Request rate (requests per second)
    pub request_rate: f64,
    /// Average latency in milliseconds
    pub latency_ms: f64,
    /// Current instance count
    pub current_instances: u32,
    /// Timestamp
    pub timestamp: Instant,
}

impl MetricsSnapshot {
    /// Build a snapshot from CPU/memory utilization and an instance count.
    ///
    /// Utilization inputs are clamped into 0.0 - 1.0; request rate and
    /// latency start at zero, and the timestamp is taken now.
    pub fn new(cpu: f64, memory: f64, instances: u32) -> Self {
        let cpu_utilization = cpu.clamp(0.0, 1.0);
        let memory_utilization = memory.clamp(0.0, 1.0);
        MetricsSnapshot {
            cpu_utilization,
            memory_utilization,
            request_rate: 0.0,
            latency_ms: 0.0,
            current_instances: instances,
            timestamp: Instant::now(),
        }
    }

    /// Combined utilization: whichever of CPU or memory is higher, so the
    /// more pressured resource drives the scaling decision.
    pub fn utilization(&self) -> f64 {
        self.cpu_utilization.max(self.memory_utilization)
    }
}
179
/// Trait for custom scaling policies
///
/// Implementations inspect a [`MetricsSnapshot`] against an
/// [`AutoscalerConfig`] and return a [`ScalingDecision`]. Policies must be
/// `Send + Sync` because the [`Autoscaler`] shares them behind an `Arc`.
/// Hysteresis/cooldown is enforced by the autoscaler, not by policies.
#[async_trait]
pub trait ScalingPolicy: Send + Sync {
    /// Evaluate metrics and return a scaling decision
    async fn evaluate(
        &self,
        metrics: &MetricsSnapshot,
        config: &AutoscalerConfig,
    ) -> ScalingDecision;

    /// Policy name for logging
    fn name(&self) -> &str;
}
193
194/// Default threshold-based scaling policy
195#[derive(Debug, Clone)]
196pub struct ThresholdPolicy;
197
198#[async_trait]
199impl ScalingPolicy for ThresholdPolicy {
200    async fn evaluate(
201        &self,
202        metrics: &MetricsSnapshot,
203        config: &AutoscalerConfig,
204    ) -> ScalingDecision {
205        let utilization = metrics.utilization();
206
207        if utilization >= config.upscale_threshold {
208            let new_count = (metrics.current_instances + config.scale_up_step)
209                .min(config.max_instances);
210            if new_count > metrics.current_instances {
211                return ScalingDecision::ScaleUp(new_count - metrics.current_instances);
212            }
213        } else if utilization <= config.downscale_threshold {
214            let new_count = metrics
215                .current_instances
216                .saturating_sub(config.scale_down_step)
217                .max(config.min_instances);
218            if new_count < metrics.current_instances {
219                return ScalingDecision::ScaleDown(metrics.current_instances - new_count);
220            }
221        }
222
223        ScalingDecision::NoChange
224    }
225
226    fn name(&self) -> &str {
227        "threshold"
228    }
229}
230
/// Target utilization scaling policy
#[derive(Debug, Clone)]
pub struct TargetUtilizationPolicy {
    /// Target utilization (0.0 - 1.0)
    target: f64,
    /// Tolerance band around target
    tolerance: f64,
}

impl TargetUtilizationPolicy {
    /// Create a policy that aims for `target` utilization.
    ///
    /// The target is clamped into [0.1, 0.9] (keeping the later division by
    /// it well-behaved); the tolerance band defaults to 0.1.
    pub fn new(target: f64) -> Self {
        let target = target.clamp(0.1, 0.9);
        Self {
            target,
            tolerance: 0.1,
        }
    }

    /// Set the tolerance band (clamped into [0.01, 0.5]).
    pub fn with_tolerance(mut self, tolerance: f64) -> Self {
        let tolerance = tolerance.clamp(0.01, 0.5);
        self.tolerance = tolerance;
        self
    }
}
255
256#[async_trait]
257impl ScalingPolicy for TargetUtilizationPolicy {
258    async fn evaluate(
259        &self,
260        metrics: &MetricsSnapshot,
261        config: &AutoscalerConfig,
262    ) -> ScalingDecision {
263        let utilization = metrics.utilization();
264        let current = metrics.current_instances as f64;
265
266        // Calculate desired instances to hit target utilization
267        let desired = (current * utilization / self.target).ceil() as u32;
268        let desired = desired.clamp(config.min_instances, config.max_instances);
269
270        let diff = (utilization - self.target).abs();
271        if diff <= self.tolerance {
272            return ScalingDecision::NoChange;
273        }
274
275        if desired > metrics.current_instances {
276            ScalingDecision::ScaleUp(desired - metrics.current_instances)
277        } else if desired < metrics.current_instances {
278            ScalingDecision::ScaleDown(metrics.current_instances - desired)
279        } else {
280            ScalingDecision::NoChange
281        }
282    }
283
284    fn name(&self) -> &str {
285        "target-utilization"
286    }
287}
288
289/// State for a single job's autoscaling
290#[derive(Debug)]
291struct JobScalingState {
292    last_scale_time: Option<Instant>,
293    metrics_history: Vec<MetricsSnapshot>,
294}
295
296impl JobScalingState {
297    fn new() -> Self {
298        Self {
299            last_scale_time: None,
300            metrics_history: Vec::new(),
301        }
302    }
303
304    fn can_scale(&self, hysteresis: Duration) -> bool {
305        match self.last_scale_time {
306            Some(t) => t.elapsed() >= hysteresis,
307            None => true,
308        }
309    }
310
311    fn record_scale(&mut self) {
312        self.last_scale_time = Some(Instant::now());
313    }
314
315    fn add_metrics(&mut self, snapshot: MetricsSnapshot, max_history: usize) {
316        self.metrics_history.push(snapshot);
317        if self.metrics_history.len() > max_history {
318            self.metrics_history.remove(0);
319        }
320    }
321}
322
/// Main autoscaler engine
///
/// Applies a pluggable [`ScalingPolicy`] to per-job metrics, enforcing a
/// hysteresis cooldown between scaling actions. Per-job state lives behind
/// an async `RwLock`, so the engine can be shared across tasks.
pub struct Autoscaler {
    // Configuration; validated by the constructors before use.
    config: AutoscalerConfig,
    // Decision policy; `new()` installs `ThresholdPolicy` by default.
    policy: Arc<dyn ScalingPolicy>,
    // Cooldown timestamps and metrics history, keyed by job id.
    job_states: RwLock<HashMap<String, JobScalingState>>,
}
329
330impl Autoscaler {
331    /// Create a new autoscaler with default policy
332    pub fn new(config: AutoscalerConfig) -> Result<Self> {
333        config.validate()?;
334        Ok(Self {
335            config,
336            policy: Arc::new(ThresholdPolicy),
337            job_states: RwLock::new(HashMap::new()),
338        })
339    }
340
341    /// Create with a custom scaling policy
342    pub fn with_policy(config: AutoscalerConfig, policy: Arc<dyn ScalingPolicy>) -> Result<Self> {
343        config.validate()?;
344        Ok(Self {
345            config,
346            policy,
347            job_states: RwLock::new(HashMap::new()),
348        })
349    }
350
351    /// Get the current configuration
352    pub fn config(&self) -> &AutoscalerConfig {
353        &self.config
354    }
355
356    /// Evaluate scaling for a job
357    pub async fn evaluate(&self, job_id: &str, metrics: MetricsSnapshot) -> ScalingDecision {
358        let hysteresis = Duration::from_secs(self.config.hysteresis_secs);
359
360        // Check hysteresis
361        {
362            let states = self.job_states.read().await;
363            if let Some(state) = states.get(job_id) {
364                if !state.can_scale(hysteresis) {
365                    debug!(
366                        job_id = %job_id,
367                        "Scaling blocked by hysteresis"
368                    );
369                    return ScalingDecision::NoChange;
370                }
371            }
372        }
373
374        // Evaluate policy
375        let decision = self.policy.evaluate(&metrics, &self.config).await;
376
377        // Record metrics and scaling action
378        {
379            let mut states = self.job_states.write().await;
380            let state = states
381                .entry(job_id.to_string())
382                .or_insert_with(JobScalingState::new);
383
384            state.add_metrics(metrics, 100);
385
386            if decision.is_scaling() {
387                info!(
388                    job_id = %job_id,
389                    decision = ?decision,
390                    policy = %self.policy.name(),
391                    "Scaling decision made"
392                );
393                state.record_scale();
394            }
395        }
396
397        decision
398    }
399
400    /// Force a scaling decision (bypasses hysteresis)
401    pub async fn force_scale(&self, job_id: &str, decision: ScalingDecision) {
402        let mut states = self.job_states.write().await;
403        let state = states
404            .entry(job_id.to_string())
405            .or_insert_with(JobScalingState::new);
406
407        warn!(
408            job_id = %job_id,
409            decision = ?decision,
410            "Forced scaling decision"
411        );
412        state.record_scale();
413    }
414
415    /// Get metrics history for a job
416    pub async fn get_metrics_history(&self, job_id: &str) -> Vec<MetricsSnapshot> {
417        let states = self.job_states.read().await;
418        states
419            .get(job_id)
420            .map(|s| s.metrics_history.clone())
421            .unwrap_or_default()
422    }
423
424    /// Clear state for a job
425    pub async fn clear_job(&self, job_id: &str) {
426        let mut states = self.job_states.write().await;
427        states.remove(job_id);
428    }
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    // CPU at 0.85 exceeds the default 0.8 upscale threshold, so the policy
    // should step up by the default scale_up_step of 1.
    #[tokio::test]
    async fn test_threshold_policy_scale_up() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.85, 0.5, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::ScaleUp(1));
    }

    // Both CPU (0.2) and memory (0.1) sit below the default 0.3 downscale
    // threshold, so the policy should step down by one instance.
    #[tokio::test]
    async fn test_threshold_policy_scale_down() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.2, 0.1, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::ScaleDown(1));
    }

    // Utilization of 0.5 lies between the default thresholds (0.3, 0.8):
    // no scaling action expected.
    #[tokio::test]
    async fn test_threshold_policy_no_change() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.5, 0.5, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::NoChange);
    }

    // With a 1-second hysteresis, the second back-to-back evaluation falls
    // inside the cooldown and must be suppressed. Timing-sensitive: relies
    // on both evaluations completing within the 1s window.
    #[tokio::test]
    async fn test_autoscaler_hysteresis() {
        let config = AutoscalerConfig::default().hysteresis_secs(1);
        let autoscaler = Autoscaler::new(config).unwrap();

        // First evaluation should scale
        let metrics = MetricsSnapshot::new(0.9, 0.5, 5);
        let decision = autoscaler.evaluate("job-1", metrics.clone()).await;
        assert!(decision.is_scaling());

        // Immediate second evaluation should be blocked
        let decision = autoscaler.evaluate("job-1", metrics).await;
        assert_eq!(decision, ScalingDecision::NoChange);
    }

    // Target policy at 0.7: utilization well above target scales up,
    // utilization well below target scales down.
    #[tokio::test]
    async fn test_target_utilization_policy() {
        let policy = TargetUtilizationPolicy::new(0.7);
        let config = AutoscalerConfig::default();

        // High utilization -> scale up
        let metrics = MetricsSnapshot::new(0.9, 0.5, 5);
        let decision = policy.evaluate(&metrics, &config).await;
        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));

        // Low utilization -> scale down
        let metrics = MetricsSnapshot::new(0.3, 0.2, 5);
        let decision = policy.evaluate(&metrics, &config).await;
        assert!(matches!(decision, ScalingDecision::ScaleDown(_)));
    }

    // An inverted threshold pair (upscale below downscale) must be rejected
    // by validate().
    #[test]
    fn test_config_validation() {
        let config = AutoscalerConfig::default()
            .upscale_threshold(0.3)
            .downscale_threshold(0.8);

        assert!(config.validate().is_err());
    }
}