1use anyhow::{anyhow, Result};
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use parking_lot::RwLock;
26use serde::{Deserialize, Serialize};
27use std::collections::{HashMap, VecDeque};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use tokio::sync::mpsc;
31use tracing::{debug, error, info, warn};
32use uuid::Uuid;
33
/// Running mean over a fixed-size sliding window of `f64` samples.
///
/// Keeps a rolling `sum` alongside the sample queue so `mean()` is O(1).
struct MovingAverage {
    window_size: usize,
    values: VecDeque<f64>,
    sum: f64,
}

impl MovingAverage {
    /// Creates an empty average over the given window size.
    fn new(window_size: usize) -> Self {
        Self {
            window_size,
            values: VecDeque::with_capacity(window_size),
            sum: 0.0,
        }
    }

    /// Records a sample, evicting the oldest one once the window is full.
    fn add(&mut self, sample: f64) {
        if self.values.len() >= self.window_size {
            if let Some(evicted) = self.values.pop_front() {
                self.sum -= evicted;
            }
        }
        self.sum += sample;
        self.values.push_back(sample);
    }

    /// Current mean of the window; `0.0` when no samples have been recorded.
    fn mean(&self) -> f64 {
        match self.values.len() {
            0 => 0.0,
            n => self.sum / n as f64,
        }
    }
}
70
/// How the system is allowed to react to load changes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ScalingMode {
    /// No automatic scaling; capacity changes are operator-driven.
    Manual,
    /// Scale by adding or removing partitions across nodes.
    Horizontal,
    /// Scale by resizing resources in place.
    /// NOTE(review): `PartitionManager::evaluate_scaling` only acts on
    /// `Horizontal`/`Hybrid`; vertical scaling has no automation here.
    Vertical,
    /// Combination of horizontal and vertical scaling.
    Hybrid,
}
83
/// Outcome of a scaling evaluation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScalingDirection {
    /// Add `amount` partitions.
    ScaleUp { amount: usize },
    /// Remove `amount` partitions.
    ScaleDown { amount: usize },
    /// Load is within thresholds (or a cooldown is active); do nothing.
    NoChange,
}
94
/// How events are routed to partitions.
///
/// NOTE(review): only `RoundRobin`, `Hash` and `ConsistentHash` are acted on
/// by `PartitionManager::get_partition_for_key`; `Range` and `Custom`
/// currently fall through to partition 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Cycle through partitions regardless of key.
    RoundRobin,
    /// Route by hashing the named key field.
    Hash { key_field: String },
    /// Route by matching a key against inclusive numeric ranges.
    Range { ranges: Vec<(i64, i64)> },
    /// Consistent hashing with the given number of virtual nodes per partition.
    ConsistentHash { virtual_nodes: usize },
    /// Externally registered strategy, looked up by name.
    Custom { strategy_name: String },
}
109
/// How work is distributed across nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoadBalancingStrategy {
    /// Rotate through nodes in order.
    RoundRobin,
    /// Prefer the node with the fewest active connections.
    LeastConnections,
    /// Prefer the node with the lowest reported load.
    LeastLoaded,
    /// Weighted distribution, keyed by node id.
    Weighted { weights: HashMap<String, f64> },
    /// Consistent hashing over node ids.
    ConsistentHash,
}
124
/// Hard caps that bound how far the system may scale.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    // Maximum CPU cores the workload may consume.
    pub max_cpu_cores: usize,
    // Maximum memory, in bytes.
    pub max_memory_bytes: u64,
    // Maximum network bandwidth — presumably bits/sec given the 1_000_000_000
    // default; TODO confirm the unit.
    pub max_network_bandwidth: u64,
    // Upper bound on total partition count.
    pub max_partitions: usize,
    // Lower bound on total partition count (also the number created at startup).
    pub min_partitions: usize,
}
139
/// Top-level configuration for partitioning, load balancing and scaling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingConfig {
    // Which scaling actions are permitted.
    pub mode: ScalingMode,
    // Key-to-partition routing strategy.
    pub partition_strategy: PartitionStrategy,
    // Node selection strategy.
    pub load_balancing: LoadBalancingStrategy,
    // Hard resource caps.
    pub resource_limits: ResourceLimits,
    // Average load (0.0..1.0) above which we scale up.
    pub scale_up_threshold: f64,
    // Average load (0.0..1.0) below which we scale down.
    pub scale_down_threshold: f64,
    // Minimum time between partition scaling actions.
    pub cooldown_period: Duration,
    // When true, `AdaptiveBuffer` grows/shrinks its capacity with load.
    pub enable_adaptive_buffering: bool,
    // Starting capacity for adaptive buffers (items, not bytes).
    pub initial_buffer_size: usize,
    // Ceiling for adaptive buffer growth.
    pub max_buffer_size: usize,
    // Floor for adaptive buffer shrinkage.
    pub min_buffer_size: usize,
}
166
167impl Default for ScalingConfig {
168 fn default() -> Self {
169 Self {
170 mode: ScalingMode::Hybrid,
171 partition_strategy: PartitionStrategy::RoundRobin,
172 load_balancing: LoadBalancingStrategy::LeastLoaded,
173 resource_limits: ResourceLimits {
174 max_cpu_cores: num_cpus::get(),
175 max_memory_bytes: 8 * 1024 * 1024 * 1024, max_network_bandwidth: 1_000_000_000, max_partitions: 100,
178 min_partitions: 1,
179 },
180 scale_up_threshold: 0.8,
181 scale_down_threshold: 0.3,
182 cooldown_period: Duration::from_secs(60),
183 enable_adaptive_buffering: true,
184 initial_buffer_size: 10000,
185 max_buffer_size: 1000000,
186 min_buffer_size: 1000,
187 }
188 }
189}
190
/// A unit of data ownership that can be assigned to (and moved between) nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Partition {
    // Stable unique id (UUID string).
    pub partition_id: String,
    // Dense ordinal used for routing (`hash % partition_count`).
    pub partition_number: usize,
    // Node currently responsible for this partition, if assigned.
    pub owner_node: Option<String>,
    // Nodes holding replicas of this partition.
    pub replica_nodes: Vec<String>,
    // Most recently reported load, same 0.0..1.0 scale as the thresholds.
    pub load: f64,
    // Total events handled by this partition.
    pub event_count: u64,
    pub created_at: DateTime<Utc>,
    // Updated on reassignment and on load reports.
    pub last_updated: DateTime<Utc>,
}
211
/// A worker process participating in the cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    // Unique node identifier.
    pub node_id: String,
    // Network address, e.g. "host:port".
    pub address: String,
    // Partition numbers currently assigned to this node.
    pub partitions: Vec<usize>,
    // Last reported resource usage snapshot.
    pub resource_usage: ResourceUsage,
    // Health as determined by heartbeat/monitoring.
    pub health: NodeHealth,
    pub last_heartbeat: DateTime<Utc>,
}
228
/// Point-in-time resource usage snapshot reported by a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    // CPU utilization fraction — presumably 0.0..1.0; TODO confirm scale.
    pub cpu_usage: f64,
    // Memory in use, bytes.
    pub memory_usage: u64,
    // Network throughput in use (unit assumed to match `max_network_bandwidth`).
    pub network_usage: u64,
    // Event processing rate.
    pub events_per_second: f64,
}
241
/// Coarse health classification for a node.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeHealth {
    /// Operating normally.
    Healthy,
    /// Reachable but impaired.
    Degraded,
    /// Failing health checks.
    Unhealthy,
    /// Not reachable at all.
    Offline,
}
250
/// A bounded FIFO queue whose capacity grows and shrinks with observed load.
///
/// All state is behind `Arc<RwLock<..>>` so clones of the handles can be
/// shared across threads; the logical capacity (`current_size`) is a soft
/// limit enforced in `push`, not a preallocated buffer size.
pub struct AdaptiveBuffer<T> {
    config: ScalingConfig,
    // The queued items.
    buffer: Arc<RwLock<VecDeque<T>>>,
    // Current soft capacity limit, adjusted by `try_resize`.
    current_size: Arc<RwLock<usize>>,
    // Last 100 utilization samples (written but not currently read back).
    load_history: Arc<RwLock<VecDeque<f64>>>,
    // Smoothed utilization over the last 10 samples; drives resize decisions.
    moving_avg: Arc<RwLock<MovingAverage>>,
    // When the capacity was last changed; enforces a 10s resize cooldown.
    last_resize: Arc<RwLock<Instant>>,
}
266
impl<T> AdaptiveBuffer<T> {
    /// Creates a buffer with capacity `config.initial_buffer_size`.
    pub fn new(config: ScalingConfig) -> Self {
        Self {
            // Read `initial_buffer_size` before `config` is moved into the struct.
            current_size: Arc::new(RwLock::new(config.initial_buffer_size)),
            config,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            load_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
            moving_avg: Arc::new(RwLock::new(MovingAverage::new(10))),
            last_resize: Arc::new(RwLock::new(Instant::now())),
        }
    }

