1use crate::error::{CoreError, CoreResult, ErrorContext};
7use std::collections::{HashMap, VecDeque};
8use std::net::SocketAddr;
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
/// Strategy used by [`LoadBalancer`] to pick a node for each task.
///
/// The variants carry no data, so values are cheap to clone and can be
/// compared with `==` (useful when inspecting `LoadBalancingStats::strategy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadBalancingStrategy {
    /// Hand tasks to the available nodes one after another, in rotation.
    RoundRobin,
    /// Pick the available node with the fewest active connections.
    LeastConnections,
    /// Pick among the available nodes using per-node weights.
    WeightedRoundRobin,
    /// Pick the available node with the lowest `NodeLoad::load_score`.
    ResourceAware,
    /// Pick the available node with the lowest average latency.
    LatencyBased,
}
26
/// Most recently reported load metrics for a single node.
#[derive(Debug, Clone)]
pub struct NodeLoad {
    /// Identifier of the node.
    pub nodeid: String,
    /// Network address of the node.
    pub address: SocketAddr,
    /// Reported CPU utilization; a node at or above 0.9 is treated as
    /// unavailable (presumably a 0.0..=1.0 fraction — TODO confirm).
    pub cpu_utilization: f64,
    /// Reported memory utilization; a node at or above 0.9 is treated as
    /// unavailable (presumably a 0.0..=1.0 fraction — TODO confirm).
    pub memory_utilization: f64,
    /// Number of connections currently open on the node.
    pub active_connections: usize,
    /// Reported average latency; `load_score` divides it by 1000
    /// (presumably milliseconds — TODO confirm).
    pub average_latency: f64,
    /// Relative weight of the node; `new` initialises it to 1.0.
    pub weight: f64,
    /// Moment at which the metrics were last written.
    pub last_updated: Instant,
}

impl NodeLoad {
    /// Utilization at or above which a node stops being available.
    const UTILIZATION_LIMIT: f64 = 0.9;
    /// Divisor that scales the connection count inside `load_score`.
    const CONNECTION_SCALE: f64 = 1000.0;
    /// Divisor that scales the average latency inside `load_score`.
    const LATENCY_SCALE: f64 = 1000.0;

    /// Creates a node entry with all metrics zeroed, a weight of 1.0 and
    /// `last_updated` set to the current instant.
    pub fn new(nodeid: String, address: SocketAddr) -> Self {
        let created_at = Instant::now();
        Self {
            nodeid,
            address,
            cpu_utilization: 0.0,
            memory_utilization: 0.0,
            active_connections: 0,
            average_latency: 0.0,
            weight: 1.0,
            last_updated: created_at,
        }
    }

    /// Returns the mean of four components: CPU utilization, memory
    /// utilization, connections / 1000 and latency / 1000.
    ///
    /// Smaller values mean a less loaded node. The result is not clamped.
    pub fn load_score(&self) -> f64 {
        let scaled_connections = self.active_connections as f64 / Self::CONNECTION_SCALE;
        let scaled_latency = self.average_latency / Self::LATENCY_SCALE;
        // Keep this exact left-to-right summation order so the floating-point
        // result does not change.
        let combined = self.cpu_utilization
            + self.memory_utilization
            + scaled_connections
            + scaled_latency;
        combined / 4.0
    }

    /// Returns `true` when both CPU and memory utilization are strictly
    /// below 0.9. A NaN utilization therefore counts as unavailable.
    pub fn is_available(&self) -> bool {
        [self.cpu_utilization, self.memory_utilization]
            .iter()
            .all(|&utilization| utilization < Self::UTILIZATION_LIMIT)
    }

