use crate::config::ScalingConfig;
use crate::error::Result;
use crate::scaling::executor::{ScaleDecision, ScaleDirection, ScaleExecutor};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
/// Load metrics for one service, as handed to [`Autoscaler::evaluate`] and
/// [`Autoscaler::compute_desired_replicas`].
#[derive(Debug, Clone)]
pub struct ServiceMetricsSnapshot {
    /// Service name; `Autoscaler::evaluate` uses it as the key to look up the
    /// service's scaling state.
    pub service: String,
    /// Healthy-backend count (meaning inferred from the name — TODO confirm).
    // Not read by any scaling logic in this file, hence the lint allowance.
    #[allow(dead_code)]
    pub healthy_backends: usize,
    /// In-flight request count; added to `queue_depth` to form the total load
    /// used by the replica formula.
    pub in_flight: usize,
    /// Queued request count; added to `in_flight` to form the total load used
    /// by the replica formula.
    pub queue_depth: usize,
}
/// Per-service bookkeeping kept by [`Autoscaler`] between evaluations.
struct ServiceScaleState {
    /// Scaling parameters for this service (replica bounds, concurrency,
    /// target utilization, scale-down delay).
    config: ScalingConfig,
    /// Time of the most recent evaluation whose snapshot showed non-zero load
    /// (`in_flight > 0` or `queue_depth > 0`); initialised to the autoscaler's
    /// construction time. Scale-down decisions are gated on how long ago this was.
    last_request_at: Instant,
    /// Replica count the autoscaler believes the service has. Starts at 0 and
    /// is overwritten whenever `evaluate` emits a decision or
    /// `set_current_replicas` is called.
    current_replicas: u32,
}
/// Computes scale decisions from load snapshots and hands them to a
/// [`ScaleExecutor`].
pub struct Autoscaler {
    /// Backend that receives every decision produced during `tick`.
    executor: Arc<dyn ScaleExecutor>,
    /// Scaling state keyed by service name; services absent from this map are
    /// ignored by `evaluate`.
    services: HashMap<String, ServiceScaleState>,
}
impl Autoscaler {
    /// Builds an autoscaler that manages one entry per key in `configs`.
    ///
    /// Every service starts with a tracked replica count of 0 and with its
    /// "last request" timestamp set to the moment of construction.
    pub fn new(executor: Arc<dyn ScaleExecutor>, configs: HashMap<String, ScalingConfig>) -> Self {
        let created_at = Instant::now();
        let services = configs
            .into_iter()
            .map(|(name, config)| {
                (
                    name,
                    ServiceScaleState {
                        config,
                        last_request_at: created_at,
                        current_replicas: 0,
                    },
                )
            })
            .collect();
        Self { executor, services }
    }

    /// Computes the replica count that `snapshot`'s load calls for under `config`.
    ///
    /// The formula is
    /// `ceil((in_flight + queue_depth) / (container_concurrency * target_utilization))`,
    /// bounded to `[min_replicas, max_replicas]`, with these special cases:
    ///
    /// * `container_concurrency == 0`: returns `max(min_replicas, 1)` regardless
    ///   of load.
    /// * zero load: returns `min_replicas`.
    /// * non-positive or non-finite effective capacity (e.g. a zero, negative
    ///   or NaN `target_utilization`): returns the upper bound, because the
    ///   formula cannot produce a meaningful answer.
    ///
    /// If the config is inverted (`min_replicas > max_replicas`), the upper
    /// bound is raised to `min_replicas` instead of panicking.
    pub fn compute_desired_replicas(
        config: &ScalingConfig,
        snapshot: &ServiceMetricsSnapshot,
    ) -> u32 {
        let floor = config.min_replicas;
        // `Ord::clamp` panics when min > max, so normalise the ceiling first.
        // Letting the floor win matches the zero-load and concurrency-0
        // branches below, which already return the minimum unconditionally.
        let ceiling = config.max_replicas.max(floor);

        let concurrency = config.container_concurrency;
        if concurrency == 0 {
            return floor.max(1);
        }

        // Sum in floating point: adding the two `usize` counters directly could
        // overflow (panic in debug builds, wrap-around in release builds).
        let total_load = snapshot.in_flight as f64 + snapshot.queue_depth as f64;
        if total_load == 0.0 {
            return floor;
        }

        let effective_capacity = concurrency as f64 * config.target_utilization;
        // NaN compares false against everything, so it must be rejected
        // explicitly; otherwise the cast below would turn it into 0 replicas
        // while the service is under load.
        if !effective_capacity.is_finite() || effective_capacity <= 0.0 {
            return ceiling;
        }

        // Float-to-int `as` casts saturate, so a huge ratio becomes `u32::MAX`
        // and is then bounded by `ceiling`.
        let desired = (total_load / effective_capacity).ceil() as u32;
        desired.clamp(floor, ceiling)
    }