    /// Appends an item, attempting one adaptive resize if the buffer is full.
    ///
    /// Errors with "Buffer full" when at capacity and the resize could not
    /// (or was not allowed to) grow it. Lock order here is buffer ->
    /// current_size -> (inside try_resize) moving_avg/last_resize; callers of
    /// these locks elsewhere must not invert that order.
    pub fn push(&self, item: T) -> Result<()> {
        let mut buffer = self.buffer.write();
        let current_size = *self.current_size.read();

        if buffer.len() >= current_size {
            self.try_resize()?;

            // Re-read: try_resize may have grown the capacity.
            let new_size = *self.current_size.read();
            if buffer.len() >= new_size {
                return Err(anyhow!("Buffer full: {}/{}", buffer.len(), new_size));
            }
        }

        buffer.push_back(item);
        // NOTE(review): uses the pre-resize `current_size`, so the recorded
        // load may be slightly stale right after a resize.
        self.update_load_metrics(buffer.len(), current_size);
        Ok(())
    }

    /// Removes and returns the oldest item, recording the new utilization.
    pub fn pop(&self) -> Option<T> {
        let mut buffer = self.buffer.write();
        let item = buffer.pop_front();

        let current_size = *self.current_size.read();
        self.update_load_metrics(buffer.len(), current_size);
        item
    }

