use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::error::Result;
use crate::raft::OxirsNodeId;
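/// Strategy used to decide when the cluster should scale.
///
/// `Threshold` reacts to fixed utilization limits, `Predictive` extrapolates a
/// CPU trend, `Scheduled` is a placeholder that currently delegates to
/// `Threshold`, and `Hybrid` acts only when threshold and predictive agree.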
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScalingPolicy {
Threshold,
Predictive,
Scheduled,
Hybrid,
}
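/// Outcome of a scaling evaluation.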
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScalingAction {
ScaleUp,
ScaleDown,
NoAction,
}
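/// Point-in-time load sample used as input to scaling decisions.
///
/// Utilization fields are fractions in `0.0..=1.0` (compared against the
/// thresholds in `AutoScalingConfig`); units for throughput, latency, and
/// replication lag are left to the caller.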
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadMetrics {
pub cpu_utilization: f64,
pub memory_utilization: f64,
pub query_throughput: f64,
pub avg_query_latency: f64,
pub active_connections: u32,
pub replication_lag: u64,
pub timestamp: SystemTime,
}
impl Default for LoadMetrics {
fn default() -> Self {
Self {
cpu_utilization: 0.0,
memory_utilization: 0.0,
query_throughput: 0.0,
avg_query_latency: 0.0,
active_connections: 0,
replication_lag: 0,
timestamp: SystemTime::now(),
}
}
}
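/// Tunables controlling auto-scaling behavior: node-count bounds, per-metric
/// thresholds, cooldown, and predictive settings.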
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScalingConfig {
pub policy: ScalingPolicy,
pub min_nodes: usize,
pub max_nodes: usize,
pub cpu_scale_up_threshold: f64,
pub cpu_scale_down_threshold: f64,
pub memory_scale_up_threshold: f64,
pub memory_scale_down_threshold: f64,
pub throughput_scale_up_threshold: f64,
pub cooldown_period_secs: u64,
pub consecutive_violations: usize,
pub gradual_scaling: bool,
pub metric_collection_interval_secs: u64,
pub enable_predictive: bool,
pub prediction_window_mins: u32,
}
impl Default for AutoScalingConfig {
fn default() -> Self {
Self {
policy: ScalingPolicy::Threshold,
min_nodes: 1,
max_nodes: 10,
cpu_scale_up_threshold: 0.75,
cpu_scale_down_threshold: 0.25,
memory_scale_up_threshold: 0.80,
memory_scale_down_threshold: 0.30,
throughput_scale_up_threshold: 1000.0,
cooldown_period_secs: 300,
consecutive_violations: 3,
gradual_scaling: true,
metric_collection_interval_secs: 60,
enable_predictive: false,
prediction_window_mins: 30,
}
}
}
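/// Record of a single executed scaling action, kept for audit and debugging.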
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingEvent {
pub timestamp: SystemTime,
pub action: ScalingAction,
pub node_count_change: i32,
pub metrics: LoadMetrics,
pub reason: String,
}
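/// Aggregate counters describing scaling activity over the manager's lifetime.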
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AutoScalingStats {
pub total_scale_ups: u64,
pub total_scale_downs: u64,
pub total_nodes_added: u64,
pub total_nodes_removed: u64,
pub last_scaling_event: Option<SystemTime>,
pub avg_time_between_scaling_secs: f64,
pub failed_scaling_attempts: u64,
}
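/// Coordinates metric collection, policy evaluation, and scaling execution.
///
/// A minimal usage sketch (node count and error handling are illustrative):
///
/// ```ignore
/// let manager = AutoScalingManager::new(AutoScalingConfig::default(), 3);
/// manager.record_metrics(LoadMetrics::default()).await;
/// let action = manager.evaluate_scaling().await?;
/// let affected = manager.execute_scaling(action).await?;
/// ```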
pub struct AutoScalingManager {
config: AutoScalingConfig,
current_node_count: Arc<RwLock<usize>>,
metrics_history: Arc<RwLock<VecDeque<LoadMetrics>>>,
violation_count: Arc<RwLock<HashMap<String, usize>>>,
last_scaling_action: Arc<RwLock<Option<Instant>>>,
scaling_events: Arc<RwLock<Vec<ScalingEvent>>>,
stats: Arc<RwLock<AutoScalingStats>>,
}
impl AutoScalingManager {
pub fn new(config: AutoScalingConfig, initial_node_count: usize) -> Self {
Self {
config,
current_node_count: Arc::new(RwLock::new(initial_node_count)),
metrics_history: Arc::new(RwLock::new(VecDeque::new())),
violation_count: Arc::new(RwLock::new(HashMap::new())),
last_scaling_action: Arc::new(RwLock::new(None)),
scaling_events: Arc::new(RwLock::new(Vec::new())),
stats: Arc::new(RwLock::new(AutoScalingStats::default())),
}
}
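/// Records a load sample, retaining roughly one hour of history based on the
/// configured collection interval.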
pub async fn record_metrics(&self, metrics: LoadMetrics) {
let mut history = self.metrics_history.write().await;
// Keep about one hour of samples; guard against a zero collection interval.
let max_history_size = 3600 / self.config.metric_collection_interval_secs.max(1) as usize;
while history.len() >= max_history_size {
history.pop_front();
}
history.push_back(metrics);
debug!("Recorded load metrics, history size: {}", history.len());
}
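/// Evaluates the configured policy and returns the recommended action.
/// Always returns `NoAction` while the cooldown period is active.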
pub async fn evaluate_scaling(&self) -> Result<ScalingAction> {
if !self.is_cooldown_expired().await {
debug!("Scaling is in cooldown period");
return Ok(ScalingAction::NoAction);
}
match self.config.policy {
ScalingPolicy::Threshold => self.evaluate_threshold_policy().await,
ScalingPolicy::Predictive => self.evaluate_predictive_policy().await,
ScalingPolicy::Scheduled => self.evaluate_scheduled_policy().await,
ScalingPolicy::Hybrid => self.evaluate_hybrid_policy().await,
}
}
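/// Applies a scaling action: computes the target node count, performs the
/// scale up/down, records the event and statistics, and starts the cooldown.
/// Returns the node IDs affected by the change.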
pub async fn execute_scaling(&self, action: ScalingAction) -> Result<Vec<OxirsNodeId>> {
if action == ScalingAction::NoAction {
return Ok(Vec::new());
}
let current_count = *self.current_node_count.read().await;
let target_count = self.calculate_target_node_count(action, current_count);
if target_count == current_count {
return Ok(Vec::new());
}
info!(
"Executing scaling action: {:?}, current: {}, target: {}",
action, current_count, target_count
);
let node_ids = if target_count > current_count {
self.scale_up(target_count - current_count).await?
} else {
self.scale_down(current_count - target_count).await?
};
{
let mut count = self.current_node_count.write().await;
*count = target_count;
}
let metrics = self.get_latest_metrics().await;
let event = ScalingEvent {
timestamp: SystemTime::now(),
action,
node_count_change: (target_count as i32) - (current_count as i32),
metrics,
reason: format!(
"Auto-scaling {:?} from {} to {} nodes",
action, current_count, target_count
),
};
{
let mut events = self.scaling_events.write().await;
events.push(event);
let len = events.len();
if len > 100 {
events.drain(0..len - 100);
}
}
{
let mut stats = self.stats.write().await;
match action {
ScalingAction::ScaleUp => {
stats.total_scale_ups += 1;
stats.total_nodes_added += (target_count - current_count) as u64;
}
ScalingAction::ScaleDown => {
stats.total_scale_downs += 1;
stats.total_nodes_removed += (current_count - target_count) as u64;
}
ScalingAction::NoAction => {}
}
stats.last_scaling_event = Some(SystemTime::now());
}
{
let mut last_action = self.last_scaling_action.write().await;
*last_action = Some(Instant::now());
}
Ok(node_ids)
}
pub async fn get_current_node_count(&self) -> usize {
*self.current_node_count.read().await
}
pub async fn get_statistics(&self) -> AutoScalingStats {
self.stats.read().await.clone()
}
pub async fn get_recent_events(&self, count: usize) -> Vec<ScalingEvent> {
let events = self.scaling_events.read().await;
events.iter().rev().take(count).cloned().collect()
}
pub async fn get_metrics_history(&self, duration_secs: u64) -> Vec<LoadMetrics> {
let history = self.metrics_history.read().await;
let cutoff = SystemTime::now()
.checked_sub(Duration::from_secs(duration_secs))
.unwrap_or(SystemTime::UNIX_EPOCH);
history
.iter()
.filter(|m| m.timestamp >= cutoff)
.cloned()
.collect()
}
async fn is_cooldown_expired(&self) -> bool {
let last_action = self.last_scaling_action.read().await;
if let Some(last) = *last_action {
let elapsed = last.elapsed().as_secs();
elapsed >= self.config.cooldown_period_secs
} else {
true
}
}
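/// Threshold policy: scale up when any metric stays above its high threshold
/// for `consecutive_violations` evaluations; scale down only when at least two
/// metrics stay below their low thresholds for the same number of evaluations.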
async fn evaluate_threshold_policy(&self) -> Result<ScalingAction> {
let metrics = self.get_latest_metrics().await;
let mut violations = Vec::new();
if metrics.cpu_utilization >= self.config.cpu_scale_up_threshold {
violations.push("cpu_high");
} else if metrics.cpu_utilization <= self.config.cpu_scale_down_threshold {
violations.push("cpu_low");
}
if metrics.memory_utilization >= self.config.memory_scale_up_threshold {
violations.push("memory_high");
} else if metrics.memory_utilization <= self.config.memory_scale_down_threshold {
violations.push("memory_low");
}
if metrics.query_throughput >= self.config.throughput_scale_up_threshold {
violations.push("throughput_high");
}
let scale_up_violations = violations.iter().filter(|v| v.ends_with("high")).count();
let scale_down_violations = violations.iter().filter(|v| v.ends_with("low")).count();
if scale_up_violations > 0 {
let count = self.increment_violation_count("scale_up").await;
if count >= self.config.consecutive_violations {
self.reset_violation_count("scale_up").await;
return Ok(ScalingAction::ScaleUp);
}
} else {
self.reset_violation_count("scale_up").await;
}
if scale_down_violations >= 2 {
let count = self.increment_violation_count("scale_down").await;
if count >= self.config.consecutive_violations {
self.reset_violation_count("scale_down").await;
return Ok(ScalingAction::ScaleDown);
}
} else {
self.reset_violation_count("scale_down").await;
}
Ok(ScalingAction::NoAction)
}
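/// Predictive policy: fits a least-squares line to the recorded CPU samples
/// and extrapolates it `prediction_window_mins` ahead, scaling on the
/// predicted utilization. Falls back to the threshold policy when prediction
/// is disabled or fewer than 10 samples exist.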
async fn evaluate_predictive_policy(&self) -> Result<ScalingAction> {
if !self.config.enable_predictive {
return self.evaluate_threshold_policy().await;
}
let history = self.metrics_history.read().await;
if history.len() < 10 {
return self.evaluate_threshold_policy().await;
}
use scirs2_core::ndarray_ext::Array1;
let cpu_values: Vec<f64> = history.iter().map(|m| m.cpu_utilization).collect();
let cpu_array = Array1::from_vec(cpu_values);
let n = cpu_array.len() as f64;
let x_values: Vec<f64> = (0..cpu_array.len()).map(|i| i as f64).collect();
let x_array = Array1::from_vec(x_values);
let x_mean = x_array.mean().unwrap_or(0.0);
let y_mean = cpu_array.mean().unwrap_or(0.0);
let mut numerator = 0.0;
let mut denominator = 0.0;
for i in 0..cpu_array.len() {
let x_diff = x_array[i] - x_mean;
let y_diff = cpu_array[i] - y_mean;
numerator += x_diff * y_diff;
denominator += x_diff * x_diff;
}
let slope = if denominator != 0.0 {
numerator / denominator
} else {
0.0
};
let prediction_steps = (self.config.prediction_window_mins as f64 * 60.0
/ self.config.metric_collection_interval_secs as f64)
as usize;
// The last sample sits at index n - 1, so the horizon is n - 1 + steps.
let future_x = (n - 1.0) + prediction_steps as f64;
let predicted_cpu = y_mean + slope * (future_x - x_mean);
debug!(
"Predictive scaling: current CPU: {:.2}%, predicted CPU: {:.2}%, trend slope: {:.4}",
y_mean * 100.0,
predicted_cpu * 100.0,
slope
);
if predicted_cpu >= self.config.cpu_scale_up_threshold {
Ok(ScalingAction::ScaleUp)
} else if predicted_cpu <= self.config.cpu_scale_down_threshold {
Ok(ScalingAction::ScaleDown)
} else {
Ok(ScalingAction::NoAction)
}
}
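/// Scheduled policy placeholder: time-based schedules are not implemented
/// here yet, so this currently delegates to the threshold policy.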
async fn evaluate_scheduled_policy(&self) -> Result<ScalingAction> {
self.evaluate_threshold_policy().await
}
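/// Hybrid policy: acts only when the threshold and predictive evaluations
/// agree, otherwise recommends no action.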
async fn evaluate_hybrid_policy(&self) -> Result<ScalingAction> {
let threshold_action = self.evaluate_threshold_policy().await?;
let predictive_action = self.evaluate_predictive_policy().await?;
if threshold_action == predictive_action {
Ok(threshold_action)
} else {
Ok(ScalingAction::NoAction)
}
}
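/// Computes the target node count for an action: one node at a time when
/// `gradual_scaling` is set, otherwise 1.5x up or 0.7x down, clamped to
/// `[min_nodes, max_nodes]`.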
fn calculate_target_node_count(&self, action: ScalingAction, current: usize) -> usize {
let target = match action {
ScalingAction::ScaleUp => {
if self.config.gradual_scaling {
current + 1
} else {
(current as f64 * 1.5).ceil() as usize
}
}
ScalingAction::ScaleDown => {
if self.config.gradual_scaling {
current.saturating_sub(1)
} else {
(current as f64 * 0.7).ceil() as usize
}
}
ScalingAction::NoAction => current,
};
target.clamp(self.config.min_nodes, self.config.max_nodes)
}
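/// Allocates candidate IDs for new nodes. This is a placeholder: it only
/// generates IDs and assumes actual provisioning happens elsewhere.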
async fn scale_up(&self, count: usize) -> Result<Vec<OxirsNodeId>> {
info!("Scaling up: adding {} nodes", count);
let base_id = 1000 + *self.current_node_count.read().await as u64;
let node_ids: Vec<OxirsNodeId> = (0..count).map(|i| base_id + i as u64).collect();
Ok(node_ids)
}
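/// Selects candidate IDs for node removal. Like `scale_up`, this is a
/// placeholder that assumes decommissioning happens elsewhere.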
async fn scale_down(&self, count: usize) -> Result<Vec<OxirsNodeId>> {
info!("Scaling down: removing {} nodes", count);
let base_id = 1000 + *self.current_node_count.read().await as u64;
let node_ids: Vec<OxirsNodeId> = (0..count)
.map(|i| base_id.saturating_sub(i as u64 + 1))
.collect();
Ok(node_ids)
}
async fn get_latest_metrics(&self) -> LoadMetrics {
let history = self.metrics_history.read().await;
history.back().cloned().unwrap_or_default()
}
async fn increment_violation_count(&self, key: &str) -> usize {
let mut counts = self.violation_count.write().await;
let count = counts.entry(key.to_string()).or_insert(0);
*count += 1;
*count
}
async fn reset_violation_count(&self, key: &str) {
let mut counts = self.violation_count.write().await;
counts.insert(key.to_string(), 0);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_auto_scaling_manager_creation() {
let config = AutoScalingConfig::default();
let manager = AutoScalingManager::new(config, 3);
assert_eq!(manager.get_current_node_count().await, 3);
}
#[tokio::test]
async fn test_record_metrics() {
let config = AutoScalingConfig::default();
let manager = AutoScalingManager::new(config, 3);
let metrics = LoadMetrics {
cpu_utilization: 0.5,
memory_utilization: 0.6,
query_throughput: 500.0,
..Default::default()
};
manager.record_metrics(metrics).await;
let history = manager.get_metrics_history(3600).await;
assert_eq!(history.len(), 1);
}
#[tokio::test]
async fn test_threshold_scale_up() {
let config = AutoScalingConfig {
consecutive_violations: 1,
..Default::default()
};
let manager = AutoScalingManager::new(config, 3);
let metrics = LoadMetrics {
cpu_utilization: 0.9,
memory_utilization: 0.85,
..Default::default()
};
manager.record_metrics(metrics).await;
let action = manager.evaluate_scaling().await.unwrap();
assert_eq!(action, ScalingAction::ScaleUp);
}
#[tokio::test]
async fn test_threshold_scale_down() {
let config = AutoScalingConfig {
consecutive_violations: 1,
..Default::default()
};
let manager = AutoScalingManager::new(config, 5);
let metrics = LoadMetrics {
cpu_utilization: 0.1,
memory_utilization: 0.15,
..Default::default()
};
manager.record_metrics(metrics).await;
let action = manager.evaluate_scaling().await.unwrap();
assert_eq!(action, ScalingAction::ScaleDown);
}
#[tokio::test]
async fn test_execute_scaling() {
let config = AutoScalingConfig::default();
let manager = AutoScalingManager::new(config, 3);
let node_ids = manager
.execute_scaling(ScalingAction::ScaleUp)
.await
.unwrap();
assert!(!node_ids.is_empty());
assert_eq!(manager.get_current_node_count().await, 4);
}
#[tokio::test]
async fn test_scaling_statistics() {
let config = AutoScalingConfig::default();
let manager = AutoScalingManager::new(config, 3);
manager
.execute_scaling(ScalingAction::ScaleUp)
.await
.unwrap();
let stats = manager.get_statistics().await;
assert_eq!(stats.total_scale_ups, 1);
assert_eq!(stats.total_nodes_added, 1);
}
#[tokio::test]
async fn test_min_max_constraints() {
let config = AutoScalingConfig {
min_nodes: 2,
max_nodes: 5,
..Default::default()
};
let manager = AutoScalingManager::new(config, 3);
manager
.execute_scaling(ScalingAction::ScaleDown)
.await
.unwrap();
manager
.execute_scaling(ScalingAction::ScaleDown)
.await
.unwrap();
manager
.execute_scaling(ScalingAction::ScaleDown)
.await
.unwrap();
assert_eq!(manager.get_current_node_count().await, 2);
}
#[tokio::test]
async fn test_cooldown_period() {
let config = AutoScalingConfig {
cooldown_period_secs: 10,
consecutive_violations: 1,
..Default::default()
};
let manager = AutoScalingManager::new(config, 3);
manager
.execute_scaling(ScalingAction::ScaleUp)
.await
.unwrap();
let metrics = LoadMetrics {
cpu_utilization: 0.9,
..Default::default()
};
manager.record_metrics(metrics).await;
let action = manager.evaluate_scaling().await.unwrap();
assert_eq!(action, ScalingAction::NoAction);
}
}