// oxirs_stream/scalability.rs

//! # Scalability Features for Stream Processing
//!
//! This module provides comprehensive scalability capabilities including
//! horizontal and vertical scaling, adaptive buffering, and dynamic resource management.
//!
//! ## Features
//!
//! - **Horizontal Scaling**: Dynamic partition management and load balancing
//! - **Vertical Scaling**: Adaptive resource allocation
//! - **Adaptive Buffering**: Smart buffer sizing based on load
//! - **Dynamic Partitioning**: Automatic partition assignment and rebalancing
//! - **Resource Optimization**: CPU, memory, and network optimization
//! - **Auto-scaling**: Automatic scaling based on metrics
//!
//! ## Use Cases
//!
//! - **High Throughput**: Handle millions of events per second
//! - **Burst Handling**: Adapt to traffic spikes
//! - **Cost Optimization**: Scale down during low traffic
//! - **Fault Tolerance**: Redistribute load on failures

use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

// Lightweight local statistics helpers; no external scientific-computing
// crate is actually used in this module.

/// Simple moving average calculator.
///
/// Maintains a fixed-size sliding window of samples plus a running sum so
/// that the mean can be updated and queried in O(1).
struct MovingAverage {
    /// Maximum number of samples retained in the window
    window_size: usize,
    /// Samples currently in the window (oldest at the front)
    values: VecDeque<f64>,
    /// Running sum of `values`, kept in sync by `add`
    sum: f64,
}

43impl MovingAverage {
44    fn new(window_size: usize) -> Self {
45        Self {
46            window_size,
47            values: VecDeque::with_capacity(window_size),
48            sum: 0.0,
49        }
50    }
51
52    fn add(&mut self, value: f64) {
53        if self.values.len() >= self.window_size {
54            if let Some(old) = self.values.pop_front() {
55                self.sum -= old;
56            }
57        }
58        self.values.push_back(value);
59        self.sum += value;
60    }
61
62    fn mean(&self) -> f64 {
63        if self.values.is_empty() {
64            0.0
65        } else {
66            self.sum / self.values.len() as f64
67        }
68    }
69}
70
/// Scaling mode controlling which dimensions the scaler may adjust.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ScalingMode {
    /// Manual scaling - no automatic scaling
    Manual,
    /// Horizontal scaling - add/remove partitions
    Horizontal,
    /// Vertical scaling - adjust resources
    Vertical,
    /// Both horizontal and vertical
    Hybrid,
}

/// Scaling decision produced by `PartitionManager::evaluate_scaling`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScalingDirection {
    /// Scale up by adding `amount` partitions
    ScaleUp { amount: usize },
    /// Scale down by removing `amount` partitions
    ScaleDown { amount: usize },
    /// No scaling needed
    NoChange,
}

/// Strategy used to map an event key to a partition.
///
/// NOTE(review): `get_partition_for_key` currently implements `RoundRobin`,
/// `Hash`, and `ConsistentHash` (the latter as a plain hash-modulo);
/// `Range` and `Custom` fall back to partition 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Round-robin partitioning (ignores the key)
    RoundRobin,
    /// Hash-based partitioning on the named key field
    Hash { key_field: String },
    /// Range-based partitioning over key ranges
    Range { ranges: Vec<(i64, i64)> },
    /// Consistent hashing with the given number of virtual nodes
    ConsistentHash { virtual_nodes: usize },
    /// Custom partitioning identified by name
    Custom { strategy_name: String },
}

/// Load balancing strategy.
///
/// NOTE(review): stored in `ScalingConfig` but not consumed by any code in
/// this module — presumably interpreted by a routing layer elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoadBalancingStrategy {
    /// Round-robin
    RoundRobin,
    /// Least connections
    LeastConnections,
    /// Least loaded
    LeastLoaded,
    /// Weighted distribution (node id -> weight)
    Weighted { weights: HashMap<String, f64> },
    /// Consistent hashing
    ConsistentHash,
}

/// Hard resource bounds the scaler must respect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    /// Maximum CPU cores
    pub max_cpu_cores: usize,
    /// Maximum memory (bytes)
    pub max_memory_bytes: u64,
    /// Maximum network bandwidth (bytes/sec)
    pub max_network_bandwidth: u64,
    /// Maximum number of partitions (upper bound for scale-up)
    pub max_partitions: usize,
    /// Minimum number of partitions (lower bound for scale-down)
    pub min_partitions: usize,
}