    /// Current fill fraction (len / capacity).
    pub fn utilization(&self) -> f64 {
        let buffer = self.buffer.read();
        let current_size = *self.current_size.read();
        buffer.len() as f64 / current_size as f64
    }

    /// Records a utilization sample into the history ring and the moving average.
    fn update_load_metrics(&self, buffer_len: usize, current_size: usize) {
        let load = buffer_len as f64 / current_size as f64;

        // Keep only the most recent 100 samples.
        let mut history = self.load_history.write();
        history.push_back(load);
        if history.len() > 100 {
            history.pop_front();
        }

        let mut moving_avg = self.moving_avg.write();
        moving_avg.add(load);
    }

    /// Doubles or halves the capacity based on the smoothed load, bounded by
    /// config min/max, with a hard-coded 10-second cooldown between resizes.
    /// No-op unless `enable_adaptive_buffering` is set. Always returns Ok.
    fn try_resize(&self) -> Result<()> {
        if !self.config.enable_adaptive_buffering {
            return Ok(());
        }

        let last_resize = *self.last_resize.read();
        if last_resize.elapsed() < Duration::from_secs(10) {
            return Ok(());
        }

        let moving_avg = self.moving_avg.read();
        let avg_load = moving_avg.mean();

        let mut current_size = self.current_size.write();

        if avg_load > self.config.scale_up_threshold {
            // Double, clamped to the configured ceiling.
            let new_size = (*current_size * 2).min(self.config.max_buffer_size);
            if new_size > *current_size {
                *current_size = new_size;
                *self.last_resize.write() = Instant::now();
                info!("Scaled up buffer to {}", new_size);
            }
        } else if avg_load < self.config.scale_down_threshold {
            // Halve, clamped to the configured floor.
            let new_size = (*current_size / 2).max(self.config.min_buffer_size);
            if new_size < *current_size {
                *current_size = new_size;
                *self.last_resize.write() = Instant::now();
                info!("Scaled down buffer to {}", new_size);
            }
        }

        Ok(())
    }

