use crate::error::{ForgeError, Result};
use crate::job::Job;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

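/// Configuration for the autoscaler: thresholds, hysteresis, instance bounds,
/// and evaluation cadence.
///
/// A minimal builder sketch (illustrative only; adjust the import path to
/// wherever this module lives in your crate):
///
/// ```ignore
/// let config = AutoscalerConfig::new()
///     .upscale_threshold(0.75)
///     .downscale_threshold(0.25)
///     .hysteresis_secs(120)
///     .bounds(2, 20);
/// assert!(config.validate().is_ok());
/// ```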
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoscalerConfig {
    /// Utilization at or above which instances are added (`0.0..=1.0`).
    pub upscale_threshold: f64,
    /// Utilization at or below which instances are removed (`0.0..=1.0`).
    pub downscale_threshold: f64,
    /// Minimum time between scaling actions for a given job, in seconds.
    pub hysteresis_secs: u64,
    /// Interval between autoscaler evaluations, in seconds.
    pub eval_interval_secs: u64,
    /// Lower bound on the instance count.
    pub min_instances: u32,
    /// Upper bound on the instance count.
    pub max_instances: u32,
    /// Instances added per scale-up action.
    pub scale_up_step: u32,
    /// Instances removed per scale-down action.
    pub scale_down_step: u32,
    /// Whether predictive scaling is enabled.
    pub predictive_enabled: bool,
    /// Length of the metrics window, in seconds.
    pub metrics_window_secs: u64,
}

impl Default for AutoscalerConfig {
    fn default() -> Self {
        Self {
            upscale_threshold: 0.8,
            downscale_threshold: 0.3,
            hysteresis_secs: 300,
            eval_interval_secs: 30,
            min_instances: 1,
            max_instances: 100,
            scale_up_step: 1,
            scale_down_step: 1,
            predictive_enabled: false,
            metrics_window_secs: 300,
        }
    }
}

impl AutoscalerConfig {
    /// Creates a configuration with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the upscale threshold, clamped to `0.0..=1.0`.
    pub fn upscale_threshold(mut self, threshold: f64) -> Self {
        self.upscale_threshold = threshold.clamp(0.0, 1.0);
        self
    }

    /// Sets the downscale threshold, clamped to `0.0..=1.0`.
    pub fn downscale_threshold(mut self, threshold: f64) -> Self {
        self.downscale_threshold = threshold.clamp(0.0, 1.0);
        self
    }

    /// Sets the minimum time between scaling actions, in seconds.
    pub fn hysteresis_secs(mut self, secs: u64) -> Self {
        self.hysteresis_secs = secs;
        self
    }

    /// Sets the instance bounds; `max` is raised to at least `min`.
    pub fn bounds(mut self, min: u32, max: u32) -> Self {
        self.min_instances = min;
        self.max_instances = max.max(min);
        self
    }

    /// Enables or disables predictive scaling.
    pub fn predictive(mut self, enabled: bool) -> Self {
        self.predictive_enabled = enabled;
        self
    }

    /// Checks that thresholds and instance bounds are mutually consistent.
    pub fn validate(&self) -> Result<()> {
        if self.upscale_threshold <= self.downscale_threshold {
            return Err(ForgeError::config(
                "upscale_threshold must be greater than downscale_threshold",
            ));
        }
        if self.min_instances > self.max_instances {
            return Err(ForgeError::config(
                "min_instances cannot exceed max_instances",
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingDecision {
    /// Keep the current instance count.
    NoChange,
    /// Add this many instances.
    ScaleUp(u32),
    /// Remove this many instances.
    ScaleDown(u32),
    /// Scale to this absolute instance count.
    ScaleTo(u32),
}

impl ScalingDecision {
    /// Returns `true` for any decision other than `NoChange`.
    pub fn is_scaling(&self) -> bool {
        !matches!(self, Self::NoChange)
    }

    /// Signed change in instance count implied by the decision.
    pub fn delta(&self) -> i32 {
        match self {
            Self::NoChange => 0,
            Self::ScaleUp(n) => *n as i32,
            Self::ScaleDown(n) => -(*n as i32),
            // The target is absolute; a relative delta cannot be derived here.
            Self::ScaleTo(_) => 0,
        }
    }
}

#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// CPU utilization in `0.0..=1.0`.
    pub cpu_utilization: f64,
    /// Memory utilization in `0.0..=1.0`.
    pub memory_utilization: f64,
    /// Observed request rate (left at `0.0` by [`MetricsSnapshot::new`]).
    pub request_rate: f64,
    /// Observed latency in milliseconds (left at `0.0` by [`MetricsSnapshot::new`]).
    pub latency_ms: f64,
    /// Number of instances currently running.
    pub current_instances: u32,
    /// When the snapshot was taken.
    pub timestamp: Instant,
}

impl MetricsSnapshot {
    /// Builds a snapshot from CPU and memory utilization (clamped to `0.0..=1.0`)
    /// and the current instance count.
    pub fn new(cpu: f64, memory: f64, instances: u32) -> Self {
        Self {
            cpu_utilization: cpu.clamp(0.0, 1.0),
            memory_utilization: memory.clamp(0.0, 1.0),
            request_rate: 0.0,
            latency_ms: 0.0,
            current_instances: instances,
            timestamp: Instant::now(),
        }
    }

    /// Returns the higher of CPU and memory utilization.
    pub fn utilization(&self) -> f64 {
        self.cpu_utilization.max(self.memory_utilization)
    }
}

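/// Strategy for turning a [`MetricsSnapshot`] into a [`ScalingDecision`].
///
/// A minimal custom-policy sketch; `PinnedPolicy` is purely illustrative and
/// not part of this module:
///
/// ```ignore
/// #[derive(Debug, Clone)]
/// struct PinnedPolicy {
///     target: u32,
/// }
///
/// #[async_trait]
/// impl ScalingPolicy for PinnedPolicy {
///     async fn evaluate(
///         &self,
///         metrics: &MetricsSnapshot,
///         config: &AutoscalerConfig,
///     ) -> ScalingDecision {
///         // Always steer toward a fixed instance count, clamped to the bounds.
///         let pinned = self.target.clamp(config.min_instances, config.max_instances);
///         if pinned == metrics.current_instances {
///             ScalingDecision::NoChange
///         } else {
///             ScalingDecision::ScaleTo(pinned)
///         }
///     }
///
///     fn name(&self) -> &str {
///         "pinned"
///     }
/// }
/// ```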
#[async_trait]
pub trait ScalingPolicy: Send + Sync {
    /// Evaluates the latest metrics against the configuration and returns a decision.
    async fn evaluate(
        &self,
        metrics: &MetricsSnapshot,
        config: &AutoscalerConfig,
    ) -> ScalingDecision;

    /// Short policy name, used in logs.
    fn name(&self) -> &str;
}

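/// Policy that compares peak utilization against the configured upscale and
/// downscale thresholds and steps the instance count by `scale_up_step` /
/// `scale_down_step`, clamped to the configured bounds.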
#[derive(Debug, Clone)]
pub struct ThresholdPolicy;

#[async_trait]
impl ScalingPolicy for ThresholdPolicy {
    async fn evaluate(
        &self,
        metrics: &MetricsSnapshot,
        config: &AutoscalerConfig,
    ) -> ScalingDecision {
        let utilization = metrics.utilization();

        if utilization >= config.upscale_threshold {
            let new_count = (metrics.current_instances + config.scale_up_step)
                .min(config.max_instances);
            if new_count > metrics.current_instances {
                return ScalingDecision::ScaleUp(new_count - metrics.current_instances);
            }
        } else if utilization <= config.downscale_threshold {
            let new_count = metrics
                .current_instances
                .saturating_sub(config.scale_down_step)
                .max(config.min_instances);
            if new_count < metrics.current_instances {
                return ScalingDecision::ScaleDown(metrics.current_instances - new_count);
            }
        }

        ScalingDecision::NoChange
    }

    fn name(&self) -> &str {
        "threshold"
    }
}

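/// Policy that sizes the pool toward a target utilization using
/// `desired = ceil(current * utilization / target)`, clamped to the configured
/// bounds. No change is made while utilization is within `tolerance` of the
/// target.
///
/// For example, at 90% peak utilization with 5 instances and a 0.7 target,
/// `desired = ceil(5 * 0.9 / 0.7) = 7`, which yields `ScaleUp(2)`.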
#[derive(Debug, Clone)]
pub struct TargetUtilizationPolicy {
    /// Desired steady-state utilization, clamped to `0.1..=0.9`.
    target: f64,
    /// Dead band around the target within which no scaling occurs.
    tolerance: f64,
}

impl TargetUtilizationPolicy {
    /// Creates a policy for the given target utilization (clamped to `0.1..=0.9`)
    /// with a default tolerance of `0.1`.
    pub fn new(target: f64) -> Self {
        Self {
            target: target.clamp(0.1, 0.9),
            tolerance: 0.1,
        }
    }

    /// Sets the tolerance, clamped to `0.01..=0.5`.
    pub fn with_tolerance(mut self, tolerance: f64) -> Self {
        self.tolerance = tolerance.clamp(0.01, 0.5);
        self
    }
}

#[async_trait]
impl ScalingPolicy for TargetUtilizationPolicy {
    async fn evaluate(
        &self,
        metrics: &MetricsSnapshot,
        config: &AutoscalerConfig,
    ) -> ScalingDecision {
        let utilization = metrics.utilization();
        let current = metrics.current_instances as f64;

        let desired = (current * utilization / self.target).ceil() as u32;
        let desired = desired.clamp(config.min_instances, config.max_instances);

        let diff = (utilization - self.target).abs();
        if diff <= self.tolerance {
            return ScalingDecision::NoChange;
        }

        if desired > metrics.current_instances {
            ScalingDecision::ScaleUp(desired - metrics.current_instances)
        } else if desired < metrics.current_instances {
            ScalingDecision::ScaleDown(metrics.current_instances - desired)
        } else {
            ScalingDecision::NoChange
        }
    }

    fn name(&self) -> &str {
        "target-utilization"
    }
}

/// Per-job scaling bookkeeping: when the job was last scaled and a bounded
/// history of recent metrics.
#[derive(Debug)]
struct JobScalingState {
    last_scale_time: Option<Instant>,
    metrics_history: Vec<MetricsSnapshot>,
}

impl JobScalingState {
    fn new() -> Self {
        Self {
            last_scale_time: None,
            metrics_history: Vec::new(),
        }
    }

    /// True once the hysteresis window since the last scaling action has elapsed.
    fn can_scale(&self, hysteresis: Duration) -> bool {
        match self.last_scale_time {
            Some(t) => t.elapsed() >= hysteresis,
            None => true,
        }
    }

    fn record_scale(&mut self) {
        self.last_scale_time = Some(Instant::now());
    }

    /// Appends a snapshot, dropping the oldest entries beyond `max_history`.
    fn add_metrics(&mut self, snapshot: MetricsSnapshot, max_history: usize) {
        self.metrics_history.push(snapshot);
        if self.metrics_history.len() > max_history {
            self.metrics_history.remove(0);
        }
    }
}

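/// Per-job autoscaler that applies a [`ScalingPolicy`] under hysteresis and
/// retains a bounded metrics history for each job.
///
/// A minimal usage sketch (illustrative only; run inside an async context,
/// adjust the import paths to the actual crate layout, and note the job id is
/// made up):
///
/// ```ignore
/// let autoscaler = Autoscaler::new(AutoscalerConfig::default()).expect("valid config");
/// let metrics = MetricsSnapshot::new(0.9, 0.6, 4);
/// match autoscaler.evaluate("job-42", metrics).await {
///     ScalingDecision::ScaleUp(n) => println!("add {n} instances"),
///     ScalingDecision::ScaleDown(n) => println!("remove {n} instances"),
///     other => println!("{other:?}"),
/// }
/// ```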
pub struct Autoscaler {
    config: AutoscalerConfig,
    policy: Arc<dyn ScalingPolicy>,
    job_states: RwLock<HashMap<String, JobScalingState>>,
}

impl Autoscaler {
    /// Creates an autoscaler using the default [`ThresholdPolicy`].
    /// Fails if the configuration is invalid.
    pub fn new(config: AutoscalerConfig) -> Result<Self> {
        config.validate()?;
        Ok(Self {
            config,
            policy: Arc::new(ThresholdPolicy),
            job_states: RwLock::new(HashMap::new()),
        })
    }

    /// Creates an autoscaler with a custom scaling policy.
    /// Fails if the configuration is invalid.
    pub fn with_policy(config: AutoscalerConfig, policy: Arc<dyn ScalingPolicy>) -> Result<Self> {
        config.validate()?;
        Ok(Self {
            config,
            policy,
            job_states: RwLock::new(HashMap::new()),
        })
    }

    /// Returns the active configuration.
    pub fn config(&self) -> &AutoscalerConfig {
        &self.config
    }

    /// Evaluates the latest metrics for `job_id` and returns a scaling decision.
    ///
    /// Returns `NoChange` without consulting the policy while the job's
    /// hysteresis window is still open. Otherwise the snapshot is added to the
    /// job's history and, if the policy decides to scale, the hysteresis
    /// window is restarted.
    pub async fn evaluate(&self, job_id: &str, metrics: MetricsSnapshot) -> ScalingDecision {
        let hysteresis = Duration::from_secs(self.config.hysteresis_secs);

        {
            let states = self.job_states.read().await;
            if let Some(state) = states.get(job_id) {
                if !state.can_scale(hysteresis) {
                    debug!(
                        job_id = %job_id,
                        "Scaling blocked by hysteresis"
                    );
                    return ScalingDecision::NoChange;
                }
            }
        }

        let decision = self.policy.evaluate(&metrics, &self.config).await;

        {
            let mut states = self.job_states.write().await;
            let state = states
                .entry(job_id.to_string())
                .or_insert_with(JobScalingState::new);

            // Keep at most the last 100 snapshots per job.
            state.add_metrics(metrics, 100);

            if decision.is_scaling() {
                info!(
                    job_id = %job_id,
                    decision = ?decision,
                    policy = %self.policy.name(),
                    "Scaling decision made"
                );
                state.record_scale();
            }
        }

        decision
    }

    /// Records an externally forced scaling decision for `job_id` and restarts
    /// its hysteresis window.
    pub async fn force_scale(&self, job_id: &str, decision: ScalingDecision) {
        let mut states = self.job_states.write().await;
        let state = states
            .entry(job_id.to_string())
            .or_insert_with(JobScalingState::new);

        warn!(
            job_id = %job_id,
            decision = ?decision,
            "Forced scaling decision"
        );
        state.record_scale();
    }

    /// Returns the recorded metrics history for `job_id`, or an empty vector
    /// if the job is unknown.
    pub async fn get_metrics_history(&self, job_id: &str) -> Vec<MetricsSnapshot> {
        let states = self.job_states.read().await;
        states
            .get(job_id)
            .map(|s| s.metrics_history.clone())
            .unwrap_or_default()
    }

    /// Removes all scaling state for `job_id`.
    pub async fn clear_job(&self, job_id: &str) {
        let mut states = self.job_states.write().await;
        states.remove(job_id);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_threshold_policy_scale_up() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.85, 0.5, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::ScaleUp(1));
    }

    #[tokio::test]
    async fn test_threshold_policy_scale_down() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.2, 0.1, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::ScaleDown(1));
    }

    #[tokio::test]
    async fn test_threshold_policy_no_change() {
        let policy = ThresholdPolicy;
        let config = AutoscalerConfig::default();
        let metrics = MetricsSnapshot::new(0.5, 0.5, 5);

        let decision = policy.evaluate(&metrics, &config).await;
        assert_eq!(decision, ScalingDecision::NoChange);
    }

    #[tokio::test]
    async fn test_autoscaler_hysteresis() {
        let config = AutoscalerConfig::default().hysteresis_secs(1);
        let autoscaler = Autoscaler::new(config).unwrap();

        let metrics = MetricsSnapshot::new(0.9, 0.5, 5);
        let decision = autoscaler.evaluate("job-1", metrics.clone()).await;
        assert!(decision.is_scaling());

        let decision = autoscaler.evaluate("job-1", metrics).await;
        assert_eq!(decision, ScalingDecision::NoChange);
    }

    #[tokio::test]
    async fn test_target_utilization_policy() {
        let policy = TargetUtilizationPolicy::new(0.7);
        let config = AutoscalerConfig::default();

        let metrics = MetricsSnapshot::new(0.9, 0.5, 5);
        let decision = policy.evaluate(&metrics, &config).await;
        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));

        let metrics = MetricsSnapshot::new(0.3, 0.2, 5);
        let decision = policy.evaluate(&metrics, &config).await;
        assert!(matches!(decision, ScalingDecision::ScaleDown(_)));
    }

    #[test]
    fn test_config_validation() {
        let config = AutoscalerConfig::default()
            .upscale_threshold(0.3)
            .downscale_threshold(0.8);

        assert!(config.validate().is_err());
    }
}