scirs2_core/distributed/
load_balancing.rs

1//! Load balancing algorithms for distributed systems
2//!
3//! This module provides various load balancing strategies to distribute
4//! computational workloads efficiently across cluster nodes.
5
6use crate::error::{CoreError, CoreResult, ErrorContext};
7use std::collections::{HashMap, VecDeque};
8use std::net::SocketAddr;
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
/// Strategy a [`LoadBalancer`] uses to pick a node for each new task.
///
/// Strategies are plain values: they can be compared, hashed and cloned, which makes
/// it easy to check which strategy a balancer (or a statistics snapshot) is using.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadBalancingStrategy {
    /// Cycle through the available nodes in turn.
    RoundRobin,
    /// Prefer the node with the fewest `active_connections`.
    LeastConnections,
    /// Weighted round-robin: selection frequency depends on node weight/load.
    WeightedRoundRobin,
    /// Prefer the node with the lowest `NodeLoad::load_score`.
    ResourceAware,
    /// Prefer the node with the lowest `average_latency`.
    LatencyBased,
}
26
/// Snapshot of one cluster node's identity and most recently reported load metrics.
#[derive(Clone, Debug)]
pub struct NodeLoad {
    /// Unique identifier of the node; the load balancer keys its node table by it.
    pub nodeid: String,
    /// Network address handed out in task assignments for this node.
    pub address: SocketAddr,
    /// CPU utilization, presumably a fraction in 0.0–1.0 (`is_available` treats
    /// 0.9 and above as saturated) — confirm with metric producers.
    pub cpu_utilization: f64,
    /// Memory utilization, presumably a fraction in 0.0–1.0 (same 0.9 threshold).
    pub memory_utilization: f64,
    /// Number of connections currently active on the node.
    pub active_connections: usize,
    /// Average latency; presumably milliseconds, since `load_score` normalizes it
    /// by 1000 with a "1000ms" comment — TODO confirm.
    pub average_latency: f64,
    /// Relative weight of the node; `NodeLoad::new` initializes it to 1.0.
    pub weight: f64,
    /// Moment the metrics were last set (by `new` or `update_metrics`).
    pub last_updated: Instant,
}
39
40impl NodeLoad {
41    /// Create new node load info
42    pub fn new(nodeid: String, address: SocketAddr) -> Self {
43        Self {
44            nodeid,
45            address,
46            cpu_utilization: 0.0,
47            memory_utilization: 0.0,
48            active_connections: 0,
49            average_latency: 0.0,
50            weight: 1.0,
51            last_updated: Instant::now(),
52        }
53    }
54
55    /// Calculate overall load score (0.0 = no load, 1.0 = maximum load)
56    pub fn load_score(&self) -> f64 {
57        let cpu_score = self.cpu_utilization;
58        let memory_score = self.memory_utilization;
59        let connection_score = (self.active_connections as f64) / 1000.0; // Normalize to 1000 max connections
60        let latency_score = self.average_latency / 1000.0; // Normalize to 1000ms
61
62        (cpu_score + memory_score + connection_score + latency_score) / 4.0
63    }
64
65    /// Check if node is available for new tasks
66    pub fn is_available(&self) -> bool {
67        self.cpu_utilization < 0.9 && self.memory_utilization < 0.9
68    }
69
70    /// Update load metrics
71    pub fn update_metrics(&mut self, cpu: f64, memory: f64, connections: usize, latency: f64) {
72        self.cpu_utilization = cpu;
73        self.memory_utilization = memory;
74        self.active_connections = connections;
75        self.average_latency = latency;
76        self.last_updated = Instant::now();
77    }
78}
79
/// Outcome of assigning one task: which node received it, where, and when.
#[derive(Clone, Debug)]
pub struct TaskAssignment {
    /// Identifier of the task that was assigned.
    pub taskid: String,
    /// Identifier of the node chosen for the task.
    pub nodeid: String,
    /// Network address of the chosen node.
    pub node_address: SocketAddr,
    /// Moment the assignment was made.
    pub assigned_at: Instant,
}
88
89/// Load balancer implementation
90#[derive(Debug)]
91pub struct LoadBalancer {
92    strategy: LoadBalancingStrategy,
93    nodes: Arc<Mutex<HashMap<String, NodeLoad>>>,
94    round_robin_index: Arc<Mutex<usize>>,
95    assignment_history: Arc<Mutex<VecDeque<TaskAssignment>>>,
96}
97
98impl LoadBalancer {
99    /// Create a new load balancer
100    pub fn new(strategy: LoadBalancingStrategy) -> Self {
101        Self {
102            strategy,
103            nodes: Arc::new(Mutex::new(HashMap::new())),
104            round_robin_index: Arc::new(Mutex::new(0)),
105            assignment_history: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
106        }
107    }
108
109    /// Register a node for load balancing
110    pub fn register_node(&self, nodeload: NodeLoad) -> CoreResult<()> {
111        let mut nodes = self.nodes.lock().map_err(|_| {
112            CoreError::InvalidState(ErrorContext::new(
113                "Failed to acquire nodes lock".to_string(),
114            ))
115        })?;
116        nodes.insert(nodeload.nodeid.clone(), nodeload);
117        Ok(())
118    }
119
120    /// Update node load metrics
121    pub fn update_nodeload(
122        &self,
123        nodeid: &str,
124        cpu: f64,
125        memory: f64,
126        connections: usize,
127        latency: f64,
128    ) -> CoreResult<()> {
129        let mut nodes = self.nodes.lock().map_err(|_| {
130            CoreError::InvalidState(ErrorContext::new(
131                "Failed to acquire nodes lock".to_string(),
132            ))
133        })?;
134
135        if let Some(node) = nodes.get_mut(nodeid) {
136            node.update_metrics(cpu, memory, connections, latency);
137        } else {
138            return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
139                "Unknown node: {nodeid}"
140            ))));
141        }
142        Ok(())
143    }
144
145    /// Assign a task to the best available node
146    pub fn assign_task(&self, taskid: String) -> CoreResult<TaskAssignment> {
147        let nodes = self.nodes.lock().map_err(|_| {
148            CoreError::InvalidState(ErrorContext::new(
149                "Failed to acquire nodes lock".to_string(),
150            ))
151        })?;
152
153        let availablenodes: Vec<_> = nodes
154            .values()
155            .filter(|node| node.is_available())
156            .cloned()
157            .collect();
158
159        if availablenodes.is_empty() {
160            return Err(CoreError::InvalidState(ErrorContext::new(
161                "No available nodes for task assignment".to_string(),
162            )));
163        }
164
165        let selected_node = match &self.strategy {
166            LoadBalancingStrategy::RoundRobin => self.select_round_robin(&availablenodes)?,
167            LoadBalancingStrategy::LeastConnections => {
168                self.select_least_connections(&availablenodes)
169            }
170            LoadBalancingStrategy::WeightedRoundRobin => {
171                self.select_weighted_round_robin(&availablenodes)?
172            }
173            LoadBalancingStrategy::ResourceAware => self.select_resource_aware(&availablenodes),
174            LoadBalancingStrategy::LatencyBased => self.select_latencybased(&availablenodes),
175        };
176
177        let assignment = TaskAssignment {
178            taskid,
179            nodeid: selected_node.nodeid.clone(),
180            node_address: selected_node.address,
181            assigned_at: Instant::now(),
182        };
183
184        // Record assignment history
185        let mut history = self.assignment_history.lock().map_err(|_| {
186            CoreError::InvalidState(ErrorContext::new(
187                "Failed to acquire history lock".to_string(),
188            ))
189        })?;
190        history.push_back(assignment.clone());
191        if history.len() > 10000 {
192            history.pop_front();
193        }
194
195        Ok(assignment)
196    }
197
198    fn select_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
199        let mut index = self.round_robin_index.lock().map_err(|_| {
200            CoreError::InvalidState(ErrorContext::new(
201                "Failed to acquire index lock".to_string(),
202            ))
203        })?;
204
205        let selected = nodes[*index % nodes.len()].clone();
206        *index += 1;
207        Ok(selected)
208    }
209
210    fn select_least_connections(&self, nodes: &[NodeLoad]) -> NodeLoad {
211        nodes
212            .iter()
213            .min_by_key(|node| node.active_connections)
214            .expect("Operation failed")
215            .clone()
216    }
217
218    fn select_weighted_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
219        // Simple weighted selection based on inverse load score
220        let weights: Vec<f64> = nodes.iter()
221            .map(|node| 1.0 / (node.load_score() + 0.1)) // Add small value to avoid division by zero
222            .collect();
223
224        let total_weight: f64 = weights.iter().sum();
225        let mut cumulative_weight = 0.0;
226        let target = total_weight * 0.5; // Select middle-weighted node for simplicity
227
228        for (i, weight) in weights.iter().enumerate() {
229            cumulative_weight += weight;
230            if cumulative_weight >= target {
231                return Ok(nodes[i].clone());
232            }
233        }
234
235        Ok(nodes[0].clone()) // Fallback
236    }
237
238    fn select_resource_aware(&self, nodes: &[NodeLoad]) -> NodeLoad {
239        nodes
240            .iter()
241            .min_by(|a, b| {
242                a.load_score()
243                    .partial_cmp(&b.load_score())
244                    .expect("Operation failed")
245            })
246            .expect("Operation failed")
247            .clone()
248    }
249
250    fn select_latencybased(&self, nodes: &[NodeLoad]) -> NodeLoad {
251        nodes
252            .iter()
253            .min_by(|a, b| {
254                a.average_latency
255                    .partial_cmp(&b.average_latency)
256                    .expect("Operation failed")
257            })
258            .expect("Operation failed")
259            .clone()
260    }
261
262    /// Get load balancing statistics
263    pub fn get_statistics(&self) -> CoreResult<LoadBalancingStats> {
264        let nodes = self.nodes.lock().map_err(|_| {
265            CoreError::InvalidState(ErrorContext::new(
266                "Failed to acquire nodes lock".to_string(),
267            ))
268        })?;
269
270        let history = self.assignment_history.lock().map_err(|_| {
271            CoreError::InvalidState(ErrorContext::new(
272                "Failed to acquire history lock".to_string(),
273            ))
274        })?;
275
276        let total_nodes = nodes.len();
277        let availablenodes = nodes.values().filter(|node| node.is_available()).count();
278        let total_assignments = history.len();
279
280        // Calculate assignment distribution
281        let mut assignment_counts: HashMap<String, usize> = HashMap::new();
282        for assignment in history.iter() {
283            *assignment_counts
284                .entry(assignment.nodeid.clone())
285                .or_insert(0) += 1;
286        }
287
288        let average_load = if !nodes.is_empty() {
289            nodes.values().map(|node| node.load_score()).sum::<f64>() / nodes.len() as f64
290        } else {
291            0.0
292        };
293
294        Ok(LoadBalancingStats {
295            total_nodes,
296            availablenodes,
297            total_assignments,
298            assignment_distribution: assignment_counts,
299            average_load,
300            strategy: self.strategy.clone(),
301        })
302    }
303
304    /// Get all node loads
305    pub fn get_all_nodes(&self) -> CoreResult<Vec<NodeLoad>> {
306        let nodes = self.nodes.lock().map_err(|_| {
307            CoreError::InvalidState(ErrorContext::new(
308                "Failed to acquire nodes lock".to_string(),
309            ))
310        })?;
311        Ok(nodes.values().cloned().collect())
312    }
313}
314
315/// Load balancing statistics
316#[derive(Debug)]
317pub struct LoadBalancingStats {
318    pub total_nodes: usize,
319    pub availablenodes: usize,
320    pub total_assignments: usize,
321    pub assignment_distribution: HashMap<String, usize>,
322    pub average_load: f64,
323    pub strategy: LoadBalancingStrategy,
324}
325
326impl LoadBalancingStats {
327    /// Calculate load distribution balance (0.0 = perfect balance, 1.0 = maximum imbalance)
328    pub fn balance_score(&self) -> f64 {
329        if self.assignment_distribution.is_empty() || self.total_assignments == 0 {
330            return 0.0;
331        }
332
333        let average_assignments =
334            self.total_assignments as f64 / self.assignment_distribution.len() as f64;
335        let variance: f64 = self
336            .assignment_distribution
337            .values()
338            .map(|&count| {
339                let diff = count as f64 - average_assignments;
340                diff * diff
341            })
342            .sum::<f64>()
343            / self.assignment_distribution.len() as f64;
344
345        (variance.sqrt() / average_assignments).min(1.0)
346    }
347
348    /// Check if load is well balanced
349    pub fn is_well_balanced(&self) -> bool {
350        self.balance_score() < 0.2 // Less than 20% imbalance
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback (127.0.0.1) socket address on `port`, shared by the tests below.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
    }

    #[test]
    fn test_nodeload_creation() {
        let addr = loopback(8080);
        let fresh = NodeLoad::new("node1".to_string(), addr);

        assert_eq!(fresh.nodeid, "node1");
        assert_eq!(fresh.address, addr);
        assert!(fresh.is_available());
        assert_eq!(fresh.load_score(), 0.0);
    }

    #[test]
    fn test_nodeload_update() {
        let mut tracked = NodeLoad::new("node1".to_string(), loopback(8080));

        tracked.update_metrics(0.5, 0.6, 10, 50.0);

        assert_eq!(tracked.cpu_utilization, 0.5);
        assert_eq!(tracked.memory_utilization, 0.6);
        assert_eq!(tracked.active_connections, 10);
        assert_eq!(tracked.average_latency, 50.0);
    }

    #[test]
    fn test_load_balancer_round_robin() {
        let balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);

        for (id, port) in [("node1", 8080u16), ("node2", 8081u16)] {
            let node = NodeLoad::new(id.to_string(), loopback(port));
            assert!(balancer.register_node(node).is_ok());
        }

        let first = balancer
            .assign_task("task1".to_string())
            .expect("Operation failed");
        let second = balancer
            .assign_task("task2".to_string())
            .expect("Operation failed");

        // Should alternate between nodes
        assert_ne!(first.nodeid, second.nodeid);
    }

    #[test]
    fn test_load_balancing_stats() {
        // Near-perfect balance: 33 / 33 / 34 assignments over three nodes.
        let distribution: HashMap<String, usize> = [("node1", 33), ("node2", 33), ("node3", 34)]
            .iter()
            .map(|&(id, count)| (id.to_string(), count))
            .collect();

        let stats = LoadBalancingStats {
            total_nodes: 3,
            availablenodes: 3,
            total_assignments: 100,
            assignment_distribution: distribution,
            average_load: 0.5,
            strategy: LoadBalancingStrategy::RoundRobin,
        };

        assert!(stats.is_well_balanced());
        assert!(stats.balance_score() < 0.1);
    }
}