use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Fixed-window moving average over `f64` samples.
///
/// Keeps a running sum so `mean` is O(1); the oldest sample is evicted once
/// the window is full.
struct MovingAverage {
    window_size: usize,
    values: VecDeque<f64>,
    sum: f64,
}

impl MovingAverage {
    /// Creates an averager that retains at most `window_size` samples.
    fn new(window_size: usize) -> Self {
        Self {
            window_size,
            values: VecDeque::with_capacity(window_size),
            sum: 0.0,
        }
    }

    /// Records a sample, evicting the oldest one when the window is full.
    fn add(&mut self, value: f64) {
        if self.values.len() >= self.window_size {
            // Keep the running sum consistent with the evicted sample.
            if let Some(evicted) = self.values.pop_front() {
                self.sum -= evicted;
            }
        }
        self.sum += value;
        self.values.push_back(value);
    }

    /// Mean of the samples currently in the window; `0.0` when empty.
    fn mean(&self) -> f64 {
        match self.values.len() {
            0 => 0.0,
            n => self.sum / n as f64,
        }
    }
}
/// Which scaling actions the system may take automatically.
///
/// Note: `PartitionManager::evaluate_scaling` only acts in `Horizontal` or
/// `Hybrid` mode; `Manual` and `Vertical` produce no automatic changes in
/// this module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ScalingMode {
    /// Operator-driven scaling only.
    Manual,
    /// Scale by adding/removing partitions.
    Horizontal,
    /// Scale resources in place (no implementation in this module).
    Vertical,
    /// Both horizontal and vertical scaling permitted.
    Hybrid,
}
/// Outcome of one scaling evaluation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScalingDirection {
    /// Add `amount` partitions.
    ScaleUp { amount: usize },
    /// Remove `amount` partitions.
    ScaleDown { amount: usize },
    /// Load is within thresholds (or scaling is disallowed); do nothing.
    NoChange,
}
/// How event keys are routed to partitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Cycle through partitions in order, ignoring the key.
    RoundRobin,
    /// Hash-based routing. NOTE(review): `key_field` is currently unused —
    /// `get_partition_for_key` hashes the supplied key string directly.
    Hash { key_field: String },
    /// Range-based routing (no routing implementation in this module).
    Range { ranges: Vec<(i64, i64)> },
    /// Consistent-hash routing. NOTE(review): `virtual_nodes` is currently
    /// unused — routing falls back to plain hash-modulo.
    ConsistentHash { virtual_nodes: usize },
    /// Defer to a named, externally defined strategy.
    Custom { strategy_name: String },
}
/// Policy for spreading work across nodes.
///
/// NOTE(review): declared for configuration; nothing in this module reads
/// the selected strategy yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoadBalancingStrategy {
    RoundRobin,
    LeastConnections,
    LeastLoaded,
    /// Per-node weights keyed by node id.
    Weighted { weights: HashMap<String, f64> },
    ConsistentHash,
}
/// Hard resource ceilings and partition-count bounds the scaler must respect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    pub max_cpu_cores: usize,
    /// In bytes.
    pub max_memory_bytes: u64,
    /// NOTE(review): presumably bytes/sec — confirm with whoever populates it.
    pub max_network_bandwidth: u64,
    /// Upper bound on the partition count when scaling up.
    pub max_partitions: usize,
    /// Lower bound kept when scaling down; also the initial partition count.
    pub min_partitions: usize,
}
/// Tunables governing scaling mode, key routing, load balancing, resource
/// limits, and adaptive buffer sizing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingConfig {
    pub mode: ScalingMode,
    pub partition_strategy: PartitionStrategy,
    pub load_balancing: LoadBalancingStrategy,
    pub resource_limits: ResourceLimits,
    /// Average load above which a scale-up is considered (fraction, 0.0..=1.0).
    pub scale_up_threshold: f64,
    /// Average load below which a scale-down is considered (fraction, 0.0..=1.0).
    pub scale_down_threshold: f64,
    /// Minimum time between partition-scaling actions.
    pub cooldown_period: Duration,
    /// When true, `AdaptiveBuffer` may grow/shrink between min/max sizes.
    pub enable_adaptive_buffering: bool,
    /// Buffer capacity at construction (item count).
    pub initial_buffer_size: usize,
    /// Buffer capacity ceiling when growing.
    pub max_buffer_size: usize,
    /// Buffer capacity floor when shrinking.
    pub min_buffer_size: usize,
}
impl Default for ScalingConfig {
    /// Hybrid scaling, round-robin routing, least-loaded balancing, and
    /// adaptive buffering enabled; limits sized for a single host.
    fn default() -> Self {
        let resource_limits = ResourceLimits {
            max_cpu_cores: num_cpus::get(),
            // 8 GiB
            max_memory_bytes: 8 * 1024 * 1024 * 1024,
            // 1 Gbit-scale cap
            max_network_bandwidth: 1_000_000_000,
            max_partitions: 100,
            min_partitions: 1,
        };
        Self {
            mode: ScalingMode::Hybrid,
            partition_strategy: PartitionStrategy::RoundRobin,
            load_balancing: LoadBalancingStrategy::LeastLoaded,
            resource_limits,
            scale_up_threshold: 0.8,
            scale_down_threshold: 0.3,
            cooldown_period: Duration::from_secs(60),
            enable_adaptive_buffering: true,
            initial_buffer_size: 10000,
            max_buffer_size: 1000000,
            min_buffer_size: 1000,
        }
    }
}
/// Metadata for a single partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Partition {
    /// Random UUID assigned at creation.
    pub partition_id: String,
    /// Stable numeric index used for key routing and node assignment.
    pub partition_number: usize,
    /// Node currently owning this partition, if any has been assigned.
    pub owner_node: Option<String>,
    pub replica_nodes: Vec<String>,
    /// Latest load estimate, set via `update_partition_load`.
    pub load: f64,
    pub event_count: u64,
    pub created_at: DateTime<Utc>,
    /// Touched on every assignment or load update.
    pub last_updated: DateTime<Utc>,
}
/// A worker node participating in the cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    pub node_id: String,
    /// Network address, e.g. "host:port".
    pub address: String,
    /// Partition numbers currently assigned to this node.
    pub partitions: Vec<usize>,
    pub resource_usage: ResourceUsage,
    pub health: NodeHealth,
    pub last_heartbeat: DateTime<Utc>,
}
/// Point-in-time resource consumption reported by a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// NOTE(review): assumed to be a 0.0..=1.0 fraction — confirm with reporters.
    pub cpu_usage: f64,
    /// In bytes.
    pub memory_usage: u64,
    pub network_usage: u64,
    pub events_per_second: f64,
}
/// Coarse health classification for a node, ordered best to worst.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeHealth {
    Healthy,
    Degraded,
    Unhealthy,
    Offline,
}
/// Bounded FIFO buffer whose capacity grows and shrinks with observed load.
///
/// Capacity is doubled when the moving-average load exceeds
/// `scale_up_threshold` and halved when it drops below
/// `scale_down_threshold`, clamped to the configured min/max sizes.
pub struct AdaptiveBuffer<T> {
    config: ScalingConfig,
    /// Queued items (FIFO).
    buffer: Arc<RwLock<VecDeque<T>>>,
    /// Current logical capacity; `push` fails once `buffer.len()` reaches it.
    current_size: Arc<RwLock<usize>>,
    /// Most recent 100 load samples (len / capacity).
    load_history: Arc<RwLock<VecDeque<f64>>>,
    /// 10-sample moving average of load; drives resize decisions.
    moving_avg: Arc<RwLock<MovingAverage>>,
    /// Timestamp of the last resize; resizes are rate-limited to one per 10s.
    last_resize: Arc<RwLock<Instant>>,
}
impl<T> AdaptiveBuffer<T> {
    /// Creates an empty buffer with capacity `config.initial_buffer_size`.
    pub fn new(config: ScalingConfig) -> Self {
        Self {
            current_size: Arc::new(RwLock::new(config.initial_buffer_size)),
            config,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            load_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
            moving_avg: Arc::new(RwLock::new(MovingAverage::new(10))),
            last_resize: Arc::new(RwLock::new(Instant::now())),
        }
    }