    /// Overwrites the four metrics with the supplied values and stamps
    /// `last_updated` with the current instant. Values are stored as given,
    /// without validation.
    pub fn update_metrics(&mut self, cpu: f64, memory: f64, connections: usize, latency: f64) {
        self.last_updated = Instant::now();
        self.average_latency = latency;
        self.active_connections = connections;
        self.memory_utilization = memory;
        self.cpu_utilization = cpu;
    }
}
79
/// Record of a single task having been placed on a node.
#[derive(Debug, Clone)]
pub struct TaskAssignment {
    /// Identifier of the task that was assigned.
    pub taskid: String,
    /// Identifier of the node chosen for the task.
    pub nodeid: String,
    /// Network address of the chosen node.
    pub node_address: SocketAddr,
    /// Moment at which the assignment was made.
    pub assigned_at: Instant,
}
88
89#[derive(Debug)]
91pub struct LoadBalancer {
92 strategy: LoadBalancingStrategy,
93 nodes: Arc<Mutex<HashMap<String, NodeLoad>>>,
94 round_robin_index: Arc<Mutex<usize>>,
95 assignment_history: Arc<Mutex<VecDeque<TaskAssignment>>>,
96}
97
98impl LoadBalancer {
99 pub fn new(strategy: LoadBalancingStrategy) -> Self {
101 Self {
102 strategy,
103 nodes: Arc::new(Mutex::new(HashMap::new())),
104 round_robin_index: Arc::new(Mutex::new(0)),
105 assignment_history: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
106 }
107 }
108
109 pub fn register_node(&self, nodeload: NodeLoad) -> CoreResult<()> {
111 let mut nodes = self.nodes.lock().map_err(|_| {
112 CoreError::InvalidState(ErrorContext::new(
113 "Failed to acquire nodes lock".to_string(),
114 ))
115 })?;
116 nodes.insert(nodeload.nodeid.clone(), nodeload);
117 Ok(())
118 }
119
120 pub fn update_nodeload(
122 &self,
123 nodeid: &str,
124 cpu: f64,
125 memory: f64,
126 connections: usize,
127 latency: f64,
128 ) -> CoreResult<()> {
129 let mut nodes = self.nodes.lock().map_err(|_| {
130 CoreError::InvalidState(ErrorContext::new(
131 "Failed to acquire nodes lock".to_string(),
132 ))
133 })?;
134
135 if let Some(node) = nodes.get_mut(nodeid) {
136 node.update_metrics(cpu, memory, connections, latency);
137 } else {
138 return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
139 "Unknown node: {nodeid}"
140 ))));
141 }
142 Ok(())
143 }
144
145 pub fn assign_task(&self, taskid: String) -> CoreResult<TaskAssignment> {
147 let nodes = self.nodes.lock().map_err(|_| {
148 CoreError::InvalidState(ErrorContext::new(
149 "Failed to acquire nodes lock".to_string(),
150 ))
151 })?;
152
153 let availablenodes: Vec<_> = nodes
154 .values()
155 .filter(|node| node.is_available())
156 .cloned()
157 .collect();
158
159 if availablenodes.is_empty() {
160 return Err(CoreError::InvalidState(ErrorContext::new(
161 "No available nodes for task assignment".to_string(),
162 )));
163 }
164
165 let selected_node = match &self.strategy {
166 LoadBalancingStrategy::RoundRobin => self.select_round_robin(&availablenodes)?,
167 LoadBalancingStrategy::LeastConnections => {
168 self.select_least_connections(&availablenodes)
169 }
170 LoadBalancingStrategy::WeightedRoundRobin => {
171 self.select_weighted_round_robin(&availablenodes)?
172 }
173 LoadBalancingStrategy::ResourceAware => self.select_resource_aware(&availablenodes),
174 LoadBalancingStrategy::LatencyBased => self.select_latencybased(&availablenodes),
175 };
176
177 let assignment = TaskAssignment {
178 taskid,
179 nodeid: selected_node.nodeid.clone(),
180 node_address: selected_node.address,
181 assigned_at: Instant::now(),
182 };
183
184 let mut history = self.assignment_history.lock().map_err(|_| {
186 CoreError::InvalidState(ErrorContext::new(
187 "Failed to acquire history lock".to_string(),
188 ))
189 })?;
190 history.push_back(assignment.clone());
191 if history.len() > 10000 {
192 history.pop_front();
193 }
194
195 Ok(assignment)
196 }
197
198 fn select_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
199 let mut index = self.round_robin_index.lock().map_err(|_| {
200 CoreError::InvalidState(ErrorContext::new(
201 "Failed to acquire index lock".to_string(),
202 ))
203 })?;
204
205 let selected = nodes[*index % nodes.len()].clone();
206 *index += 1;
207 Ok(selected)
208 }
209
210 fn select_least_connections(&self, nodes: &[NodeLoad]) -> NodeLoad {
211 nodes
212 .iter()
213 .min_by_key(|node| node.active_connections)
214 .expect("Operation failed")
215 .clone()
216 }
217
218 fn select_weighted_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
219 let weights: Vec<f64> = nodes.iter()
221 .map(|node| 1.0 / (node.load_score() + 0.1)) .collect();
223
224 let total_weight: f64 = weights.iter().sum();
225 let mut cumulative_weight = 0.0;
226 let target = total_weight * 0.5; for (i, weight) in weights.iter().enumerate() {
229 cumulative_weight += weight;
230 if cumulative_weight >= target {
231 return Ok(nodes[i].clone());
232 }
233 }
234
235 Ok(nodes[0].clone()) }
237
238 fn select_resource_aware(&self, nodes: &[NodeLoad]) -> NodeLoad {
239 nodes
240 .iter()
241 .min_by(|a, b| {
242 a.load_score()
243 .partial_cmp(&b.load_score())
244 .expect("Operation failed")
245 })
246 .expect("Operation failed")
247 .clone()
248 }
249
250 fn select_latencybased(&self, nodes: &[NodeLoad]) -> NodeLoad {
251 nodes
252 .iter()
253 .min_by(|a, b| {
254 a.average_latency
255 .partial_cmp(&b.average_latency)
256 .expect("Operation failed")
257 })
258 .expect("Operation failed")
259 .clone()
260 }
261
262 pub fn get_statistics(&self) -> CoreResult<LoadBalancingStats> {
264 let nodes = self.nodes.lock().map_err(|_| {
265 CoreError::InvalidState(ErrorContext::new(
266 "Failed to acquire nodes lock".to_string(),
267 ))
268 })?;
269
270 let history = self.assignment_history.lock().map_err(|_| {
271 CoreError::InvalidState(ErrorContext::new(
272 "Failed to acquire history lock".to_string(),
273 ))
274 })?;
275
276 let total_nodes = nodes.len();
277 let availablenodes = nodes.values().filter(|node| node.is_available()).count();
278 let total_assignments = history.len();
279
280 let mut assignment_counts: HashMap<String, usize> = HashMap::new();
282 for assignment in history.iter() {
283 *assignment_counts
284 .entry(assignment.nodeid.clone())
285 .or_insert(0) += 1;
286 }
287
288 let average_load = if !nodes.is_empty() {
289 nodes.values().map(|node| node.load_score()).sum::<f64>() / nodes.len() as f64
290 } else {
291 0.0
292 };
293
294 Ok(LoadBalancingStats {
295 total_nodes,
296 availablenodes,
297 total_assignments,
298 assignment_distribution: assignment_counts,
299 average_load,
300 strategy: self.strategy.clone(),
301 })
302 }
303
304 pub fn get_all_nodes(&self) -> CoreResult<Vec<NodeLoad>> {
306 let nodes = self.nodes.lock().map_err(|_| {
307 CoreError::InvalidState(ErrorContext::new(
308 "Failed to acquire nodes lock".to_string(),
309 ))
310 })?;
311 Ok(nodes.values().cloned().collect())
312 }
313}
314
315#[derive(Debug)]
317pub struct LoadBalancingStats {
318 pub total_nodes: usize,
319 pub availablenodes: usize,
320 pub total_assignments: usize,
321 pub assignment_distribution: HashMap<String, usize>,
322 pub average_load: f64,
323 pub strategy: LoadBalancingStrategy,
324}
325
326impl LoadBalancingStats {
327 pub fn balance_score(&self) -> f64 {
329 if self.assignment_distribution.is_empty() || self.total_assignments == 0 {
330 return 0.0;
331 }
332
333 let average_assignments =
334 self.total_assignments as f64 / self.assignment_distribution.len() as f64;
335 let variance: f64 = self
336 .assignment_distribution
337 .values()
338 .map(|&count| {
339 let diff = count as f64 - average_assignments;
340 diff * diff
341 })
342 .sum::<f64>()
343 / self.assignment_distribution.len() as f64;
344
345 (variance.sqrt() / average_assignments).min(1.0)
346 }
347
348 pub fn is_well_balanced(&self) -> bool {
350 self.balance_score() < 0.2 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds a loopback (127.0.0.1) socket address on `port`.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
    }

    /// A freshly created node keeps its identity, is available and has a
    /// load score of zero.
    #[test]
    fn test_nodeload_creation() {
        let address = loopback(8080);
        let node = NodeLoad::new("node1".to_string(), address);

        assert_eq!(node.nodeid, "node1");
        assert_eq!(node.address, address);
        assert!(node.is_available());
        assert_eq!(node.load_score(), 0.0);
    }

    /// `update_metrics` stores each value in the matching field.
    #[test]
    fn test_nodeload_update() {
        let mut node = NodeLoad::new("node1".to_string(), loopback(8080));

        node.update_metrics(0.5, 0.6, 10, 50.0);

        assert_eq!(node.cpu_utilization, 0.5);
        assert_eq!(node.memory_utilization, 0.6);
        assert_eq!(node.active_connections, 10);
        assert_eq!(node.average_latency, 50.0);
    }

    /// Two consecutive round-robin assignments over two nodes land on
    /// different nodes.
    #[test]
    fn test_load_balancer_round_robin() {
        let balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);

        for &(id, port) in [("node1", 8080u16), ("node2", 8081u16)].iter() {
            let node = NodeLoad::new(id.to_string(), loopback(port));
            assert!(balancer.register_node(node).is_ok());
        }

        let first = balancer
            .assign_task("task1".to_string())
            .expect("Operation failed");
        let second = balancer
            .assign_task("task2".to_string())
            .expect("Operation failed");

        assert_ne!(first.nodeid, second.nodeid);
    }

    /// A 33/33/34 split counts as well balanced with a score below 0.1.
    #[test]
    fn test_load_balancing_stats() {
        let assignment_distribution: HashMap<String, usize> =
            [("node1", 33usize), ("node2", 33), ("node3", 34)]
                .iter()
                .map(|&(id, count)| (id.to_string(), count))
                .collect();

        let stats = LoadBalancingStats {
            total_nodes: 3,
            availablenodes: 3,
            total_assignments: 100,
            assignment_distribution,
            average_load: 0.5,
            strategy: LoadBalancingStrategy::RoundRobin,
        };

        assert!(stats.is_well_balanced());
        assert!(stats.balance_score() < 0.1);
    }
}