1use crate::error::{CoreError, CoreResult, ErrorContext};
7use std::collections::{HashMap, VecDeque};
8use std::net::SocketAddr;
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
12#[derive(Debug, Clone)]
14pub enum LoadBalancingStrategy {
15 RoundRobin,
17 LeastConnections,
19 WeightedRoundRobin,
21 ResourceAware,
23 LatencyBased,
25}
26
27#[derive(Debug, Clone)]
29pub struct NodeLoad {
30 pub nodeid: String,
31 pub address: SocketAddr,
32 pub cpu_utilization: f64,
33 pub memory_utilization: f64,
34 pub active_connections: usize,
35 pub average_latency: f64,
36 pub weight: f64,
37 pub last_updated: Instant,
38}
39
40impl NodeLoad {
41 pub fn new(nodeid: String, address: SocketAddr) -> Self {
43 Self {
44 nodeid,
45 address,
46 cpu_utilization: 0.0,
47 memory_utilization: 0.0,
48 active_connections: 0,
49 average_latency: 0.0,
50 weight: 1.0,
51 last_updated: Instant::now(),
52 }
53 }
54
55 pub fn load_score(&self) -> f64 {
57 let cpu_score = self.cpu_utilization;
58 let memory_score = self.memory_utilization;
59 let connection_score = (self.active_connections as f64) / 1000.0; let latency_score = self.average_latency / 1000.0; (cpu_score + memory_score + connection_score + latency_score) / 4.0
63 }
64
65 pub fn is_available(&self) -> bool {
67 self.cpu_utilization < 0.9 && self.memory_utilization < 0.9
68 }
69
70 pub fn update_metrics(&mut self, cpu: f64, memory: f64, connections: usize, latency: f64) {
72 self.cpu_utilization = cpu;
73 self.memory_utilization = memory;
74 self.active_connections = connections;
75 self.average_latency = latency;
76 self.last_updated = Instant::now();
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct TaskAssignment {
83 pub taskid: String,
84 pub nodeid: String,
85 pub node_address: SocketAddr,
86 pub assigned_at: Instant,
87}
88
89#[derive(Debug)]
91pub struct LoadBalancer {
92 strategy: LoadBalancingStrategy,
93 nodes: Arc<Mutex<HashMap<String, NodeLoad>>>,
94 round_robin_index: Arc<Mutex<usize>>,
95 assignment_history: Arc<Mutex<VecDeque<TaskAssignment>>>,
96}
97
98impl LoadBalancer {
99 pub fn new(strategy: LoadBalancingStrategy) -> Self {
101 Self {
102 strategy,
103 nodes: Arc::new(Mutex::new(HashMap::new())),
104 round_robin_index: Arc::new(Mutex::new(0)),
105 assignment_history: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
106 }
107 }
108
109 pub fn register_node(&self, nodeload: NodeLoad) -> CoreResult<()> {
111 let mut nodes = self.nodes.lock().map_err(|_| {
112 CoreError::InvalidState(ErrorContext::new(
113 "Failed to acquire nodes lock".to_string(),
114 ))
115 })?;
116 nodes.insert(nodeload.nodeid.clone(), nodeload);
117 Ok(())
118 }
119
120 pub fn update_nodeload(
122 &self,
123 nodeid: &str,
124 cpu: f64,
125 memory: f64,
126 connections: usize,
127 latency: f64,
128 ) -> CoreResult<()> {
129 let mut nodes = self.nodes.lock().map_err(|_| {
130 CoreError::InvalidState(ErrorContext::new(
131 "Failed to acquire nodes lock".to_string(),
132 ))
133 })?;
134
135 if let Some(node) = nodes.get_mut(nodeid) {
136 node.update_metrics(cpu, memory, connections, latency);
137 } else {
138 return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
139 "Unknown node: {nodeid}"
140 ))));
141 }
142 Ok(())
143 }
144
145 pub fn assign_task(&self, taskid: String) -> CoreResult<TaskAssignment> {
147 let nodes = self.nodes.lock().map_err(|_| {
148 CoreError::InvalidState(ErrorContext::new(
149 "Failed to acquire nodes lock".to_string(),
150 ))
151 })?;
152
153 let availablenodes: Vec<_> = nodes
154 .values()
155 .filter(|node| node.is_available())
156 .cloned()
157 .collect();
158
159 if availablenodes.is_empty() {
160 return Err(CoreError::InvalidState(ErrorContext::new(
161 "No available nodes for task assignment".to_string(),
162 )));
163 }
164
165 let selected_node = match &self.strategy {
166 LoadBalancingStrategy::RoundRobin => self.select_round_robin(&availablenodes)?,
167 LoadBalancingStrategy::LeastConnections => {
168 self.select_least_connections(&availablenodes)
169 }
170 LoadBalancingStrategy::WeightedRoundRobin => {
171 self.select_weighted_round_robin(&availablenodes)?
172 }
173 LoadBalancingStrategy::ResourceAware => self.select_resource_aware(&availablenodes),
174 LoadBalancingStrategy::LatencyBased => self.select_latencybased(&availablenodes),
175 };
176
177 let assignment = TaskAssignment {
178 taskid,
179 nodeid: selected_node.nodeid.clone(),
180 node_address: selected_node.address,
181 assigned_at: Instant::now(),
182 };
183
184 let mut history = self.assignment_history.lock().map_err(|_| {
186 CoreError::InvalidState(ErrorContext::new(
187 "Failed to acquire history lock".to_string(),
188 ))
189 })?;
190 history.push_back(assignment.clone());
191 if history.len() > 10000 {
192 history.pop_front();
193 }
194
195 Ok(assignment)
196 }
197
198 fn select_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
199 let mut index = self.round_robin_index.lock().map_err(|_| {
200 CoreError::InvalidState(ErrorContext::new(
201 "Failed to acquire index lock".to_string(),
202 ))
203 })?;
204
205 let selected = nodes[*index % nodes.len()].clone();
206 *index += 1;
207 Ok(selected)
208 }
209
210 fn select_least_connections(&self, nodes: &[NodeLoad]) -> NodeLoad {
211 nodes
212 .iter()
213 .min_by_key(|node| node.active_connections)
214 .unwrap()
215 .clone()
216 }
217
218 fn select_weighted_round_robin(&self, nodes: &[NodeLoad]) -> CoreResult<NodeLoad> {
219 let weights: Vec<f64> = nodes.iter()
221 .map(|node| 1.0 / (node.load_score() + 0.1)) .collect();
223
224 let total_weight: f64 = weights.iter().sum();
225 let mut cumulative_weight = 0.0;
226 let target = total_weight * 0.5; for (i, weight) in weights.iter().enumerate() {
229 cumulative_weight += weight;
230 if cumulative_weight >= target {
231 return Ok(nodes[i].clone());
232 }
233 }
234
235 Ok(nodes[0].clone()) }
237
238 fn select_resource_aware(&self, nodes: &[NodeLoad]) -> NodeLoad {
239 nodes
240 .iter()
241 .min_by(|a, b| a.load_score().partial_cmp(&b.load_score()).unwrap())
242 .unwrap()
243 .clone()
244 }
245
246 fn select_latencybased(&self, nodes: &[NodeLoad]) -> NodeLoad {
247 nodes
248 .iter()
249 .min_by(|a, b| a.average_latency.partial_cmp(&b.average_latency).unwrap())
250 .unwrap()
251 .clone()
252 }
253
254 pub fn get_statistics(&self) -> CoreResult<LoadBalancingStats> {
256 let nodes = self.nodes.lock().map_err(|_| {
257 CoreError::InvalidState(ErrorContext::new(
258 "Failed to acquire nodes lock".to_string(),
259 ))
260 })?;
261
262 let history = self.assignment_history.lock().map_err(|_| {
263 CoreError::InvalidState(ErrorContext::new(
264 "Failed to acquire history lock".to_string(),
265 ))
266 })?;
267
268 let total_nodes = nodes.len();
269 let availablenodes = nodes.values().filter(|node| node.is_available()).count();
270 let total_assignments = history.len();
271
272 let mut assignment_counts: HashMap<String, usize> = HashMap::new();
274 for assignment in history.iter() {
275 *assignment_counts
276 .entry(assignment.nodeid.clone())
277 .or_insert(0) += 1;
278 }
279
280 let average_load = if !nodes.is_empty() {
281 nodes.values().map(|node| node.load_score()).sum::<f64>() / nodes.len() as f64
282 } else {
283 0.0
284 };
285
286 Ok(LoadBalancingStats {
287 total_nodes,
288 availablenodes,
289 total_assignments,
290 assignment_distribution: assignment_counts,
291 average_load,
292 strategy: self.strategy.clone(),
293 })
294 }
295
296 pub fn get_all_nodes(&self) -> CoreResult<Vec<NodeLoad>> {
298 let nodes = self.nodes.lock().map_err(|_| {
299 CoreError::InvalidState(ErrorContext::new(
300 "Failed to acquire nodes lock".to_string(),
301 ))
302 })?;
303 Ok(nodes.values().cloned().collect())
304 }
305}
306
307#[derive(Debug)]
309pub struct LoadBalancingStats {
310 pub total_nodes: usize,
311 pub availablenodes: usize,
312 pub total_assignments: usize,
313 pub assignment_distribution: HashMap<String, usize>,
314 pub average_load: f64,
315 pub strategy: LoadBalancingStrategy,
316}
317
318impl LoadBalancingStats {
319 pub fn balance_score(&self) -> f64 {
321 if self.assignment_distribution.is_empty() || self.total_assignments == 0 {
322 return 0.0;
323 }
324
325 let average_assignments =
326 self.total_assignments as f64 / self.assignment_distribution.len() as f64;
327 let variance: f64 = self
328 .assignment_distribution
329 .values()
330 .map(|&count| {
331 let diff = count as f64 - average_assignments;
332 diff * diff
333 })
334 .sum::<f64>()
335 / self.assignment_distribution.len() as f64;
336
337 (variance.sqrt() / average_assignments).min(1.0)
338 }
339
340 pub fn is_well_balanced(&self) -> bool {
342 self.balance_score() < 0.2 }
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use std::net::{IpAddr, Ipv4Addr};
350
351 #[test]
352 fn test_nodeload_creation() {
353 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
354 let node = NodeLoad::new("node1".to_string(), address);
355
356 assert_eq!(node.nodeid, "node1");
357 assert_eq!(node.address, address);
358 assert!(node.is_available());
359 assert_eq!(node.load_score(), 0.0);
360 }
361
362 #[test]
363 fn test_nodeload_update() {
364 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
365 let mut node = NodeLoad::new("node1".to_string(), address);
366
367 node.update_metrics(0.5, 0.6, 10, 50.0);
368 assert_eq!(node.cpu_utilization, 0.5);
369 assert_eq!(node.memory_utilization, 0.6);
370 assert_eq!(node.active_connections, 10);
371 assert_eq!(node.average_latency, 50.0);
372 }
373
374 #[test]
375 fn test_load_balancer_round_robin() {
376 let balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);
377
378 let address1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
379 let node1 = NodeLoad::new("node1".to_string(), address1);
380
381 let address2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081);
382 let node2 = NodeLoad::new("node2".to_string(), address2);
383
384 assert!(balancer.register_node(node1).is_ok());
385 assert!(balancer.register_node(node2).is_ok());
386
387 let assignment1 = balancer.assign_task("task1".to_string()).unwrap();
388 let assignment2 = balancer.assign_task("task2".to_string()).unwrap();
389
390 assert_ne!(assignment1.nodeid, assignment2.nodeid);
392 }
393
394 #[test]
395 fn test_load_balancing_stats() {
396 let mut stats = LoadBalancingStats {
397 total_nodes: 3,
398 availablenodes: 3,
399 total_assignments: 100,
400 assignment_distribution: HashMap::new(),
401 average_load: 0.5,
402 strategy: LoadBalancingStrategy::RoundRobin,
403 };
404
405 stats
407 .assignment_distribution
408 .insert("node1".to_string(), 33);
409 stats
410 .assignment_distribution
411 .insert("node2".to_string(), 33);
412 stats
413 .assignment_distribution
414 .insert("node3".to_string(), 34);
415
416 assert!(stats.is_well_balanced());
417 assert!(stats.balance_score() < 0.1);
418 }
419}