    /// Appends `item` to the back of the buffer.
    ///
    /// When the buffer is at capacity, one adaptive resize is attempted
    /// first; returns an error if the buffer is still full afterwards.
    pub fn push(&self, item: T) -> Result<()> {
        let mut buffer = self.buffer.write();
        let mut current_size = *self.current_size.read();
        if buffer.len() >= current_size {
            self.try_resize()?;
            // BUG FIX: re-read the capacity and keep using the updated value.
            // The old code passed the stale pre-resize capacity to
            // update_load_metrics, over-reporting load right after a grow.
            current_size = *self.current_size.read();
            if buffer.len() >= current_size {
                return Err(anyhow!("Buffer full: {}/{}", buffer.len(), current_size));
            }
        }
        buffer.push_back(item);
        self.update_load_metrics(buffer.len(), current_size);
        Ok(())
    }

    /// Removes and returns the oldest item, or `None` when empty.
    pub fn pop(&self) -> Option<T> {
        let mut buffer = self.buffer.write();
        let item = buffer.pop_front();
        let current_size = *self.current_size.read();
        self.update_load_metrics(buffer.len(), current_size);
        item
    }

    /// Current fill ratio (len / capacity), nominally in `0.0..=1.0`.
    pub fn utilization(&self) -> f64 {
        let buffer = self.buffer.read();
        let current_size = *self.current_size.read();
        buffer.len() as f64 / current_size as f64
    }