    /// Current soft capacity limit.
    pub fn size(&self) -> usize {
        *self.current_size.read()
    }

    /// Number of items currently queued.
    pub fn len(&self) -> usize {
        self.buffer.read().len()
    }

    /// True when no items are queued.
    pub fn is_empty(&self) -> bool {
        self.buffer.read().is_empty()
    }
}
384
/// Owns the partition set, node registry, and partition-to-node assignments.
pub struct PartitionManager {
    config: ScalingConfig,
    // Partition number -> partition metadata.
    partitions: Arc<DashMap<usize, Partition>>,
    // Node id -> node metadata.
    nodes: Arc<DashMap<String, Node>>,
    // Partition number -> owning node id.
    assignments: Arc<DashMap<usize, String>>,
    // When a scaling action was last applied; enforces the cooldown.
    last_scaling: Arc<RwLock<Instant>>,
    // Monotonic counter for round-robin key routing.
    counter: Arc<RwLock<usize>>,
}
400
401impl PartitionManager {
402 pub fn new(config: ScalingConfig) -> Self {
404 let manager = Self {
405 config: config.clone(),
406 partitions: Arc::new(DashMap::new()),
407 nodes: Arc::new(DashMap::new()),
408 assignments: Arc::new(DashMap::new()),
409 last_scaling: Arc::new(RwLock::new(Instant::now())),
410 counter: Arc::new(RwLock::new(0)),
411 };
412
413 for i in 0..config.resource_limits.min_partitions {
415 manager.create_partition(i);
416 }
417
418 manager
419 }
420
421 fn create_partition(&self, partition_number: usize) {
423 let partition = Partition {
424 partition_id: Uuid::new_v4().to_string(),
425 partition_number,
426 owner_node: None,
427 replica_nodes: Vec::new(),
428 load: 0.0,
429 event_count: 0,
430 created_at: Utc::now(),
431 last_updated: Utc::now(),
432 };
433
434 self.partitions.insert(partition_number, partition);
435 info!("Created partition {}", partition_number);
436 }
437
438 pub fn add_node(&self, node: Node) -> Result<()> {
440 let node_id = node.node_id.clone();
441 self.nodes.insert(node_id.clone(), node);
442
443 self.rebalance_partitions()?;
445
446 info!("Added node {}", node_id);
447 Ok(())
448 }
449
450 pub fn remove_node(&self, node_id: &str) -> Result<()> {
452 self.nodes.remove(node_id);
453
454 self.rebalance_partitions()?;
456
457 info!("Removed node {}", node_id);
458 Ok(())
459 }
460
461 fn rebalance_partitions(&self) -> Result<()> {
463 let nodes: Vec<_> = self.nodes.iter().map(|e| e.key().clone()).collect();
464
465 if nodes.is_empty() {
466 warn!("No nodes available for partition assignment");
467 return Ok(());
468 }
469
470 let partitions: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
471
472 for (idx, partition_num) in partitions.iter().enumerate() {
474 let node_id = &nodes[idx % nodes.len()];
475 self.assignments.insert(*partition_num, node_id.clone());
476
477 if let Some(mut partition) = self.partitions.get_mut(partition_num) {
479 partition.owner_node = Some(node_id.clone());
480 partition.last_updated = Utc::now();
481 }
482
483 if let Some(mut node) = self.nodes.get_mut(node_id) {
485 if !node.partitions.contains(partition_num) {
486 node.partitions.push(*partition_num);
487 }
488 }
489 }
490
491 debug!(
492 "Rebalanced {} partitions across {} nodes",
493 partitions.len(),
494 nodes.len()
495 );
496 Ok(())
497 }
498
499 pub fn evaluate_scaling(&self) -> ScalingDirection {
501 if !matches!(
502 self.config.mode,
503 ScalingMode::Horizontal | ScalingMode::Hybrid
504 ) {
505 return ScalingDirection::NoChange;
506 }
507
508 if self.last_scaling.read().elapsed() < self.config.cooldown_period {
510 return ScalingDirection::NoChange;
511 }
512
513 let partitions: Vec<_> = self.partitions.iter().map(|e| e.clone()).collect();
515
516 if partitions.is_empty() {
517 return ScalingDirection::NoChange;
518 }
519
520 let avg_load = partitions.iter().map(|p| p.load).sum::<f64>() / partitions.len() as f64;
521
522 if avg_load > self.config.scale_up_threshold
523 && partitions.len() < self.config.resource_limits.max_partitions
524 {
525 let amount = ((partitions.len() as f64 * 0.5).ceil() as usize)
527 .min(self.config.resource_limits.max_partitions - partitions.len())
528 .max(1);
529 ScalingDirection::ScaleUp { amount }
530 } else if avg_load < self.config.scale_down_threshold
531 && partitions.len() > self.config.resource_limits.min_partitions
532 {
533 let amount = ((partitions.len() as f64 * 0.25).ceil() as usize)
535 .min(partitions.len() - self.config.resource_limits.min_partitions)
536 .max(1);
537 ScalingDirection::ScaleDown { amount }
538 } else {
539 ScalingDirection::NoChange
540 }
541 }
542
543 pub fn apply_scaling(&self, direction: &ScalingDirection) -> Result<()> {
545 match direction {
546 ScalingDirection::ScaleUp { amount } => {
547 let current_max = self.partitions.iter().map(|e| *e.key()).max().unwrap_or(0);
548
549 for i in 1..=*amount {
550 let partition_num = current_max + i;
551 if partition_num < self.config.resource_limits.max_partitions {
552 self.create_partition(partition_num);
553 }
554 }
555
556 self.rebalance_partitions()?;
557 *self.last_scaling.write() = Instant::now();
558
559 info!("Scaled up by {} partitions", amount);
560 }
561 ScalingDirection::ScaleDown { amount } => {
562 let partition_nums: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
563
564 let mut removed = 0;
566 for partition_num in partition_nums.iter().rev() {
567 if removed >= *amount {
568 break;
569 }
570 if partition_nums.len() - removed > self.config.resource_limits.min_partitions {
571 self.partitions.remove(partition_num);
572 self.assignments.remove(partition_num);
573 removed += 1;
574 }
575 }
576
577 self.rebalance_partitions()?;
578 *self.last_scaling.write() = Instant::now();
579
580 info!("Scaled down by {} partitions", removed);
581 }
582 ScalingDirection::NoChange => {}
583 }
584
585 Ok(())
586 }
587
588 pub fn get_partition_for_key(&self, key: &str) -> usize {
590 match &self.config.partition_strategy {
591 PartitionStrategy::RoundRobin => {
592 let mut counter = self.counter.write();
594 let partition = *counter % self.partitions.len();
595 *counter = counter.wrapping_add(1);
596 partition
597 }
598 PartitionStrategy::Hash { .. } => {
599 let hash = self.hash_key(key);
601 (hash as usize) % self.partitions.len()
602 }
603 PartitionStrategy::ConsistentHash { .. } => {
604 let hash = self.hash_key(key);
606 (hash as usize) % self.partitions.len()
607 }
608 _ => 0, }
610 }
611
612 fn hash_key(&self, key: &str) -> u64 {
614 use std::collections::hash_map::DefaultHasher;
615 use std::hash::{Hash, Hasher};
616
617 let mut hasher = DefaultHasher::new();
618 key.hash(&mut hasher);
619 hasher.finish()
620 }
621
622 pub fn update_partition_load(&self, partition_num: usize, load: f64) {
624 if let Some(mut partition) = self.partitions.get_mut(&partition_num) {
625 partition.load = load;
626 partition.last_updated = Utc::now();
627 }
628 }
629
630 pub fn partition_count(&self) -> usize {
632 self.partitions.len()
633 }
634
635 pub fn node_count(&self) -> usize {
637 self.nodes.len()
638 }
639}
640
/// Periodically evaluates and applies partition scaling in a background task.
pub struct AutoScaler {
    config: ScalingConfig,
    partition_manager: Arc<PartitionManager>,
    // How often the background task re-evaluates (30s, set in `new`).
    monitoring_interval: Duration,
    // Channel for on-demand commands to the background task.
    command_tx: mpsc::UnboundedSender<ScalingCommand>,
    // Handle kept so the spawned task isn't detached silently.
    _background_task: Option<tokio::task::JoinHandle<()>>,
}
654
/// Commands accepted by the auto-scaler's background task.
enum ScalingCommand {
    /// Run a scaling evaluation immediately.
    Evaluate,
    /// Shut the background task down.
    Stop,
}
660
661impl AutoScaler {
662 pub fn new(config: ScalingConfig, partition_manager: Arc<PartitionManager>) -> Self {
664 let (command_tx, mut command_rx) = mpsc::unbounded_channel();
665
666 let monitoring_interval = Duration::from_secs(30);
667
668 let partition_manager_clone = partition_manager.clone();
669 let background_task = tokio::spawn(async move {
670 let mut interval = tokio::time::interval(monitoring_interval);
671
672 loop {
673 tokio::select! {
674 _ = interval.tick() => {
675 let decision = partition_manager_clone.evaluate_scaling();
676 if !matches!(decision, ScalingDirection::NoChange) {
677 info!("Auto-scaler decision: {:?}", decision);
678 if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
679 error!("Failed to apply scaling: {}", e);
680 }
681 }
682 }
683 Some(cmd) = command_rx.recv() => {
684 match cmd {
685 ScalingCommand::Evaluate => {
686 let decision = partition_manager_clone.evaluate_scaling();
687 if !matches!(decision, ScalingDirection::NoChange) {
688 if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
689 error!("Failed to apply scaling: {}", e);
690 }
691 }
692 }
693 ScalingCommand::Stop => break,
694 }
695 }
696 }
697 }
698 });
699
700 Self {
701 config,
702 partition_manager,
703 monitoring_interval,
704 command_tx,
705 _background_task: Some(background_task),
706 }
707 }
708
709 pub fn evaluate_now(&self) -> Result<()> {
711 self.command_tx
712 .send(ScalingCommand::Evaluate)
713 .map_err(|e| anyhow!("Failed to send command: {}", e))
714 }
715}
716
#[cfg(test)]
mod tests {
    use super::*;

