1use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use sysinfo::System;
8
9use crate::types::*;
10
11pub struct LoadBalancer {
13 strategy: LoadBalancingStrategy,
14 resource_pool: Arc<RwLock<ResourcePool>>,
15 allocation_history: Arc<RwLock<AllocationHistory>>,
16 system_info: Arc<RwLock<System>>,
17 created_at: std::time::Instant,
18}
19
20impl LoadBalancer {
21 pub fn new(strategy: LoadBalancingStrategy) -> Self {
23 let mut sys = System::new_all();
24 sys.refresh_all();
25
26 let total_memory = (sys.total_memory() / (1024 * 1024)) as usize; let total_cpu_cores = sys.cpus().len();
28
29 Self {
30 strategy,
31 resource_pool: Arc::new(RwLock::new(ResourcePool::new(
32 total_memory,
33 total_cpu_cores,
34 ))),
35 allocation_history: Arc::new(RwLock::new(AllocationHistory::new())),
36 system_info: Arc::new(RwLock::new(sys)),
37 created_at: std::time::Instant::now(),
38 }
39 }
40
41 pub async fn allocate_resources(
43 &self,
44 requirements: &ResourceRequirements,
45 ) -> Result<ResourceAllocation, ResourceError> {
46 let agent_id = AgentId::new(); let start_time = std::time::Instant::now();
48
49 let limits = ResourceLimits {
51 memory_mb: requirements.max_memory_mb,
52 cpu_cores: requirements.max_cpu_cores,
53 disk_io_mbps: 100, network_io_mbps: 100, execution_timeout: std::time::Duration::from_secs(3600),
56 idle_timeout: std::time::Duration::from_secs(300),
57 };
58
59 let mut pool = self.resource_pool.write();
60
61 let result = match self.strategy {
62 LoadBalancingStrategy::RoundRobin => {
63 self.allocate_round_robin(&mut pool, agent_id, &limits)
64 }
65 LoadBalancingStrategy::LeastConnections => {
66 self.allocate_least_connections(&mut pool, agent_id, &limits)
67 }
68 LoadBalancingStrategy::ResourceBased => {
69 self.allocate_resource_based(&mut pool, agent_id, &limits)
70 }
71 LoadBalancingStrategy::WeightedRoundRobin => {
72 self.allocate_weighted_round_robin(&mut pool, agent_id, &limits)
73 }
74 };
75
76 let mut history = self.allocation_history.write();
78 match &result {
79 Ok(_) => {
80 let duration = start_time.elapsed();
81 history.record_allocation(agent_id, duration);
82 }
83 Err(_) => {
84 history.record_failure();
85 }
86 }
87
88 result
89 }
90
91 pub async fn deallocate_resources(&self, allocation: ResourceAllocation) {
93 let mut pool = self.resource_pool.write();
94 pool.deallocate(allocation.agent_id);
95
96 let mut history = self.allocation_history.write();
97 history.record_deallocation(allocation.agent_id);
98 }
99
100 pub async fn get_resource_utilization(&self) -> ResourceUsage {
102 let pool = self.resource_pool.read();
103 let mut sys = self.system_info.write();
104 sys.refresh_all();
105
106 ResourceUsage {
107 memory_used: pool.total_memory - pool.available_memory,
108 cpu_utilization: sys.global_cpu_info().cpu_usage(),
109 disk_io_rate: 0, network_io_rate: 0, uptime: self.created_at.elapsed(),
112 }
113 }
114
115 fn allocate_round_robin(
117 &self,
118 pool: &mut ResourcePool,
119 agent_id: AgentId,
120 limits: &ResourceLimits,
121 ) -> Result<ResourceAllocation, ResourceError> {
122 pool.allocate(agent_id, limits)
123 .ok_or_else(|| ResourceError::AllocationFailed {
124 agent_id,
125 reason: "Insufficient resources for round-robin allocation".into(),
126 })
127 }
128
129 fn allocate_least_connections(
134 &self,
135 pool: &mut ResourcePool,
136 agent_id: AgentId,
137 limits: &ResourceLimits,
138 ) -> Result<ResourceAllocation, ResourceError> {
139 let active_count = pool.allocated_agents.len();
140 let max_agents_estimate = if limits.memory_mb > 0 {
142 pool.total_memory / limits.memory_mb
143 } else {
144 pool.total_memory };
146 let threshold = (max_agents_estimate as f64 * 0.8) as usize;
147
148 if active_count >= threshold.max(1) {
149 return Err(ResourceError::AllocationFailed {
150 agent_id,
151 reason: format!(
152 "Active allocation count ({}) exceeds 80% capacity threshold ({})",
153 active_count, threshold
154 )
155 .into(),
156 });
157 }
158
159 pool.allocate(agent_id, limits)
160 .ok_or_else(|| ResourceError::AllocationFailed {
161 agent_id,
162 reason: "Insufficient resources for least-connections allocation".into(),
163 })
164 }
165
166 fn allocate_resource_based(
168 &self,
169 pool: &mut ResourcePool,
170 agent_id: AgentId,
171 limits: &ResourceLimits,
172 ) -> Result<ResourceAllocation, ResourceError> {
173 if !pool.can_allocate(limits) {
175 return Err(ResourceError::AllocationFailed {
176 agent_id,
177 reason: format!(
178 "Insufficient resources: need {}MB memory, {}CPU cores, available: {}MB memory, {:.2}CPU cores",
179 limits.memory_mb,
180 limits.cpu_cores,
181 pool.available_memory,
182 pool.available_cpu_cores
183 )
184 .into(),
185 });
186 }
187
188 pool.allocate(agent_id, limits)
189 .ok_or_else(|| ResourceError::AllocationFailed {
190 agent_id,
191 reason: "Resource allocation failed unexpectedly".into(),
192 })
193 }
194
195 fn allocate_weighted_round_robin(
200 &self,
201 pool: &mut ResourcePool,
202 agent_id: AgentId,
203 limits: &ResourceLimits,
204 ) -> Result<ResourceAllocation, ResourceError> {
205 let available_fraction = if pool.total_memory > 0 {
206 pool.available_memory as f64 / pool.total_memory as f64
207 } else {
208 0.0
209 };
210
211 if available_fraction < 0.1 {
213 return Err(ResourceError::AllocationFailed {
214 agent_id,
215 reason: format!(
216 "Weighted round-robin rejected: only {:.1}% resources available (minimum 10% required)",
217 available_fraction * 100.0
218 )
219 .into(),
220 });
221 }
222
223 let proportional_memory = (limits.memory_mb as f64 * available_fraction).ceil() as usize;
225 let scaled_memory = proportional_memory.max(limits.memory_mb.min(pool.available_memory));
226
227 let scaled_limits = ResourceLimits {
228 memory_mb: scaled_memory.min(limits.memory_mb),
229 cpu_cores: limits.cpu_cores,
230 disk_io_mbps: limits.disk_io_mbps,
231 network_io_mbps: limits.network_io_mbps,
232 execution_timeout: limits.execution_timeout,
233 idle_timeout: limits.idle_timeout,
234 };
235
236 pool.allocate(agent_id, &scaled_limits)
237 .ok_or_else(|| ResourceError::AllocationFailed {
238 agent_id,
239 reason: format!(
240 "Weighted round-robin allocation failed: requested {}MB (scaled from {}MB), available {}MB",
241 scaled_limits.memory_mb, limits.memory_mb, pool.available_memory
242 )
243 .into(),
244 })
245 }
246
247 pub async fn get_statistics(&self) -> LoadBalancingStats {
249 let pool = self.resource_pool.read();
250 let history = self.allocation_history.read();
251 let utilization = pool.get_utilization();
252
253 LoadBalancingStats {
254 total_allocations: history.total_allocations,
255 active_allocations: pool.allocated_agents.len(),
256 memory_utilization: utilization.memory_utilization,
257 cpu_utilization: utilization.cpu_utilization,
258 allocation_failures: history.allocation_failures,
259 average_allocation_time: history.average_allocation_time(),
260 }
261 }
262}
263
264#[derive(Debug)]
266struct AllocationHistory {
267 total_allocations: usize,
268 allocation_failures: usize,
269 allocation_times: Vec<std::time::Duration>,
270 recent_allocations: HashMap<AgentId, std::time::SystemTime>,
271}
272
273impl AllocationHistory {
274 fn new() -> Self {
275 Self {
276 total_allocations: 0,
277 allocation_failures: 0,
278 allocation_times: Vec::new(),
279 recent_allocations: HashMap::new(),
280 }
281 }
282
283 fn record_allocation(&mut self, agent_id: AgentId, duration: std::time::Duration) {
284 self.total_allocations += 1;
285 self.allocation_times.push(duration);
286 self.recent_allocations
287 .insert(agent_id, std::time::SystemTime::now());
288
289 if self.allocation_times.len() > 1000 {
291 self.allocation_times.remove(0);
292 }
293 }
294
295 fn record_failure(&mut self) {
296 self.allocation_failures += 1;
297 }
298
299 fn record_deallocation(&mut self, agent_id: AgentId) {
300 self.recent_allocations.remove(&agent_id);
301 }
302
303 fn average_allocation_time(&self) -> std::time::Duration {
304 if self.allocation_times.is_empty() {
305 std::time::Duration::from_millis(0)
306 } else {
307 let total: std::time::Duration = self.allocation_times.iter().sum();
308 total / self.allocation_times.len() as u32
309 }
310 }
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct LoadBalancingStats {
316 pub total_allocations: usize,
317 pub active_allocations: usize,
318 pub memory_utilization: f32,
319 pub cpu_utilization: f32,
320 pub allocation_failures: usize,
321 pub average_allocation_time: std::time::Duration,
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 fn detected_total_memory() -> usize {
330 let mut sys = System::new_all();
331 sys.refresh_all();
332 (sys.total_memory() / (1024 * 1024)) as usize
333 }
334
335 #[tokio::test]
336 async fn test_resource_allocation() {
337 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);
338
339 let requirements = ResourceRequirements {
340 min_memory_mb: 100,
341 max_memory_mb: 200,
342 min_cpu_cores: 0.5,
343 max_cpu_cores: 1.0,
344 disk_space_mb: 100,
345 network_bandwidth_mbps: 10,
346 };
347
348 let result = load_balancer.allocate_resources(&requirements).await;
349 assert!(result.is_ok());
350
351 let allocation = result.unwrap();
352 assert_eq!(allocation.allocated_memory, 200);
353 assert_eq!(allocation.allocated_cpu_cores, 1.0);
354 }
355
356 #[tokio::test]
357 async fn test_resource_deallocation() {
358 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);
359
360 let requirements = ResourceRequirements {
361 min_memory_mb: 100,
362 max_memory_mb: 200,
363 min_cpu_cores: 0.5,
364 max_cpu_cores: 1.0,
365 disk_space_mb: 100,
366 network_bandwidth_mbps: 10,
367 };
368
369 let allocation = load_balancer
370 .allocate_resources(&requirements)
371 .await
372 .unwrap();
373
374 load_balancer.deallocate_resources(allocation).await;
375
376 let utilization = load_balancer.get_resource_utilization().await;
378 assert_eq!(utilization.memory_used, 0);
379 }
380
381 #[tokio::test]
382 async fn test_insufficient_resources() {
383 let total_memory = detected_total_memory();
384 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);
385
386 let requirements = ResourceRequirements {
388 min_memory_mb: total_memory + 1024,
389 max_memory_mb: total_memory + 1024,
390 min_cpu_cores: 1024.0,
391 max_cpu_cores: 1024.0,
392 disk_space_mb: 100,
393 network_bandwidth_mbps: 10,
394 };
395
396 let result = load_balancer.allocate_resources(&requirements).await;
397 assert!(result.is_err());
398
399 if let Err(ResourceError::AllocationFailed { reason, .. }) = result {
400 assert!(reason.contains("Insufficient resources"));
401 }
402 }
403
404 #[tokio::test]
405 async fn test_get_statistics() {
406 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);
407
408 let stats = load_balancer.get_statistics().await;
409 assert_eq!(stats.total_allocations, 0);
410 assert_eq!(stats.active_allocations, 0);
411 assert_eq!(stats.allocation_failures, 0);
412
413 let requirements = ResourceRequirements {
415 min_memory_mb: 50,
416 max_memory_mb: 100,
417 min_cpu_cores: 0.5,
418 max_cpu_cores: 1.0,
419 disk_space_mb: 50,
420 network_bandwidth_mbps: 10,
421 };
422
423 let _ = load_balancer
424 .allocate_resources(&requirements)
425 .await
426 .unwrap();
427
428 let stats = load_balancer.get_statistics().await;
429 assert_eq!(stats.total_allocations, 1);
430 assert_eq!(stats.active_allocations, 1);
431 assert_eq!(stats.allocation_failures, 0);
432 assert!(stats.memory_utilization > 0.0);
433 }
434
435 #[tokio::test]
436 async fn test_statistics_serializable() {
437 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);
438 let stats = load_balancer.get_statistics().await;
439 let json = serde_json::to_value(&stats);
440 assert!(json.is_ok());
441 }
442
443 #[tokio::test]
444 async fn test_sysinfo_detection() {
445 let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);
446 let total = detected_total_memory();
447
448 assert!(total > 0);
450
451 let utilization = load_balancer.get_resource_utilization().await;
453 assert_eq!(utilization.memory_used, 0);
454 assert!(utilization.uptime.as_nanos() > 0);
455 }
456}