    /// Records one load sample (len / capacity) into the bounded history and
    /// the moving average that drives resize decisions.
    fn update_load_metrics(&self, buffer_len: usize, current_size: usize) {
        let load = buffer_len as f64 / current_size as f64;
        let mut history = self.load_history.write();
        history.push_back(load);
        if history.len() > 100 {
            history.pop_front();
        }
        let mut moving_avg = self.moving_avg.write();
        moving_avg.add(load);
    }

    /// Doubles or halves the capacity based on the average load, clamped to
    /// the configured min/max sizes.
    ///
    /// No-op when adaptive buffering is disabled or a resize happened within
    /// the last 10 seconds.
    fn try_resize(&self) -> Result<()> {
        if !self.config.enable_adaptive_buffering {
            return Ok(());
        }
        if self.last_resize.read().elapsed() < Duration::from_secs(10) {
            return Ok(());
        }
        // Snapshot the average before taking the capacity write lock.
        let avg_load = self.moving_avg.read().mean();
        let mut current_size = self.current_size.write();
        if avg_load > self.config.scale_up_threshold {
            let new_size = (*current_size * 2).min(self.config.max_buffer_size);
            if new_size > *current_size {
                *current_size = new_size;
                *self.last_resize.write() = Instant::now();
                info!("Scaled up buffer to {}", new_size);
            }
        } else if avg_load < self.config.scale_down_threshold {
            let new_size = (*current_size / 2).max(self.config.min_buffer_size);
            if new_size < *current_size {
                *current_size = new_size;
                *self.last_resize.write() = Instant::now();
                info!("Scaled down buffer to {}", new_size);
            }
        }
        Ok(())
    }

    /// Current capacity (not the number of queued items).
    pub fn size(&self) -> usize {
        *self.current_size.read()
    }

    /// Number of queued items.
    pub fn len(&self) -> usize {
        self.buffer.read().len()
    }

