scirs2_core/distributed/load_balancing.rs

//! Load balancing algorithms for distributed systems
//!
//! This module provides various load balancing strategies to distribute
//! computational workloads efficiently across cluster nodes.
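//!
//! # Example
//!
//! A minimal usage sketch; the import path below is assumed from this
//! file's location in the crate.
//!
//! ```
//! use scirs2_core::distributed::load_balancing::{
//!     LoadBalancer, LoadBalancingStrategy, NodeLoad,
//! };
//! use std::net::SocketAddr;
//!
//! let balancer = LoadBalancer::new(LoadBalancingStrategy::LeastConnections);
//!
//! // Register a worker node and report its current load.
//! let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
//! balancer.register_node(NodeLoad::new("node1".to_string(), addr)).unwrap();
//! balancer.update_nodeload("node1", 0.25, 0.40, 12, 35.0).unwrap();
//!
//! // Ask the balancer which node should run the next task.
//! let assignment = balancer.assign_task("task-1".to_string()).unwrap();
//! assert_eq!(assignment.nodeid, "node1");
//! ```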

use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Load balancing strategy
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Round-robin distribution
    RoundRobin,
    /// Least connections strategy
    LeastConnections,
    /// Weighted round-robin
    WeightedRoundRobin,
    /// Resource-aware balancing
    ResourceAware,
    /// Latency-based balancing
    LatencyBased,
}

/// Node load information
#[derive(Debug, Clone)]
pub struct NodeLoad {
    pub nodeid: String,
    pub address: SocketAddr,
    pub cpu_utilization: f64,
    pub memory_utilization: f64,
    pub active_connections: usize,
    pub average_latency: f64,
    pub weight: f64,
    pub last_updated: Instant,
}

impl NodeLoad {
    /// Create new node load info
    pub fn new(nodeid: String, address: SocketAddr) -> Self {
        Self {
            nodeid,
            address,
            cpu_utilization: 0.0,
            memory_utilization: 0.0,
            active_connections: 0,
            average_latency: 0.0,
            weight: 1.0,
            last_updated: Instant::now(),
        }
    }

    /// Calculate overall load score (0.0 = no load, 1.0 = maximum load)
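    ///
    /// The score is the unweighted mean of four terms: CPU utilization,
    /// memory utilization, active connections normalized to 1,000
    /// connections, and average latency normalized to 1,000 ms. Each term
    /// nominally falls in `[0.0, 1.0]`, although nodes with more than 1,000
    /// connections or more than 1,000 ms of latency can push the score
    /// beyond the nominal range.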
    pub fn load_score(&self) -> f64 {
        let cpu_score = self.cpu_utilization;
        let memory_score = self.memory_utilization;
        let connection_score = (self.active_connections as f64) / 1000.0; // Normalize to 1000 max connections
        let latency_score = self.average_latency / 1000.0; // Normalize to 1000ms

        (cpu_score + memory_score + connection_score + latency_score) / 4.0
    }

    /// Check if node is available for new tasks
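    ///
    /// A node is considered available while both its CPU and memory
    /// utilization are below 90%.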
    pub fn is_available(&self) -> bool {
        self.cpu_utilization < 0.9 && self.memory_utilization < 0.9
    }

    /// Update load metrics
    pub fn update_metrics(&mut self, cpu: f64, memory: f64, connections: usize, latency: f64) {
        self.cpu_utilization = cpu;
        self.memory_utilization = memory;
        self.active_connections = connections;
        self.average_latency = latency;
        self.last_updated = Instant::now();
    }
}

/// Task assignment result
#[derive(Debug, Clone)]
pub struct TaskAssignment {
    pub taskid: String,
    pub nodeid: String,
    pub node_address: SocketAddr,
    pub assigned_at: Instant,
}

/// Load balancer implementation
#[derive(Debug)]
pub struct LoadBalancer {
    strategy: LoadBalancingStrategy,
    nodes: Arc<Mutex<HashMap<String, NodeLoad>>>,
    round_robin_index: Arc<Mutex<usize>>,
    assignment_history: Arc<Mutex<VecDeque<TaskAssignment>>>,
}

impl LoadBalancer {
    /// Create a new load balancer
    pub fn new(strategy: LoadBalancingStrategy) -> Self {
        Self {
            strategy,
            nodes: Arc::new(Mutex::new(HashMap::new())),
            round_robin_index: Arc::new(Mutex::new(0)),
            assignment_history: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
        }
    }

    /// Register a node for load balancing
    pub fn register_node(&self, nodeload: NodeLoad) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        nodes.insert(nodeload.nodeid.clone(), nodeload);
        Ok(())
    }

    /// Update node load metrics
    pub fn update_nodeload(
        &self,
        nodeid: &str,
        cpu: f64,
        memory: f64,
        connections: usize,
        latency: f64,
    ) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        if let Some(node) = nodes.get_mut(nodeid) {
            node.update_metrics(cpu, memory, connections, latency);
        } else {
            return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
                "Unknown node: {nodeid}"
            ))));
        }
        Ok(())
    }

    /// Assign a task to the best available node
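    ///
    /// The node is chosen according to the configured
    /// [`LoadBalancingStrategy`], and the resulting assignment is recorded
    /// in a bounded history (the most recent 10,000 assignments) that feeds
    /// [`get_statistics`](Self::get_statistics).
    ///
    /// # Errors
    ///
    /// Returns an error if no registered node is currently available, or if
    /// an internal lock has been poisoned.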
    pub fn assign_task(&self, taskid: String) -> CoreResult<TaskAssignment> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let availablenodes: Vec<_> = nodes
            .values()
            .filter(|node| node.is_available())
            .cloned()
            .collect();

        if availablenodes.is_empty() {
            return Err(CoreError::InvalidState(ErrorContext::new(
                "No available nodes for task assignment".to_string(),
            )));
        }

        let selected_node = match &self.strategy {
            LoadBalancingStrategy::RoundRobin => self.select_round_robin(&availablenodes)?,
            LoadBalancingStrategy::LeastConnections => {
                self.select_least_connections(&availablenodes)
            }
            LoadBalancingStrategy::WeightedRoundRobin => {
                self.select_weighted_round_robin(&availablenodes)?
            }
            LoadBalancingStrategy::ResourceAware => self.select_resource_aware(&availablenodes),
            LoadBalancingStrategy::LatencyBased => self.select_latencybased(&availablenodes),
        };

        let assignment = TaskAssignment {
            taskid,
            nodeid: selected_node.nodeid.clone(),
            node_address: selected_node.address,
            assigned_at: Instant::now(),
        };

        // Record assignment history
        let mut history = self.assignment_history.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire history lock".to_string(),
            ))
        })?;
        history.push_back(assignment.clone());
        if history.len() > 10000 {
            history.pop_front();
        }

        Ok(assignment)
    }

    fn select_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
        let mut index = self.round_robin_index.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire index lock".to_string(),
            ))
        })?;

        let selected = nodes[*index % nodes.len()].clone();
        *index += 1;
        Ok(selected)
    }

    fn select_least_connections(&self, nodes: &[NodeLoad]) -> NodeLoad {
        nodes
            .iter()
            .min_by_key(|node| node.active_connections)
            .unwrap()
            .clone()
    }

    fn select_weighted_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
        // Weight each node by the inverse of its load score so that lightly
        // loaded nodes receive proportionally larger weights.
        let weights: Vec<f64> = nodes.iter()
            .map(|node| 1.0 / (node.load_score() + 0.1)) // Offset avoids division by zero
            .collect();

        let total_weight: f64 = weights.iter().sum();
        let mut cumulative_weight = 0.0;
        // Deterministically pick the node at the weighted midpoint; a
        // simplification of a true weighted round-robin rotation.
        let target = total_weight * 0.5;

        for (i, weight) in weights.iter().enumerate() {
            cumulative_weight += weight;
            if cumulative_weight >= target {
                return Ok(nodes[i].clone());
            }
        }

        // Unreachable in practice (the cumulative sum always reaches the
        // midpoint), but fall back to the first node to be safe.
        Ok(nodes[0].clone())
    }

    fn select_resource_aware(&self, nodes: &[NodeLoad]) -> NodeLoad {
        nodes
            .iter()
            .min_by(|a, b| a.load_score().partial_cmp(&b.load_score()).unwrap())
            .unwrap()
            .clone()
    }

    fn select_latencybased(&self, nodes: &[NodeLoad]) -> NodeLoad {
        nodes
            .iter()
            .min_by(|a, b| a.average_latency.partial_cmp(&b.average_latency).unwrap())
            .unwrap()
            .clone()
    }

    /// Get load balancing statistics
    pub fn get_statistics(&self) -> CoreResult<LoadBalancingStats> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let history = self.assignment_history.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire history lock".to_string(),
            ))
        })?;

        let total_nodes = nodes.len();
        let availablenodes = nodes.values().filter(|node| node.is_available()).count();
        let total_assignments = history.len();

        // Calculate assignment distribution
        let mut assignment_counts: HashMap<String, usize> = HashMap::new();
        for assignment in history.iter() {
            *assignment_counts
                .entry(assignment.nodeid.clone())
                .or_insert(0) += 1;
        }

        let average_load = if !nodes.is_empty() {
            nodes.values().map(|node| node.load_score()).sum::<f64>() / nodes.len() as f64
        } else {
            0.0
        };

        Ok(LoadBalancingStats {
            total_nodes,
            availablenodes,
            total_assignments,
            assignment_distribution: assignment_counts,
            average_load,
            strategy: self.strategy.clone(),
        })
    }

    /// Get all node loads
    pub fn get_all_nodes(&self) -> CoreResult<Vec<NodeLoad>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        Ok(nodes.values().cloned().collect())
    }
}

/// Load balancing statistics
#[derive(Debug)]
pub struct LoadBalancingStats {
    pub total_nodes: usize,
    pub availablenodes: usize,
    pub total_assignments: usize,
    pub assignment_distribution: HashMap<String, usize>,
    pub average_load: f64,
    pub strategy: LoadBalancingStrategy,
}

impl LoadBalancingStats {
    /// Calculate load distribution balance (0.0 = perfect balance, 1.0 = maximum imbalance)
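    ///
    /// Computed as the coefficient of variation of the per-node assignment
    /// counts (standard deviation divided by the mean), capped at 1.0.
    /// Returns 0.0 when no assignments have been recorded.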
    pub fn balance_score(&self) -> f64 {
        if self.assignment_distribution.is_empty() || self.total_assignments == 0 {
            return 0.0;
        }

        let average_assignments =
            self.total_assignments as f64 / self.assignment_distribution.len() as f64;
        let variance: f64 = self
            .assignment_distribution
            .values()
            .map(|&count| {
                let diff = count as f64 - average_assignments;
                diff * diff
            })
            .sum::<f64>()
            / self.assignment_distribution.len() as f64;

        (variance.sqrt() / average_assignments).min(1.0)
    }

    /// Check if load is well balanced
    pub fn is_well_balanced(&self) -> bool {
        self.balance_score() < 0.2 // Less than 20% imbalance
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_nodeload_creation() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = NodeLoad::new("node1".to_string(), address);

        assert_eq!(node.nodeid, "node1");
        assert_eq!(node.address, address);
        assert!(node.is_available());
        assert_eq!(node.load_score(), 0.0);
    }

    #[test]
    fn test_nodeload_update() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = NodeLoad::new("node1".to_string(), address);

        node.update_metrics(0.5, 0.6, 10, 50.0);
        assert_eq!(node.cpu_utilization, 0.5);
        assert_eq!(node.memory_utilization, 0.6);
        assert_eq!(node.active_connections, 10);
        assert_eq!(node.average_latency, 50.0);
    }

    #[test]
    fn test_load_balancer_round_robin() {
        let balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);

        let address1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node1 = NodeLoad::new("node1".to_string(), address1);

        let address2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081);
        let node2 = NodeLoad::new("node2".to_string(), address2);

        assert!(balancer.register_node(node1).is_ok());
        assert!(balancer.register_node(node2).is_ok());

        let assignment1 = balancer.assign_task("task1".to_string()).unwrap();
        let assignment2 = balancer.assign_task("task2".to_string()).unwrap();

        // Should alternate between nodes
        assert_ne!(assignment1.nodeid, assignment2.nodeid);
    }

    #[test]
    fn test_load_balancing_stats() {
        let mut stats = LoadBalancingStats {
            total_nodes: 3,
            availablenodes: 3,
            total_assignments: 100,
            assignment_distribution: HashMap::new(),
            average_load: 0.5,
            strategy: LoadBalancingStrategy::RoundRobin,
        };

        // Near-perfect balance: 100 assignments split 33/33/34 across three nodes
        stats
            .assignment_distribution
            .insert("node1".to_string(), 33);
        stats
            .assignment_distribution
            .insert("node2".to_string(), 33);
        stats
            .assignment_distribution
            .insert("node3".to_string(), 34);

        assert!(stats.is_well_balanced());
        assert!(stats.balance_score() < 0.1);
    }
}