/// Scaling configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingConfig {
    /// Scaling mode (manual, horizontal, vertical, or hybrid)
    pub mode: ScalingMode,
    /// Partition strategy used to map keys to partitions
    pub partition_strategy: PartitionStrategy,
    /// Load balancing strategy
    pub load_balancing: LoadBalancingStrategy,
    /// Resource limits (CPU, memory, network, partition counts)
    pub resource_limits: ResourceLimits,
    /// Scale-up threshold (0.0 to 1.0): scaling up is considered when the
    /// average load exceeds this value
    pub scale_up_threshold: f64,
    /// Scale-down threshold (0.0 to 1.0): scaling down is considered when
    /// the average load falls below this value
    pub scale_down_threshold: f64,
    /// Cooldown period between partition scaling operations
    pub cooldown_period: Duration,
    /// Enable adaptive buffering (dynamic buffer size limits)
    pub enable_adaptive_buffering: bool,
    /// Initial buffer size limit (items)
    pub initial_buffer_size: usize,
    /// Maximum buffer size limit (items)
    pub max_buffer_size: usize,
    /// Minimum buffer size limit (items)
    pub min_buffer_size: usize,
}

167impl Default for ScalingConfig {
168    fn default() -> Self {
169        Self {
170            mode: ScalingMode::Hybrid,
171            partition_strategy: PartitionStrategy::RoundRobin,
172            load_balancing: LoadBalancingStrategy::LeastLoaded,
173            resource_limits: ResourceLimits {
174                max_cpu_cores: num_cpus::get(),
175                max_memory_bytes: 8 * 1024 * 1024 * 1024, // 8GB
176                max_network_bandwidth: 1_000_000_000,     // 1 Gbps
177                max_partitions: 100,
178                min_partitions: 1,
179            },
180            scale_up_threshold: 0.8,
181            scale_down_threshold: 0.3,
182            cooldown_period: Duration::from_secs(60),
183            enable_adaptive_buffering: true,
184            initial_buffer_size: 10000,
185            max_buffer_size: 1000000,
186            min_buffer_size: 1000,
187        }
188    }
189}
190
/// Partition metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Partition {
    /// Unique partition ID (UUID string)
    pub partition_id: String,
    /// Partition number (0-based; also the map key in `PartitionManager`)
    pub partition_number: usize,
    /// Owner node, if currently assigned
    pub owner_node: Option<String>,
    /// Replica nodes
    pub replica_nodes: Vec<String>,
    /// Current load (0.0 to 1.0), set via `update_partition_load`
    pub load: f64,
    /// Number of events
    pub event_count: u64,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last metadata update
    pub last_updated: DateTime<Utc>,
}

/// Node metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Node ID
    pub node_id: String,
    /// Node network address
    pub address: String,
    /// Partition numbers currently assigned to this node
    pub partitions: Vec<usize>,
    /// Resource usage snapshot
    pub resource_usage: ResourceUsage,
    /// Health status
    pub health: NodeHealth,
    /// Last heartbeat
    /// NOTE(review): nothing in this module updates or checks heartbeats —
    /// presumably maintained by a cluster-membership layer elsewhere.
    pub last_heartbeat: DateTime<Utc>,
}

/// Resource usage snapshot for a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage (0.0 to 1.0)
    pub cpu_usage: f64,
    /// Memory usage (bytes)
    pub memory_usage: u64,
    /// Network usage (bytes/sec)
    pub network_usage: u64,
    /// Events processed per second
    pub events_per_second: f64,
}

/// Node health status, from fully operational to unreachable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeHealth {
    Healthy,
    Degraded,
    Unhealthy,
    Offline,
}

/// Adaptive FIFO buffer whose size limit grows and shrinks with load.
pub struct AdaptiveBuffer<T> {
    /// Buffer configuration (thresholds and size bounds)
    config: ScalingConfig,
    /// Backing FIFO queue
    buffer: Arc<RwLock<VecDeque<T>>>,
    /// Current buffer size limit (items), adjusted by `try_resize`
    current_size: Arc<RwLock<usize>>,
    /// Bounded history of recent load samples (last 100)
    load_history: Arc<RwLock<VecDeque<f64>>>,
    /// Moving average of recent load, drives resize decisions
    moving_avg: Arc<RwLock<MovingAverage>>,
    /// Time of the last resize, used for the resize cooldown
    last_resize: Arc<RwLock<Instant>>,
}