    /// True when no items are queued.
    pub fn is_empty(&self) -> bool {
        self.buffer.read().is_empty()
    }
}
/// Tracks partitions, nodes, and partition-to-node assignments, and decides
/// when to grow or shrink the partition count.
pub struct PartitionManager {
    config: ScalingConfig,
    /// Partition number -> partition metadata.
    partitions: Arc<DashMap<usize, Partition>>,
    /// Node id -> node metadata.
    nodes: Arc<DashMap<String, Node>>,
    /// Partition number -> owning node id.
    assignments: Arc<DashMap<usize, String>>,
    /// When the last scaling action was applied (cooldown bookkeeping).
    last_scaling: Arc<RwLock<Instant>>,
    /// Round-robin cursor used by `get_partition_for_key`.
    counter: Arc<RwLock<usize>>,
}
impl PartitionManager {
pub fn new(config: ScalingConfig) -> Self {
let manager = Self {
config: config.clone(),
partitions: Arc::new(DashMap::new()),
nodes: Arc::new(DashMap::new()),
assignments: Arc::new(DashMap::new()),
last_scaling: Arc::new(RwLock::new(Instant::now())),
counter: Arc::new(RwLock::new(0)),
};
for i in 0..config.resource_limits.min_partitions {
manager.create_partition(i);
}
manager
}
fn create_partition(&self, partition_number: usize) {
let partition = Partition {
partition_id: Uuid::new_v4().to_string(),
partition_number,
owner_node: None,
replica_nodes: Vec::new(),
load: 0.0,
event_count: 0,
created_at: Utc::now(),
last_updated: Utc::now(),
};
self.partitions.insert(partition_number, partition);
info!("Created partition {}", partition_number);
}
pub fn add_node(&self, node: Node) -> Result<()> {
let node_id = node.node_id.clone();
self.nodes.insert(node_id.clone(), node);
self.rebalance_partitions()?;
info!("Added node {}", node_id);
Ok(())
}
pub fn remove_node(&self, node_id: &str) -> Result<()> {
self.nodes.remove(node_id);
self.rebalance_partitions()?;
info!("Removed node {}", node_id);
Ok(())
}
fn rebalance_partitions(&self) -> Result<()> {
let nodes: Vec<_> = self.nodes.iter().map(|e| e.key().clone()).collect();
if nodes.is_empty() {
warn!("No nodes available for partition assignment");
return Ok(());
}
let partitions: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
for (idx, partition_num) in partitions.iter().enumerate() {
let node_id = &nodes[idx % nodes.len()];
self.assignments.insert(*partition_num, node_id.clone());
if let Some(mut partition) = self.partitions.get_mut(partition_num) {
partition.owner_node = Some(node_id.clone());
partition.last_updated = Utc::now();
}
if let Some(mut node) = self.nodes.get_mut(node_id) {
if !node.partitions.contains(partition_num) {
node.partitions.push(*partition_num);
}
}
}
debug!(
"Rebalanced {} partitions across {} nodes",
partitions.len(),
nodes.len()
);
Ok(())
}
pub fn evaluate_scaling(&self) -> ScalingDirection {
if !matches!(
self.config.mode,
ScalingMode::Horizontal | ScalingMode::Hybrid
) {
return ScalingDirection::NoChange;
}
if self.last_scaling.read().elapsed() < self.config.cooldown_period {
return ScalingDirection::NoChange;
}
let partitions: Vec<_> = self.partitions.iter().map(|e| e.clone()).collect();
if partitions.is_empty() {
return ScalingDirection::NoChange;
}
let avg_load = partitions.iter().map(|p| p.load).sum::<f64>() / partitions.len() as f64;
if avg_load > self.config.scale_up_threshold
&& partitions.len() < self.config.resource_limits.max_partitions
{
let amount = ((partitions.len() as f64 * 0.5).ceil() as usize)
.min(self.config.resource_limits.max_partitions - partitions.len())
.max(1);
ScalingDirection::ScaleUp { amount }
} else if avg_load < self.config.scale_down_threshold
&& partitions.len() > self.config.resource_limits.min_partitions
{
let amount = ((partitions.len() as f64 * 0.25).ceil() as usize)
.min(partitions.len() - self.config.resource_limits.min_partitions)
.max(1);
ScalingDirection::ScaleDown { amount }
} else {
ScalingDirection::NoChange
}
}
pub fn apply_scaling(&self, direction: &ScalingDirection) -> Result<()> {
match direction {
ScalingDirection::ScaleUp { amount } => {
let current_max = self.partitions.iter().map(|e| *e.key()).max().unwrap_or(0);
for i in 1..=*amount {
let partition_num = current_max + i;
if partition_num < self.config.resource_limits.max_partitions {
self.create_partition(partition_num);
}
}
self.rebalance_partitions()?;
*self.last_scaling.write() = Instant::now();
info!("Scaled up by {} partitions", amount);
}
ScalingDirection::ScaleDown { amount } => {
let partition_nums: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
let mut removed = 0;
for partition_num in partition_nums.iter().rev() {
if removed >= *amount {
break;
}
if partition_nums.len() - removed > self.config.resource_limits.min_partitions {
self.partitions.remove(partition_num);
self.assignments.remove(partition_num);
removed += 1;
}
}
self.rebalance_partitions()?;
*self.last_scaling.write() = Instant::now();
info!("Scaled down by {} partitions", removed);
}
ScalingDirection::NoChange => {}
}
Ok(())
}
pub fn get_partition_for_key(&self, key: &str) -> usize {
match &self.config.partition_strategy {
PartitionStrategy::RoundRobin => {
let mut counter = self.counter.write();
let partition = *counter % self.partitions.len();
*counter = counter.wrapping_add(1);
partition
}
PartitionStrategy::Hash { .. } => {
let hash = self.hash_key(key);
(hash as usize) % self.partitions.len()
}
PartitionStrategy::ConsistentHash { .. } => {
let hash = self.hash_key(key);
(hash as usize) % self.partitions.len()
}
_ => 0, }
}
fn hash_key(&self, key: &str) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish()
}
pub fn update_partition_load(&self, partition_num: usize, load: f64) {
if let Some(mut partition) = self.partitions.get_mut(&partition_num) {
partition.load = load;
partition.last_updated = Utc::now();
}
}
pub fn partition_count(&self) -> usize {
self.partitions.len()
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
}
/// Periodically evaluates and applies `PartitionManager` scaling decisions
/// on a background Tokio task (every 30 seconds).
pub struct AutoScaler {
    config: ScalingConfig,
    partition_manager: Arc<PartitionManager>,
    monitoring_interval: Duration,
    /// Sends commands to the background task.
    command_tx: mpsc::UnboundedSender<ScalingCommand>,
    /// Handle to the spawned task; retained but never awaited here.
    _background_task: Option<tokio::task::JoinHandle<()>>,
}
/// Messages understood by the auto-scaler's background task.
enum ScalingCommand {
    /// Run one evaluate/apply cycle immediately.
    Evaluate,
    /// Shut the background task down.
    Stop,
}
impl AutoScaler {
    /// Spawns the monitoring task (30-second interval) and returns the scaler.
    ///
    /// Must be called from within a Tokio runtime; `tokio::spawn` panics
    /// otherwise.
    pub fn new(config: ScalingConfig, partition_manager: Arc<PartitionManager>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
        let monitoring_interval = Duration::from_secs(30);
        let partition_manager_clone = partition_manager.clone();
        let background_task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let decision = partition_manager_clone.evaluate_scaling();
                        if !matches!(decision, ScalingDirection::NoChange) {
                            info!("Auto-scaler decision: {:?}", decision);
                            if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
                                error!("Failed to apply scaling: {}", e);
                            }
                        }
                    }
                    cmd = command_rx.recv() => {
                        match cmd {
                            Some(ScalingCommand::Evaluate) => {
                                let decision = partition_manager_clone.evaluate_scaling();
                                if !matches!(decision, ScalingDirection::NoChange) {
                                    if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
                                        error!("Failed to apply scaling: {}", e);
                                    }
                                }
                            }
                            // BUG FIX: the old `Some(cmd) = recv()` select arm
                            // was silently disabled once the sender side was
                            // dropped, so the task ticked forever even after
                            // the AutoScaler was gone. Treat a closed channel
                            // (recv() -> None) like an explicit Stop.
                            Some(ScalingCommand::Stop) | None => break,
                        }
                    }
                }
            }
        });
        Self {
            config,
            partition_manager,
            monitoring_interval,
            command_tx,
            _background_task: Some(background_task),
        }
    }

    /// Triggers an immediate evaluate/apply cycle on the background task.
    pub fn evaluate_now(&self) -> Result<()> {
        self.command_tx
            .send(ScalingCommand::Evaluate)
            .map_err(|e| anyhow!("Failed to send command: {}", e))
    }

    /// Asks the background task to shut down; errs if it has already exited.
    pub fn stop(&self) -> Result<()> {
        self.command_tx
            .send(ScalingCommand::Stop)
            .map_err(|e| anyhow!("Failed to send command: {}", e))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Push/pop should track the queue length exactly; the default config's
    // initial capacity (10000) comfortably fits 100 items.
    #[test]
    fn test_adaptive_buffer() {
        let config = ScalingConfig::default();
        let buffer: AdaptiveBuffer<u64> = AdaptiveBuffer::new(config);
        for i in 0..100 {
            buffer.push(i).unwrap();
        }
        assert_eq!(buffer.len(), 100);
        for _ in 0..50 {
            assert!(buffer.pop().is_some());
        }
        assert_eq!(buffer.len(), 50);
    }

    // Adding a node should register it and leave at least the configured
    // minimum number of partitions in place.
    #[test]
    fn test_partition_manager() {
        let config = ScalingConfig::default();
        let manager = PartitionManager::new(config);
        let node = Node {
            node_id: "node-1".to_string(),
            address: "localhost:8001".to_string(),
            partitions: Vec::new(),
            resource_usage: ResourceUsage {
                cpu_usage: 0.5,
                memory_usage: 1024 * 1024 * 1024,
                network_usage: 1000000,
                events_per_second: 1000.0,
            },
            health: NodeHealth::Healthy,
            last_heartbeat: Utc::now(),
        };
        manager.add_node(node).unwrap();
        assert_eq!(manager.node_count(), 1);
        assert!(manager.partition_count() >= 1);
    }

    // Key routing must always return an in-range partition number.
    #[test]
    fn test_partition_assignment() {
        let config = ScalingConfig::default();
        let manager = PartitionManager::new(config);
        let partition = manager.get_partition_for_key("test-key");
        assert!(partition < manager.partition_count());
    }
}