    /// Evaluates one snapshot and returns a scale decision if the replica count
    /// should change.
    ///
    /// Returns `None` when the service is unknown, when the desired count
    /// equals the tracked count, or when a scale-down is wanted but fewer than
    /// `scale_down_delay_secs` seconds have passed since the last snapshot with
    /// non-zero load.
    ///
    /// Side effects: refreshes the service's last-request timestamp when the
    /// snapshot shows load, and — when a decision is returned — records the
    /// desired count as the tracked count *before* anything is executed.
    pub fn evaluate(&mut self, snapshot: &ServiceMetricsSnapshot) -> Option<ScaleDecision> {
        let state = self.services.get_mut(&snapshot.service)?;

        if snapshot.in_flight > 0 || snapshot.queue_depth > 0 {
            state.last_request_at = Instant::now();
        }

        let desired = Self::compute_desired_replicas(&state.config, snapshot);
        let current = state.current_replicas;

        let direction = match desired.cmp(&current) {
            std::cmp::Ordering::Equal => return None,
            std::cmp::Ordering::Greater => ScaleDirection::Up,
            std::cmp::Ordering::Less => {
                // Scale-down is held back until the service has gone the
                // configured number of seconds without observed load.
                let idle_secs = state.last_request_at.elapsed().as_secs();
                if idle_secs < state.config.scale_down_delay_secs {
                    return None;
                }
                ScaleDirection::Down
            }
        };

        let reason = format!(
            "{}: in_flight={}, queue={}, cc={}, util={:.0}%, current={}, desired={}",
            direction,
            snapshot.in_flight,
            snapshot.queue_depth,
            state.config.container_concurrency,
            state.config.target_utilization * 100.0,
            current,
            desired,
        );

        state.current_replicas = desired;

        Some(ScaleDecision {
            service: snapshot.service.clone(),
            direction,
            current_replicas: current,
            desired_replicas: desired,
            reason,
        })
    }

    /// Runs one autoscaling pass over every registered service.
    ///
    /// For each service, `metrics_fn` is asked for a snapshot; services for
    /// which it returns `None` are skipped. Each resulting decision is logged
    /// and passed to the executor. The returned vector holds one entry per
    /// executed decision (not per service), in iteration order of the
    /// internal map.
    ///
    /// If the executor reports an error, the tracked replica count is restored
    /// to its value from before the decision, so the next pass recomputes and
    /// retries the change instead of assuming it took effect.
    pub async fn tick<F>(&mut self, metrics_fn: F) -> Vec<Result<()>>
    where
        F: Fn(&str) -> Option<ServiceMetricsSnapshot>,
    {
        // Snapshot the keys first: `evaluate` needs `&mut self` inside the loop.
        let service_names: Vec<String> = self.services.keys().cloned().collect();
        let mut results = Vec::new();

        for name in &service_names {
            if let Some(snapshot) = metrics_fn(name) {
                if let Some(decision) = self.evaluate(&snapshot) {
                    tracing::info!(
                        service = decision.service,
                        direction = %decision.direction,
                        from = decision.current_replicas,
                        to = decision.desired_replicas,
                        reason = decision.reason,
                        "Autoscaler decision"
                    );

                    let outcome = self.executor.execute(&decision).await;
                    if outcome.is_err() {
                        // `evaluate` already recorded the new count; undo that
                        // so the tracked state does not diverge from reality.
                        tracing::warn!(
                            service = decision.service,
                            restored = decision.current_replicas,
                            "Scale execution failed; reverting tracked replica count"
                        );
                        self.set_current_replicas(&decision.service, decision.current_replicas);
                    }
                    results.push(outcome.map(|_| ()));
                }
            }
        }

        results
    }