267impl<T> AdaptiveBuffer<T> {
268    /// Create a new adaptive buffer
269    pub fn new(config: ScalingConfig) -> Self {
270        Self {
271            current_size: Arc::new(RwLock::new(config.initial_buffer_size)),
272            config,
273            buffer: Arc::new(RwLock::new(VecDeque::new())),
274            load_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
275            moving_avg: Arc::new(RwLock::new(MovingAverage::new(10))),
276            last_resize: Arc::new(RwLock::new(Instant::now())),
277        }
278    }
279
280    /// Push an item to the buffer
281    pub fn push(&self, item: T) -> Result<()> {
282        let mut buffer = self.buffer.write();
283        let current_size = *self.current_size.read();
284
285        if buffer.len() >= current_size {
286            // Buffer is full - try to resize or reject
287            self.try_resize()?;
288
289            let new_size = *self.current_size.read();
290            if buffer.len() >= new_size {
291                return Err(anyhow!("Buffer full: {}/{}", buffer.len(), new_size));
292            }
293        }
294
295        buffer.push_back(item);
296        self.update_load_metrics(buffer.len(), current_size);
297        Ok(())
298    }
299
300    /// Pop an item from the buffer
301    pub fn pop(&self) -> Option<T> {
302        let mut buffer = self.buffer.write();
303        let item = buffer.pop_front();
304
305        let current_size = *self.current_size.read();
306        self.update_load_metrics(buffer.len(), current_size);
307        item
308    }
309
310    /// Get current buffer utilization
311    pub fn utilization(&self) -> f64 {
312        let buffer = self.buffer.read();
313        let current_size = *self.current_size.read();
314        buffer.len() as f64 / current_size as f64
315    }
316
317    /// Update load metrics
318    fn update_load_metrics(&self, buffer_len: usize, current_size: usize) {
319        let load = buffer_len as f64 / current_size as f64;
320
321        let mut history = self.load_history.write();
322        history.push_back(load);
323        if history.len() > 100 {
324            history.pop_front();
325        }
326
327        let mut moving_avg = self.moving_avg.write();
328        moving_avg.add(load);
329    }
330
331    /// Try to resize the buffer based on load
332    fn try_resize(&self) -> Result<()> {
333        if !self.config.enable_adaptive_buffering {
334            return Ok(());
335        }
336
337        let last_resize = *self.last_resize.read();
338        if last_resize.elapsed() < Duration::from_secs(10) {
339            // Cooldown period
340            return Ok(());
341        }
342
343        let moving_avg = self.moving_avg.read();
344        let avg_load = moving_avg.mean();
345
346        let mut current_size = self.current_size.write();
347
348        if avg_load > self.config.scale_up_threshold {
349            // Scale up
350            let new_size = (*current_size * 2).min(self.config.max_buffer_size);
351            if new_size > *current_size {
352                *current_size = new_size;
353                *self.last_resize.write() = Instant::now();
354                info!("Scaled up buffer to {}", new_size);
355            }
356        } else if avg_load < self.config.scale_down_threshold {
357            // Scale down
358            let new_size = (*current_size / 2).max(self.config.min_buffer_size);
359            if new_size < *current_size {
360                *current_size = new_size;
361                *self.last_resize.write() = Instant::now();
362                info!("Scaled down buffer to {}", new_size);
363            }
364        }
365
366        Ok(())
367    }
368
369    /// Get current buffer size
370    pub fn size(&self) -> usize {
371        *self.current_size.read()
372    }
373
374    /// Get current buffer length
375    pub fn len(&self) -> usize {
376        self.buffer.read().len()
377    }
378
379    /// Check if buffer is empty
380    pub fn is_empty(&self) -> bool {
381        self.buffer.read().is_empty()
382    }
383}
384
/// Partition manager for horizontal scaling: tracks partitions and nodes,
/// assigns partitions to nodes, and makes/applies scaling decisions.
pub struct PartitionManager {
    /// Configuration (mode, thresholds, limits, partition strategy)
    config: ScalingConfig,
    /// Partitions keyed by partition number
    partitions: Arc<DashMap<usize, Partition>>,
    /// Nodes keyed by node ID
    nodes: Arc<DashMap<String, Node>>,
    /// Partition number -> owning node ID
    assignments: Arc<DashMap<usize, String>>,
    /// Time of the last scaling operation (for cooldown enforcement)
    last_scaling: Arc<RwLock<Instant>>,
    /// Monotonic counter backing round-robin key assignment
    counter: Arc<RwLock<usize>>,
}

