// symbi_runtime/scheduler/load_balancer.rs

1//! Load balancer for distributing agents across available resources
2
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use sysinfo::System;
8
9use crate::types::*;
10
11/// Load balancer for resource allocation
12pub struct LoadBalancer {
13    strategy: LoadBalancingStrategy,
14    resource_pool: Arc<RwLock<ResourcePool>>,
15    allocation_history: Arc<RwLock<AllocationHistory>>,
16    system_info: Arc<RwLock<System>>,
17    created_at: std::time::Instant,
18}
19
20impl LoadBalancer {
21    /// Create a new load balancer with real system resource detection
22    pub fn new(strategy: LoadBalancingStrategy) -> Self {
23        let mut sys = System::new_all();
24        sys.refresh_all();
25
26        let total_memory = (sys.total_memory() / (1024 * 1024)) as usize; // bytes -> MB
27        let total_cpu_cores = sys.cpus().len();
28
29        Self {
30            strategy,
31            resource_pool: Arc::new(RwLock::new(ResourcePool::new(
32                total_memory,
33                total_cpu_cores,
34            ))),
35            allocation_history: Arc::new(RwLock::new(AllocationHistory::new())),
36            system_info: Arc::new(RwLock::new(sys)),
37            created_at: std::time::Instant::now(),
38        }
39    }
40
41    /// Allocate resources for an agent
42    pub async fn allocate_resources(
43        &self,
44        requirements: &ResourceRequirements,
45    ) -> Result<ResourceAllocation, ResourceError> {
46        let agent_id = AgentId::new(); // This would normally come from the task
47        let start_time = std::time::Instant::now();
48
49        // Convert requirements to limits
50        let limits = ResourceLimits {
51            memory_mb: requirements.max_memory_mb,
52            cpu_cores: requirements.max_cpu_cores,
53            disk_io_mbps: 100,    // Default
54            network_io_mbps: 100, // Default
55            execution_timeout: std::time::Duration::from_secs(3600),
56            idle_timeout: std::time::Duration::from_secs(300),
57        };
58
59        let mut pool = self.resource_pool.write();
60
61        let result = match self.strategy {
62            LoadBalancingStrategy::RoundRobin => {
63                self.allocate_round_robin(&mut pool, agent_id, &limits)
64            }
65            LoadBalancingStrategy::LeastConnections => {
66                self.allocate_least_connections(&mut pool, agent_id, &limits)
67            }
68            LoadBalancingStrategy::ResourceBased => {
69                self.allocate_resource_based(&mut pool, agent_id, &limits)
70            }
71            LoadBalancingStrategy::WeightedRoundRobin => {
72                self.allocate_weighted_round_robin(&mut pool, agent_id, &limits)
73            }
74        };
75
76        // Record allocation metrics
77        let mut history = self.allocation_history.write();
78        match &result {
79            Ok(_) => {
80                let duration = start_time.elapsed();
81                history.record_allocation(agent_id, duration);
82            }
83            Err(_) => {
84                history.record_failure();
85            }
86        }
87
88        result
89    }
90
91    /// Deallocate resources for an agent
92    pub async fn deallocate_resources(&self, allocation: ResourceAllocation) {
93        let mut pool = self.resource_pool.write();
94        pool.deallocate(allocation.agent_id);
95
96        let mut history = self.allocation_history.write();
97        history.record_deallocation(allocation.agent_id);
98    }
99
100    /// Get current resource utilization using real system metrics
101    pub async fn get_resource_utilization(&self) -> ResourceUsage {
102        let pool = self.resource_pool.read();
103        let mut sys = self.system_info.write();
104        sys.refresh_all();
105
106        ResourceUsage {
107            memory_used: pool.total_memory - pool.available_memory,
108            cpu_utilization: sys.global_cpu_info().cpu_usage(),
109            disk_io_rate: 0,    // sysinfo doesn't track disk I/O rate directly
110            network_io_rate: 0, // sysinfo doesn't track network I/O rate directly
111            uptime: self.created_at.elapsed(),
112        }
113    }
114
115    /// Round-robin allocation strategy
116    fn allocate_round_robin(
117        &self,
118        pool: &mut ResourcePool,
119        agent_id: AgentId,
120        limits: &ResourceLimits,
121    ) -> Result<ResourceAllocation, ResourceError> {
122        pool.allocate(agent_id, limits)
123            .ok_or_else(|| ResourceError::AllocationFailed {
124                agent_id,
125                reason: "Insufficient resources for round-robin allocation".into(),
126            })
127    }
128
129    /// Least connections allocation strategy
130    ///
131    /// Considers the count of active allocations relative to a capacity threshold
132    /// (80% of max based on memory/CPU). Rejects when active count exceeds threshold.
133    fn allocate_least_connections(
134        &self,
135        pool: &mut ResourcePool,
136        agent_id: AgentId,
137        limits: &ResourceLimits,
138    ) -> Result<ResourceAllocation, ResourceError> {
139        let active_count = pool.allocated_agents.len();
140        // Estimate max agents from available resources (use memory as proxy)
141        let max_agents_estimate = if limits.memory_mb > 0 {
142            pool.total_memory / limits.memory_mb
143        } else {
144            pool.total_memory // Fallback: 1 agent per MB
145        };
146        let threshold = (max_agents_estimate as f64 * 0.8) as usize;
147
148        if active_count >= threshold.max(1) {
149            return Err(ResourceError::AllocationFailed {
150                agent_id,
151                reason: format!(
152                    "Active allocation count ({}) exceeds 80% capacity threshold ({})",
153                    active_count, threshold
154                )
155                .into(),
156            });
157        }
158
159        pool.allocate(agent_id, limits)
160            .ok_or_else(|| ResourceError::AllocationFailed {
161                agent_id,
162                reason: "Insufficient resources for least-connections allocation".into(),
163            })
164    }
165
166    /// Resource-based allocation strategy
167    fn allocate_resource_based(
168        &self,
169        pool: &mut ResourcePool,
170        agent_id: AgentId,
171        limits: &ResourceLimits,
172    ) -> Result<ResourceAllocation, ResourceError> {
173        // Check if we have enough resources
174        if !pool.can_allocate(limits) {
175            return Err(ResourceError::AllocationFailed {
176                agent_id,
177                reason: format!(
178                    "Insufficient resources: need {}MB memory, {}CPU cores, available: {}MB memory, {:.2}CPU cores",
179                    limits.memory_mb,
180                    limits.cpu_cores,
181                    pool.available_memory,
182                    pool.available_cpu_cores
183                )
184                .into(),
185            });
186        }
187
188        pool.allocate(agent_id, limits)
189            .ok_or_else(|| ResourceError::AllocationFailed {
190                agent_id,
191                reason: "Resource allocation failed unexpectedly".into(),
192            })
193    }
194
195    /// Weighted round-robin allocation strategy
196    ///
197    /// Scales allocation based on available resource fraction and applies back-pressure
198    /// as the system fills up. Rejects when less than 10% resources remain.
199    fn allocate_weighted_round_robin(
200        &self,
201        pool: &mut ResourcePool,
202        agent_id: AgentId,
203        limits: &ResourceLimits,
204    ) -> Result<ResourceAllocation, ResourceError> {
205        let available_fraction = if pool.total_memory > 0 {
206            pool.available_memory as f64 / pool.total_memory as f64
207        } else {
208            0.0
209        };
210
211        // Reject if less than 10% resources remain to preserve headroom
212        if available_fraction < 0.1 {
213            return Err(ResourceError::AllocationFailed {
214                agent_id,
215                reason: format!(
216                    "Weighted round-robin rejected: only {:.1}% resources available (minimum 10% required)",
217                    available_fraction * 100.0
218                )
219                .into(),
220            });
221        }
222
223        // Scale requested memory proportionally to available resources
224        let proportional_memory = (limits.memory_mb as f64 * available_fraction).ceil() as usize;
225        let scaled_memory = proportional_memory.max(limits.memory_mb.min(pool.available_memory));
226
227        let scaled_limits = ResourceLimits {
228            memory_mb: scaled_memory.min(limits.memory_mb),
229            cpu_cores: limits.cpu_cores,
230            disk_io_mbps: limits.disk_io_mbps,
231            network_io_mbps: limits.network_io_mbps,
232            execution_timeout: limits.execution_timeout,
233            idle_timeout: limits.idle_timeout,
234        };
235
236        pool.allocate(agent_id, &scaled_limits)
237            .ok_or_else(|| ResourceError::AllocationFailed {
238                agent_id,
239                reason: format!(
240                    "Weighted round-robin allocation failed: requested {}MB (scaled from {}MB), available {}MB",
241                    scaled_limits.memory_mb, limits.memory_mb, pool.available_memory
242                )
243                .into(),
244            })
245    }
246
247    /// Get load balancing statistics
248    pub async fn get_statistics(&self) -> LoadBalancingStats {
249        let pool = self.resource_pool.read();
250        let history = self.allocation_history.read();
251        let utilization = pool.get_utilization();
252
253        LoadBalancingStats {
254            total_allocations: history.total_allocations,
255            active_allocations: pool.allocated_agents.len(),
256            memory_utilization: utilization.memory_utilization,
257            cpu_utilization: utilization.cpu_utilization,
258            allocation_failures: history.allocation_failures,
259            average_allocation_time: history.average_allocation_time(),
260        }
261    }
262}
263
264/// Allocation history for tracking and optimization
265#[derive(Debug)]
266struct AllocationHistory {
267    total_allocations: usize,
268    allocation_failures: usize,
269    allocation_times: Vec<std::time::Duration>,
270    recent_allocations: HashMap<AgentId, std::time::SystemTime>,
271}
272
273impl AllocationHistory {
274    fn new() -> Self {
275        Self {
276            total_allocations: 0,
277            allocation_failures: 0,
278            allocation_times: Vec::new(),
279            recent_allocations: HashMap::new(),
280        }
281    }
282
283    fn record_allocation(&mut self, agent_id: AgentId, duration: std::time::Duration) {
284        self.total_allocations += 1;
285        self.allocation_times.push(duration);
286        self.recent_allocations
287            .insert(agent_id, std::time::SystemTime::now());
288
289        // Keep only recent allocation times (last 1000)
290        if self.allocation_times.len() > 1000 {
291            self.allocation_times.remove(0);
292        }
293    }
294
295    fn record_failure(&mut self) {
296        self.allocation_failures += 1;
297    }
298
299    fn record_deallocation(&mut self, agent_id: AgentId) {
300        self.recent_allocations.remove(&agent_id);
301    }
302
303    fn average_allocation_time(&self) -> std::time::Duration {
304        if self.allocation_times.is_empty() {
305            std::time::Duration::from_millis(0)
306        } else {
307            let total: std::time::Duration = self.allocation_times.iter().sum();
308            total / self.allocation_times.len() as u32
309        }
310    }
311}
312
313/// Load balancing statistics
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct LoadBalancingStats {
316    pub total_allocations: usize,
317    pub active_allocations: usize,
318    pub memory_utilization: f32,
319    pub cpu_utilization: f32,
320    pub allocation_failures: usize,
321    pub average_allocation_time: std::time::Duration,
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Total host memory in MB, detected the same way `LoadBalancer::new` does.
    fn detected_total_memory() -> usize {
        let mut sys = System::new_all();
        sys.refresh_all();
        (sys.total_memory() / (1024 * 1024)) as usize
    }

    /// Small request (200 MB / 1 core at most) shared by the happy-path tests.
    fn small_requirements() -> ResourceRequirements {
        ResourceRequirements {
            min_memory_mb: 100,
            max_memory_mb: 200,
            min_cpu_cores: 0.5,
            max_cpu_cores: 1.0,
            disk_space_mb: 100,
            network_bandwidth_mbps: 10,
        }
    }

    #[tokio::test]
    async fn test_resource_allocation() {
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);

        let result = load_balancer
            .allocate_resources(&small_requirements())
            .await;
        assert!(result.is_ok());

        // The allocation is sized from the request's max_* fields.
        let allocation = result.unwrap();
        assert_eq!(allocation.allocated_memory, 200);
        assert_eq!(allocation.allocated_cpu_cores, 1.0);
    }

    #[tokio::test]
    async fn test_resource_deallocation() {
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);

        let allocation = load_balancer
            .allocate_resources(&small_requirements())
            .await
            .unwrap();

        load_balancer.deallocate_resources(allocation).await;

        // After deallocation, memory_used should be 0
        let utilization = load_balancer.get_resource_utilization().await;
        assert_eq!(utilization.memory_used, 0);
    }

    #[tokio::test]
    async fn test_insufficient_resources() {
        let total_memory = detected_total_memory();
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);

        // Request more than the system actually has
        let requirements = ResourceRequirements {
            min_memory_mb: total_memory + 1024,
            max_memory_mb: total_memory + 1024,
            min_cpu_cores: 1024.0,
            max_cpu_cores: 1024.0,
            disk_space_mb: 100,
            network_bandwidth_mbps: 10,
        };

        let result = load_balancer.allocate_resources(&requirements).await;
        assert!(result.is_err());

        // Match exhaustively so an unexpected error variant fails the test instead of
        // silently skipping the message check.
        match result {
            Err(ResourceError::AllocationFailed { reason, .. }) => {
                assert!(reason.contains("Insufficient resources"));
            }
            _ => panic!("expected ResourceError::AllocationFailed"),
        }
    }

    #[tokio::test]
    async fn test_get_statistics() {
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);

        // A fresh balancer has no recorded activity.
        let stats = load_balancer.get_statistics().await;
        assert_eq!(stats.total_allocations, 0);
        assert_eq!(stats.active_allocations, 0);
        assert_eq!(stats.allocation_failures, 0);

        // Allocate one agent
        let requirements = ResourceRequirements {
            min_memory_mb: 50,
            max_memory_mb: 100,
            min_cpu_cores: 0.5,
            max_cpu_cores: 1.0,
            disk_space_mb: 50,
            network_bandwidth_mbps: 10,
        };

        let _ = load_balancer
            .allocate_resources(&requirements)
            .await
            .unwrap();

        let stats = load_balancer.get_statistics().await;
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.active_allocations, 1);
        assert_eq!(stats.allocation_failures, 0);
        assert!(stats.memory_utilization > 0.0);
    }

    #[tokio::test]
    async fn test_statistics_serializable() {
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);
        let stats = load_balancer.get_statistics().await;
        let json = serde_json::to_value(&stats);
        assert!(json.is_ok());
    }

    #[tokio::test]
    async fn test_sysinfo_detection() {
        let load_balancer = LoadBalancer::new(LoadBalancingStrategy::ResourceBased);
        let total = detected_total_memory();

        // Verify that real system memory is detected (should be > 0)
        assert!(total > 0);

        // Verify utilization reports zero usage initially
        let utilization = load_balancer.get_resource_utilization().await;
        assert_eq!(utilization.memory_used, 0);
        assert!(utilization.uptime.as_nanos() > 0);
    }
}
456}