    /// Returns the executor that decisions are dispatched to.
    // Only exercised by the tests in this file.
    #[allow(dead_code)]
    pub fn executor(&self) -> &Arc<dyn ScaleExecutor> {
        &self.executor
    }

    /// Reports whether a service named `name` is registered.
    // Only exercised by the tests in this file.
    #[allow(dead_code)]
    pub fn has_service(&self, name: &str) -> bool {
        self.services.contains_key(name)
    }

    /// Returns the number of registered services.
    pub fn service_count(&self) -> usize {
        self.services.len()
    }

    /// Overrides the tracked replica count for `service`.
    ///
    /// Does nothing if the service is not registered.
    #[allow(dead_code)]
    pub fn set_current_replicas(&mut self, service: &str, replicas: u32) {
        if let Some(state) = self.services.get_mut(service) {
            state.current_replicas = replicas;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::executor::MockScaleExecutor;

    /// Baseline config shared by the tests: 0..=10 replicas, concurrency 10,
    /// 70% target utilization, 300 s scale-down delay.
    fn default_config() -> ScalingConfig {
        ScalingConfig {
            min_replicas: 0,
            max_replicas: 10,
            container_concurrency: 10,
            target_utilization: 0.7,
            scale_down_delay_secs: 300,
            ..ScalingConfig::default()
        }
    }

    /// Builds a snapshot for `service` with two healthy backends.
    fn snapshot(service: &str, in_flight: usize, queue_depth: usize) -> ServiceMetricsSnapshot {
        ServiceMetricsSnapshot {
            service: service.into(),
            healthy_backends: 2,
            in_flight,
            queue_depth,
        }
    }

    /// Runs the replica formula for a throwaway "svc" snapshot.
    fn desired(config: &ScalingConfig, in_flight: usize, queue_depth: usize) -> u32 {
        Autoscaler::compute_desired_replicas(config, &snapshot("svc", in_flight, queue_depth))
    }

    /// Creates a mock-backed autoscaler with `default_config()` registered for
    /// each of `service_names`, returning the mock alongside it.
    fn build_autoscaler(service_names: &[&str]) -> (Arc<MockScaleExecutor>, Autoscaler) {
        let executor = Arc::new(MockScaleExecutor::new());
        let mut configs = HashMap::new();
        for name in service_names {
            configs.insert((*name).into(), default_config());
        }
        let autoscaler = Autoscaler::new(executor.clone(), configs);
        (executor, autoscaler)
    }

    #[test]
    fn test_formula_basic() {
        assert_eq!(desired(&default_config(), 10, 0), 2);
    }

    #[test]
    fn test_formula_includes_queue_depth() {
        assert_eq!(desired(&default_config(), 5, 5), 2);
    }

    #[test]
    fn test_formula_high_load() {
        assert_eq!(desired(&default_config(), 70, 0), 10);
    }

    #[test]
    fn test_formula_clamped_to_max() {
        assert_eq!(desired(&default_config(), 100, 0), 10);
    }

    #[test]
    fn test_formula_clamped_to_min() {
        let cfg = ScalingConfig {
            min_replicas: 2,
            ..default_config()
        };
        assert_eq!(desired(&cfg, 1, 0), 2);
    }

    #[test]
    fn test_formula_zero_load_returns_min() {
        assert_eq!(desired(&default_config(), 0, 0), 0);
    }

    #[test]
    fn test_formula_zero_load_with_min() {
        let cfg = ScalingConfig {
            min_replicas: 1,
            ..default_config()
        };
        assert_eq!(desired(&cfg, 0, 0), 1);
    }

    #[test]
    fn test_formula_cc_zero_unlimited() {
        let cfg = ScalingConfig {
            container_concurrency: 0,
            min_replicas: 1,
            ..default_config()
        };
        assert_eq!(desired(&cfg, 50, 0), 1);
    }

    #[test]
    fn test_formula_utilization_100_percent() {
        let cfg = ScalingConfig {
            target_utilization: 1.0,
            ..default_config()
        };
        assert_eq!(desired(&cfg, 10, 0), 1);
    }

    #[test]
    fn test_evaluate_scale_up() {
        let (_, mut autoscaler) = build_autoscaler(&["svc"]);
        let decision = autoscaler.evaluate(&snapshot("svc", 20, 0)).unwrap();
        assert_eq!(decision.direction, ScaleDirection::Up);
        assert_eq!(decision.current_replicas, 0);
        assert_eq!(decision.desired_replicas, 3);
    }

    #[test]
    fn test_evaluate_no_change() {
        let (_, mut autoscaler) = build_autoscaler(&["svc"]);
        autoscaler.set_current_replicas("svc", 3);
        let outcome = autoscaler.evaluate(&snapshot("svc", 20, 0));
        assert!(outcome.is_none());
    }

    #[test]
    fn test_evaluate_scale_down_blocked_by_cooldown() {
        let (_, mut autoscaler) = build_autoscaler(&["svc"]);
        autoscaler.set_current_replicas("svc", 5);
        // The autoscaler was just created, so the 300 s delay has not elapsed.
        let outcome = autoscaler.evaluate(&snapshot("svc", 0, 0));
        assert!(outcome.is_none());
    }

    #[test]
    fn test_evaluate_unknown_service() {
        let (_, mut autoscaler) = build_autoscaler(&[]);
        let outcome = autoscaler.evaluate(&snapshot("unknown", 10, 0));
        assert!(outcome.is_none());
    }

    #[test]
    fn test_evaluate_reason_formatting() {
        let (_, mut autoscaler) = build_autoscaler(&["svc"]);
        let decision = autoscaler.evaluate(&snapshot("svc", 15, 5)).unwrap();
        for fragment in ["in_flight=15", "queue=5", "cc=10"] {
            assert!(decision.reason.contains(fragment));
        }
    }

    #[tokio::test]
    async fn test_tick_executes_decisions() {
        let (mock, mut autoscaler) = build_autoscaler(&["svc"]);
        let results = autoscaler
            .tick(|name| match name {
                "svc" => Some(snapshot("svc", 20, 0)),
                _ => None,
            })
            .await;
        assert_eq!(results.len(), 1);
        assert!(results[0].is_ok());
        assert_eq!(mock.decisions().len(), 1);
    }

    #[tokio::test]
    async fn test_tick_no_metrics_no_decision() {
        let (mock, mut autoscaler) = build_autoscaler(&["svc"]);
        let results = autoscaler.tick(|_| None).await;
        assert!(results.is_empty());
        assert!(mock.decisions().is_empty());
    }

    #[test]
    fn test_autoscaler_has_service() {
        let (_, autoscaler) = build_autoscaler(&["api"]);
        assert!(autoscaler.has_service("api"));
        assert!(!autoscaler.has_service("web"));
        assert_eq!(autoscaler.service_count(), 1);
    }

    #[test]
    fn test_autoscaler_executor_name() {
        let (_, autoscaler) = build_autoscaler(&[]);
        assert_eq!(autoscaler.executor().name(), "mock");
    }
}