401impl PartitionManager {
402    /// Create a new partition manager
403    pub fn new(config: ScalingConfig) -> Self {
404        let manager = Self {
405            config: config.clone(),
406            partitions: Arc::new(DashMap::new()),
407            nodes: Arc::new(DashMap::new()),
408            assignments: Arc::new(DashMap::new()),
409            last_scaling: Arc::new(RwLock::new(Instant::now())),
410            counter: Arc::new(RwLock::new(0)),
411        };
412
413        // Initialize with minimum partitions
414        for i in 0..config.resource_limits.min_partitions {
415            manager.create_partition(i);
416        }
417
418        manager
419    }
420
421    /// Create a new partition
422    fn create_partition(&self, partition_number: usize) {
423        let partition = Partition {
424            partition_id: Uuid::new_v4().to_string(),
425            partition_number,
426            owner_node: None,
427            replica_nodes: Vec::new(),
428            load: 0.0,
429            event_count: 0,
430            created_at: Utc::now(),
431            last_updated: Utc::now(),
432        };
433
434        self.partitions.insert(partition_number, partition);
435        info!("Created partition {}", partition_number);
436    }
437
438    /// Add a node to the cluster
439    pub fn add_node(&self, node: Node) -> Result<()> {
440        let node_id = node.node_id.clone();
441        self.nodes.insert(node_id.clone(), node);
442
443        // Rebalance partitions
444        self.rebalance_partitions()?;
445
446        info!("Added node {}", node_id);
447        Ok(())
448    }
449
450    /// Remove a node from the cluster
451    pub fn remove_node(&self, node_id: &str) -> Result<()> {
452        self.nodes.remove(node_id);
453
454        // Reassign partitions from this node
455        self.rebalance_partitions()?;
456
457        info!("Removed node {}", node_id);
458        Ok(())
459    }
460
461    /// Rebalance partitions across nodes
462    fn rebalance_partitions(&self) -> Result<()> {
463        let nodes: Vec<_> = self.nodes.iter().map(|e| e.key().clone()).collect();
464
465        if nodes.is_empty() {
466            warn!("No nodes available for partition assignment");
467            return Ok(());
468        }
469
470        let partitions: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
471
472        // Simple round-robin assignment
473        for (idx, partition_num) in partitions.iter().enumerate() {
474            let node_id = &nodes[idx % nodes.len()];
475            self.assignments.insert(*partition_num, node_id.clone());
476
477            // Update partition owner
478            if let Some(mut partition) = self.partitions.get_mut(partition_num) {
479                partition.owner_node = Some(node_id.clone());
480                partition.last_updated = Utc::now();
481            }
482
483            // Update node partitions
484            if let Some(mut node) = self.nodes.get_mut(node_id) {
485                if !node.partitions.contains(partition_num) {
486                    node.partitions.push(*partition_num);
487                }
488            }
489        }
490
491        debug!(
492            "Rebalanced {} partitions across {} nodes",
493            partitions.len(),
494            nodes.len()
495        );
496        Ok(())
497    }
498
499    /// Evaluate scaling needs
500    pub fn evaluate_scaling(&self) -> ScalingDirection {
501        if !matches!(
502            self.config.mode,
503            ScalingMode::Horizontal | ScalingMode::Hybrid
504        ) {
505            return ScalingDirection::NoChange;
506        }
507
508        // Check cooldown
509        if self.last_scaling.read().elapsed() < self.config.cooldown_period {
510            return ScalingDirection::NoChange;
511        }
512
513        // Calculate average load across partitions
514        let partitions: Vec<_> = self.partitions.iter().map(|e| e.clone()).collect();
515
516        if partitions.is_empty() {
517            return ScalingDirection::NoChange;
518        }
519
520        let avg_load = partitions.iter().map(|p| p.load).sum::<f64>() / partitions.len() as f64;
521
522        if avg_load > self.config.scale_up_threshold
523            && partitions.len() < self.config.resource_limits.max_partitions
524        {
525            // Scale up
526            let amount = ((partitions.len() as f64 * 0.5).ceil() as usize)
527                .min(self.config.resource_limits.max_partitions - partitions.len())
528                .max(1);
529            ScalingDirection::ScaleUp { amount }
530        } else if avg_load < self.config.scale_down_threshold
531            && partitions.len() > self.config.resource_limits.min_partitions
532        {
533            // Scale down
534            let amount = ((partitions.len() as f64 * 0.25).ceil() as usize)
535                .min(partitions.len() - self.config.resource_limits.min_partitions)
536                .max(1);
537            ScalingDirection::ScaleDown { amount }
538        } else {
539            ScalingDirection::NoChange
540        }
541    }
542
543    /// Apply scaling decision
544    pub fn apply_scaling(&self, direction: &ScalingDirection) -> Result<()> {
545        match direction {
546            ScalingDirection::ScaleUp { amount } => {
547                let current_max = self.partitions.iter().map(|e| *e.key()).max().unwrap_or(0);
548
549                for i in 1..=*amount {
550                    let partition_num = current_max + i;
551                    if partition_num < self.config.resource_limits.max_partitions {
552                        self.create_partition(partition_num);
553                    }
554                }
555
556                self.rebalance_partitions()?;
557                *self.last_scaling.write() = Instant::now();
558
559                info!("Scaled up by {} partitions", amount);
560            }
561            ScalingDirection::ScaleDown { amount } => {
562                let partition_nums: Vec<_> = self.partitions.iter().map(|e| *e.key()).collect();
563
564                // Remove the highest numbered partitions
565                let mut removed = 0;
566                for partition_num in partition_nums.iter().rev() {
567                    if removed >= *amount {
568                        break;
569                    }
570                    if partition_nums.len() - removed > self.config.resource_limits.min_partitions {
571                        self.partitions.remove(partition_num);
572                        self.assignments.remove(partition_num);
573                        removed += 1;
574                    }
575                }
576
577                self.rebalance_partitions()?;
578                *self.last_scaling.write() = Instant::now();
579
580                info!("Scaled down by {} partitions", removed);
581            }
582            ScalingDirection::NoChange => {}
583        }
584
585        Ok(())
586    }
587
588    /// Get partition for a key
589    pub fn get_partition_for_key(&self, key: &str) -> usize {
590        match &self.config.partition_strategy {
591            PartitionStrategy::RoundRobin => {
592                // Use counter for round-robin
593                let mut counter = self.counter.write();
594                let partition = *counter % self.partitions.len();
595                *counter = counter.wrapping_add(1);
596                partition
597            }
598            PartitionStrategy::Hash { .. } => {
599                // Simple hash-based partitioning
600                let hash = self.hash_key(key);
601                (hash as usize) % self.partitions.len()
602            }
603            PartitionStrategy::ConsistentHash { .. } => {
604                // Simplified consistent hashing
605                let hash = self.hash_key(key);
606                (hash as usize) % self.partitions.len()
607            }
608            _ => 0, // Default to first partition
609        }
610    }
611
612    /// Simple hash function
613    fn hash_key(&self, key: &str) -> u64 {
614        use std::collections::hash_map::DefaultHasher;
615        use std::hash::{Hash, Hasher};
616
617        let mut hasher = DefaultHasher::new();
618        key.hash(&mut hasher);
619        hasher.finish()
620    }
621
622    /// Update partition load
623    pub fn update_partition_load(&self, partition_num: usize, load: f64) {
624        if let Some(mut partition) = self.partitions.get_mut(&partition_num) {
625            partition.load = load;
626            partition.last_updated = Utc::now();
627        }
628    }
629
630    /// Get partition count
631    pub fn partition_count(&self) -> usize {
632        self.partitions.len()
633    }
634
635    /// Get node count
636    pub fn node_count(&self) -> usize {
637        self.nodes.len()
638    }
639}
640
/// Auto-scaler that monitors metrics and applies scaling decisions.
pub struct AutoScaler {
    /// Configuration
    /// NOTE(review): stored but not read by the background loop in this
    /// module; retained presumably for future use or external inspection.
    config: ScalingConfig,
    /// Partition manager the scaling decisions are applied to
    partition_manager: Arc<PartitionManager>,
    /// Monitoring interval (the background loop uses its own local copy)
    monitoring_interval: Duration,
    /// Channel for sending commands to the background task
    command_tx: mpsc::UnboundedSender<ScalingCommand>,
    /// Handle of the background evaluation task (kept alive, never awaited)
    _background_task: Option<tokio::task::JoinHandle<()>>,
}