    // Push/pop should track buffer length exactly under the default config
    // (initial capacity 10000, so no resize is triggered by 100 items).
    #[test]
    fn test_adaptive_buffer() {
        let config = ScalingConfig::default();
        let buffer: AdaptiveBuffer<u64> = AdaptiveBuffer::new(config);

        for i in 0..100 {
            buffer.push(i).unwrap();
        }

        assert_eq!(buffer.len(), 100);

        for _ in 0..50 {
            assert!(buffer.pop().is_some());
        }

        assert_eq!(buffer.len(), 50);
    }

    // Adding a node should register it and trigger partition assignment.
    #[test]
    fn test_partition_manager() {
        let config = ScalingConfig::default();
        let manager = PartitionManager::new(config);

        let node = Node {
            node_id: "node-1".to_string(),
            address: "localhost:8001".to_string(),
            partitions: Vec::new(),
            resource_usage: ResourceUsage {
                cpu_usage: 0.5,
                memory_usage: 1024 * 1024 * 1024,
                network_usage: 1000000,
                events_per_second: 1000.0,
            },
            health: NodeHealth::Healthy,
            last_heartbeat: Utc::now(),
        };

        manager.add_node(node).unwrap();

        assert_eq!(manager.node_count(), 1);
        assert!(manager.partition_count() >= 1);
    }

    // Key routing must always land inside the current partition range.
    #[test]
    fn test_partition_assignment() {
        let config = ScalingConfig::default();
        let manager = PartitionManager::new(config);

        let partition = manager.get_partition_for_key("test-key");
        assert!(partition < manager.partition_count());
    }
}