/// Commands accepted by the auto-scaler's background task.
enum ScalingCommand {
    /// Run a scaling evaluation immediately
    Evaluate,
    /// Terminate the background loop
    Stop,
}

impl AutoScaler {
    /// Create a new auto-scaler.
    ///
    /// Spawns a background task that re-evaluates scaling every 30 seconds
    /// and applies any non-`NoChange` decision via the partition manager.
    ///
    /// NOTE(review): this calls `tokio::spawn`, so it must be invoked from
    /// within a Tokio runtime context or it will panic.
    ///
    /// NOTE(review): nothing currently sends `ScalingCommand::Stop`;
    /// dropping the `AutoScaler` closes the command channel (disabling the
    /// command arm of the `select!`), but the interval-driven loop keeps
    /// running until the runtime shuts down.
    pub fn new(config: ScalingConfig, partition_manager: Arc<PartitionManager>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();

        // Fixed evaluation cadence; independent of config.cooldown_period,
        // which is enforced separately inside evaluate_scaling().
        let monitoring_interval = Duration::from_secs(30);

        let partition_manager_clone = partition_manager.clone();
        let background_task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                tokio::select! {
                    // Periodic evaluation tick.
                    _ = interval.tick() => {
                        let decision = partition_manager_clone.evaluate_scaling();
                        if !matches!(decision, ScalingDirection::NoChange) {
                            info!("Auto-scaler decision: {:?}", decision);
                            if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
                                error!("Failed to apply scaling: {}", e);
                            }
                        }
                    }
                    // On-demand commands (manual evaluation or shutdown).
                    Some(cmd) = command_rx.recv() => {
                        match cmd {
                            ScalingCommand::Evaluate => {
                                let decision = partition_manager_clone.evaluate_scaling();
                                if !matches!(decision, ScalingDirection::NoChange) {
                                    if let Err(e) = partition_manager_clone.apply_scaling(&decision) {
                                        error!("Failed to apply scaling: {}", e);
                                    }
                                }
                            }
                            ScalingCommand::Stop => break,
                        }
                    }
                }
            }
        });

        Self {
            config,
            partition_manager,
            monitoring_interval,
            command_tx,
            _background_task: Some(background_task),
        }
    }

    /// Trigger an immediate scaling evaluation on the background task.
    ///
    /// # Errors
    /// Returns an error if the background task has terminated and the
    /// command channel is closed.
    pub fn evaluate_now(&self) -> Result<()> {
        self.command_tx
            .send(ScalingCommand::Evaluate)
            .map_err(|e| anyhow!("Failed to send command: {}", e))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_buffer() {
        let buffer: AdaptiveBuffer<u64> = AdaptiveBuffer::new(ScalingConfig::default());

        // Fill with 100 items, then drain half and check the length.
        (0..100u64).for_each(|i| buffer.push(i).expect("push within capacity"));
        assert_eq!(buffer.len(), 100);

        let drained = (0..50).filter(|_| buffer.pop().is_some()).count();
        assert_eq!(drained, 50);
        assert_eq!(buffer.len(), 50);
    }

    #[test]
    fn test_partition_manager() {
        let manager = PartitionManager::new(ScalingConfig::default());

        // Register a single healthy node and verify cluster bookkeeping.
        let usage = ResourceUsage {
            cpu_usage: 0.5,
            memory_usage: 1024 * 1024 * 1024,
            network_usage: 1000000,
            events_per_second: 1000.0,
        };
        let node = Node {
            node_id: "node-1".to_string(),
            address: "localhost:8001".to_string(),
            partitions: Vec::new(),
            resource_usage: usage,
            health: NodeHealth::Healthy,
            last_heartbeat: Utc::now(),
        };

        manager.add_node(node).unwrap();

        assert_eq!(manager.node_count(), 1);
        assert!(manager.partition_count() >= 1);
    }

    #[test]
    fn test_partition_assignment() {
        let manager = PartitionManager::new(ScalingConfig::default());

        // The chosen partition must index an existing partition.
        let chosen = manager.get_partition_for_key("test-key");
        assert!(chosen < manager.partition_count());
    }
}