Skip to main content

symbi_runtime/resource/
mod.rs

1//! Agent Resource Manager
2//!
3//! Manages resource allocation, monitoring, and enforcement for agents
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::{mpsc, Notify};
11use tokio::time::interval;
12
13use crate::integrations::policy_engine::{
14    PolicyEnforcementFactory, PolicyEnforcementPoint, ResourceAccessConfig,
15    ResourceAllocationRequest,
16};
17use crate::types::*;
18
/// Resource manager trait
///
/// Abstraction over the lifecycle of per-agent resources: allocation,
/// usage tracking, limit enforcement, and shutdown. All methods take
/// `&self`, so implementations are expected to manage interior mutability
/// themselves.
#[async_trait]
pub trait ResourceManager {
    /// Allocate resources for an agent
    ///
    /// # Errors
    /// Returns a `ResourceError` when allocation is not possible (e.g.
    /// duplicate allocation, policy rejection, or insufficient capacity —
    /// the exact variants depend on the implementation).
    async fn allocate_resources(
        &self,
        agent_id: AgentId,
        requirements: ResourceRequirements,
    ) -> Result<ResourceAllocation, ResourceError>;

    /// Deallocate resources for an agent
    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError>;

    /// Update resource usage for an agent
    ///
    /// Reports a fresh usage sample; implementations may process this
    /// asynchronously.
    async fn update_usage(
        &self,
        agent_id: AgentId,
        usage: ResourceUsage,
    ) -> Result<(), ResourceError>;

    /// Get current resource usage for an agent
    ///
    /// # Errors
    /// Fails when no usage has been recorded for the agent.
    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError>;

    /// Get system resource status
    async fn get_system_status(&self) -> ResourceSystemStatus;

    /// Set resource limits for an agent
    async fn set_limits(
        &self,
        agent_id: AgentId,
        limits: ResourceLimits,
    ) -> Result<(), ResourceError>;

    /// Check if agent is within resource limits
    ///
    /// Returns `Ok(true)` when the agent's recorded usage does not violate
    /// its limits.
    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError>;

    /// Check resource access violations for an agent
    ///
    /// Returns the (possibly empty) list of current violations.
    async fn check_resource_violations(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<ResourceViolation>, ResourceError>;

    /// Shutdown the resource manager
    async fn shutdown(&self) -> Result<(), ResourceError>;

    /// Check the health of the resource manager
    async fn check_health(&self) -> Result<ComponentHealth, ResourceError>;
}
67
/// Resource manager configuration
#[derive(Debug, Clone)]
pub struct ResourceManagerConfig {
    /// Total memory budget, in bytes (default: 16 GiB).
    pub total_memory: usize,
    /// Total CPU cores available for allocation.
    pub total_cpu_cores: u32,
    /// Total disk space budget, in bytes (default: 1 TiB).
    pub total_disk_space: usize,
    /// Total network bandwidth budget — presumably bytes/sec, given the
    /// MB→byte arithmetic at the allocation sites; the default's "1Gbps"
    /// comment disagrees with that reading. TODO confirm intended unit.
    pub total_network_bandwidth: usize,
    /// Cadence of the background enforcement loop's limit checks.
    pub monitoring_interval: Duration,
    /// When false, the enforcement loop skips limit checking entirely.
    pub enforcement_enabled: bool,
    /// NOTE(review): not read anywhere in this module — presumably reserved
    /// for future auto-scaling support.
    pub auto_scaling_enabled: bool,
    /// Fraction of each resource withheld from allocation (0.1 = 10%).
    pub resource_reservation_percentage: f32,
    /// Configuration handed to the policy enforcement point at startup.
    pub policy_enforcement_config: ResourceAccessConfig,
}
81
82impl Default for ResourceManagerConfig {
83    fn default() -> Self {
84        Self {
85            total_memory: 16 * 1024 * 1024 * 1024, // 16GB
86            total_cpu_cores: 8,
87            total_disk_space: 1024 * 1024 * 1024 * 1024, // 1TB
88            total_network_bandwidth: 1000 * 1024 * 1024, // 1Gbps
89            monitoring_interval: Duration::from_secs(5),
90            enforcement_enabled: true,
91            auto_scaling_enabled: false,
92            resource_reservation_percentage: 0.1, // 10% reserved
93            policy_enforcement_config: ResourceAccessConfig::default(),
94        }
95    }
96}
97
/// Default implementation of the resource manager
///
/// All shared state sits behind `Arc<RwLock<...>>` (parking_lot) so it can
/// be shared with the two background tasks spawned at construction. Map
/// mutation is funneled through the monitoring task via `monitoring_sender`.
pub struct DefaultResourceManager {
    // Static configuration captured at construction.
    config: ResourceManagerConfig,
    // Current allocations, keyed by agent.
    allocations: Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
    // Latest reported usage sample per agent.
    usage_tracker: Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
    // Remaining / reserved system capacity accounting.
    system_resources: Arc<RwLock<SystemResources>>,
    // Channel into the monitoring task, which applies all state changes.
    monitoring_sender: mpsc::UnboundedSender<MonitoringEvent>,
    // Wakes background tasks so they can exit on shutdown.
    shutdown_notify: Arc<Notify>,
    // Cleared on shutdown; gates new allocations and the enforcement loop.
    is_running: Arc<RwLock<bool>>,
    // Policy decision point consulted before every allocation.
    policy_enforcement: Arc<dyn PolicyEnforcementPoint>,
}
109
110impl DefaultResourceManager {
111    /// Create a new resource manager
112    pub async fn new(config: ResourceManagerConfig) -> Result<Self, ResourceError> {
113        let allocations = Arc::new(RwLock::new(HashMap::new()));
114        let usage_tracker = Arc::new(RwLock::new(HashMap::new()));
115        let system_resources = Arc::new(RwLock::new(SystemResources::new(&config)));
116        let (monitoring_sender, monitoring_receiver) = mpsc::unbounded_channel();
117        let shutdown_notify = Arc::new(Notify::new());
118        let is_running = Arc::new(RwLock::new(true));
119
120        // Create policy enforcement point
121        let policy_enforcement = PolicyEnforcementFactory::create_enforcement_point(
122            config.policy_enforcement_config.clone(),
123        )
124        .await
125        .map_err(|e| {
126            ResourceError::PolicyError(format!("Failed to create policy enforcement: {}", e))
127        })?;
128
129        let manager = Self {
130            config,
131            allocations,
132            usage_tracker,
133            system_resources,
134            monitoring_sender,
135            shutdown_notify,
136            is_running,
137            policy_enforcement,
138        };
139
140        // Start background tasks
141        manager.start_monitoring_loop(monitoring_receiver).await;
142        manager.start_enforcement_loop().await;
143
144        Ok(manager)
145    }
146
147    /// Start the resource monitoring loop
148    async fn start_monitoring_loop(
149        &self,
150        mut monitoring_receiver: mpsc::UnboundedReceiver<MonitoringEvent>,
151    ) {
152        let usage_tracker = self.usage_tracker.clone();
153        let allocations = self.allocations.clone();
154        let system_resources = self.system_resources.clone();
155        let shutdown_notify = self.shutdown_notify.clone();
156
157        tokio::spawn(async move {
158            loop {
159                tokio::select! {
160                    event = monitoring_receiver.recv() => {
161                        if let Some(event) = event {
162                            Self::process_monitoring_event(event, &usage_tracker, &allocations, &system_resources).await;
163                        } else {
164                            break;
165                        }
166                    }
167                    _ = shutdown_notify.notified() => {
168                        break;
169                    }
170                }
171            }
172        });
173    }
174
175    /// Start the resource enforcement loop
176    async fn start_enforcement_loop(&self) {
177        let usage_tracker = self.usage_tracker.clone();
178        let allocations = self.allocations.clone();
179        let monitoring_sender = self.monitoring_sender.clone();
180        let shutdown_notify = self.shutdown_notify.clone();
181        let is_running = self.is_running.clone();
182        let monitoring_interval = self.config.monitoring_interval;
183        let enforcement_enabled = self.config.enforcement_enabled;
184
185        tokio::spawn(async move {
186            let mut interval = interval(monitoring_interval);
187
188            loop {
189                tokio::select! {
190                    _ = interval.tick() => {
191                        if !*is_running.read() {
192                            break;
193                        }
194
195                        if enforcement_enabled {
196                            Self::enforce_resource_limits(&usage_tracker, &allocations, &monitoring_sender).await;
197                        }
198                    }
199                    _ = shutdown_notify.notified() => {
200                        break;
201                    }
202                }
203            }
204        });
205    }
206
207    /// Process a monitoring event
208    async fn process_monitoring_event(
209        event: MonitoringEvent,
210        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
211        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
212        system_resources: &Arc<RwLock<SystemResources>>,
213    ) {
214        match event {
215            MonitoringEvent::UsageUpdate { agent_id, usage } => {
216                usage_tracker.write().insert(agent_id, usage.clone());
217
218                // Update system resource usage
219                system_resources.write().update_usage(&usage);
220
221                tracing::debug!("Updated resource usage for agent {}: {:?}", agent_id, usage);
222            }
223            MonitoringEvent::AllocationRequest {
224                agent_id,
225                requirements,
226            } => {
227                let mut system = system_resources.write();
228                if system.can_allocate(&requirements) {
229                    let allocation = system.allocate(&requirements);
230                    allocations.write().insert(agent_id, allocation.clone());
231
232                    tracing::info!(
233                        "Allocated resources for agent {}: {:?}",
234                        agent_id,
235                        allocation
236                    );
237                } else {
238                    tracing::warn!(
239                        "Cannot allocate resources for agent {}: insufficient resources",
240                        agent_id
241                    );
242                }
243            }
244            MonitoringEvent::DeallocationRequest { agent_id } => {
245                if let Some(allocation) = allocations.write().remove(&agent_id) {
246                    system_resources.write().deallocate(&allocation);
247                    usage_tracker.write().remove(&agent_id);
248
249                    tracing::info!("Deallocated resources for agent {}", agent_id);
250                }
251            }
252            MonitoringEvent::LimitViolation {
253                agent_id,
254                violations,
255            } => {
256                // Handle limit violation event - this is typically sent by the enforcement loop
257                // and processed by external systems, so we just log it here
258                tracing::warn!(
259                    "Resource limit violation detected for agent {}: {:?}",
260                    agent_id,
261                    violations
262                );
263            }
264        }
265    }
266
267    /// Enforce resource limits
268    async fn enforce_resource_limits(
269        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
270        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
271        monitoring_sender: &mpsc::UnboundedSender<MonitoringEvent>,
272    ) {
273        let usage_map = usage_tracker.read();
274        let allocations_map = allocations.read();
275
276        for (agent_id, usage) in usage_map.iter() {
277            if let Some(allocation) = allocations_map.get(agent_id) {
278                // Create limits from allocation for violation checking
279                let limits = ResourceLimits {
280                    memory_mb: allocation.allocated_memory / (1024 * 1024),
281                    cpu_cores: allocation.allocated_cpu_cores,
282                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
283                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
284                    execution_timeout: Duration::from_secs(3600),
285                    idle_timeout: Duration::from_secs(300),
286                };
287                let violations = Self::check_resource_violations(usage, &limits);
288
289                if !violations.is_empty() {
290                    tracing::warn!(
291                        "Agent {} violated resource limits: {:?}",
292                        agent_id,
293                        violations
294                    );
295
296                    // Send violation event (could trigger throttling, suspension, etc.)
297                    let _ = monitoring_sender.send(MonitoringEvent::LimitViolation {
298                        agent_id: *agent_id,
299                        violations,
300                    });
301                }
302            }
303        }
304    }
305
306    /// Check for resource limit violations
307    fn check_resource_violations(
308        usage: &ResourceUsage,
309        limits: &ResourceLimits,
310    ) -> Vec<ResourceViolation> {
311        let mut violations = Vec::new();
312
313        if usage.memory_used > limits.memory_mb * 1024 * 1024 {
314            violations.push(ResourceViolation::Memory {
315                used: usage.memory_used,
316                limit: limits.memory_mb * 1024 * 1024,
317            });
318        }
319
320        if usage.cpu_utilization > limits.cpu_cores {
321            violations.push(ResourceViolation::Cpu {
322                used: usage.cpu_utilization,
323                limit: limits.cpu_cores,
324            });
325        }
326
327        if usage.disk_io_rate > limits.disk_io_mbps * 1024 * 1024 {
328            violations.push(ResourceViolation::DiskIo {
329                used: usage.disk_io_rate,
330                limit: limits.disk_io_mbps * 1024 * 1024,
331            });
332        }
333
334        if usage.network_io_rate > limits.network_io_mbps * 1024 * 1024 {
335            violations.push(ResourceViolation::NetworkIo {
336                used: usage.network_io_rate,
337                limit: limits.network_io_mbps * 1024 * 1024,
338            });
339        }
340
341        violations
342    }
343
344    /// Send a monitoring event
345    fn send_monitoring_event(&self, event: MonitoringEvent) -> Result<(), ResourceError> {
346        self.monitoring_sender.send(event).map_err(|_| {
347            ResourceError::MonitoringFailed("Failed to send monitoring event".to_string())
348        })
349    }
350}
351
352#[async_trait]
353impl ResourceManager for DefaultResourceManager {
354    async fn allocate_resources(
355        &self,
356        agent_id: AgentId,
357        requirements: ResourceRequirements,
358    ) -> Result<ResourceAllocation, ResourceError> {
359        if !*self.is_running.read() {
360            return Err(ResourceError::ShuttingDown);
361        }
362
363        // Check if agent already has allocation
364        if self.allocations.read().contains_key(&agent_id) {
365            return Err(ResourceError::AllocationExists { agent_id });
366        }
367
368        // Check policy for resource allocation
369        let allocation_request = ResourceAllocationRequest {
370            agent_id,
371            requirements: requirements.clone(),
372            priority: Priority::Normal,
373            justification: None,
374            max_duration: None,
375            timestamp: SystemTime::now(),
376        };
377
378        let policy_decision = self
379            .policy_enforcement
380            .validate_resource_allocation(agent_id, &allocation_request)
381            .await
382            .map_err(|e| ResourceError::PolicyError(format!("Policy validation failed: {}", e)))?;
383
384        let final_requirements = match policy_decision.decision {
385            crate::integrations::policy_engine::AllocationResult::Approve => requirements,
386            crate::integrations::policy_engine::AllocationResult::Deny => {
387                return Err(ResourceError::PolicyViolation {
388                    reason: policy_decision.reason.into(),
389                });
390            }
391            crate::integrations::policy_engine::AllocationResult::Modified => {
392                // Use modified requirements if provided
393                policy_decision
394                    .modified_requirements
395                    .unwrap_or(requirements)
396            }
397            crate::integrations::policy_engine::AllocationResult::Queued => {
398                return Err(ResourceError::AllocationQueued {
399                    reason: policy_decision.reason.into(),
400                });
401            }
402            crate::integrations::policy_engine::AllocationResult::Escalate => {
403                return Err(ResourceError::EscalationRequired {
404                    reason: policy_decision.reason.into(),
405                });
406            }
407        };
408
409        // Send allocation request
410        self.send_monitoring_event(MonitoringEvent::AllocationRequest {
411            agent_id,
412            requirements: final_requirements.clone(),
413        })?;
414
415        // Give the monitoring loop time to process
416        tokio::time::sleep(Duration::from_millis(10)).await;
417
418        // Check if allocation was successful
419        self.allocations.read().get(&agent_id).cloned().ok_or(
420            ResourceError::InsufficientResources {
421                requirements: "Insufficient system resources".into(),
422            },
423        )
424    }
425
426    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError> {
427        self.send_monitoring_event(MonitoringEvent::DeallocationRequest { agent_id })?;
428
429        // Give the monitoring loop time to process
430        tokio::time::sleep(Duration::from_millis(10)).await;
431
432        Ok(())
433    }
434
435    async fn update_usage(
436        &self,
437        agent_id: AgentId,
438        usage: ResourceUsage,
439    ) -> Result<(), ResourceError> {
440        self.send_monitoring_event(MonitoringEvent::UsageUpdate { agent_id, usage })?;
441        Ok(())
442    }
443
444    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError> {
445        self.usage_tracker
446            .read()
447            .get(&agent_id)
448            .cloned()
449            .ok_or(ResourceError::AgentNotFound { agent_id })
450    }
451
452    async fn get_system_status(&self) -> ResourceSystemStatus {
453        let system = self.system_resources.read();
454        let allocations_count = self.allocations.read().len();
455
456        // Use the resource info to access reserved fields
457        let resource_info = system.get_resource_info();
458
459        ResourceSystemStatus {
460            total_memory: self.config.total_memory,
461            available_memory: resource_info.available_memory,
462            total_cpu_cores: self.config.total_cpu_cores,
463            available_cpu_cores: resource_info.available_cpu_cores,
464            total_disk_space: self.config.total_disk_space,
465            available_disk_space: resource_info.available_disk_space,
466            total_network_bandwidth: self.config.total_network_bandwidth,
467            available_network_bandwidth: resource_info.available_network_bandwidth,
468            active_allocations: allocations_count,
469            last_updated: SystemTime::now(),
470        }
471    }
472
473    async fn set_limits(
474        &self,
475        agent_id: AgentId,
476        limits: ResourceLimits,
477    ) -> Result<(), ResourceError> {
478        let mut allocations = self.allocations.write();
479        if let Some(allocation) = allocations.get_mut(&agent_id) {
480            // Update allocation fields based on limits
481            allocation.allocated_memory = limits.memory_mb * 1024 * 1024;
482            allocation.allocated_cpu_cores = limits.cpu_cores;
483            allocation.allocated_disk_io = limits.disk_io_mbps * 1024 * 1024;
484            allocation.allocated_network_io = limits.network_io_mbps * 1024 * 1024;
485            Ok(())
486        } else {
487            Err(ResourceError::AgentNotFound { agent_id })
488        }
489    }
490
491    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError> {
492        let usage_map = self.usage_tracker.read();
493        let allocations_map = self.allocations.read();
494
495        // Agent must have allocation to check limits
496        if let Some(allocation) = allocations_map.get(&agent_id) {
497            // If there's usage data, check for violations
498            if let Some(usage) = usage_map.get(&agent_id) {
499                let limits = ResourceLimits {
500                    memory_mb: allocation.allocated_memory / (1024 * 1024),
501                    cpu_cores: allocation.allocated_cpu_cores,
502                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
503                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
504                    execution_timeout: Duration::from_secs(3600),
505                    idle_timeout: Duration::from_secs(300),
506                };
507                let violations = Self::check_resource_violations(usage, &limits);
508                Ok(violations.is_empty())
509            } else {
510                // No usage data yet - assume within limits since no actual usage recorded
511                Ok(true)
512            }
513        } else {
514            Err(ResourceError::AgentNotFound { agent_id })
515        }
516    }
517
518    async fn check_resource_violations(
519        &self,
520        agent_id: AgentId,
521    ) -> Result<Vec<ResourceViolation>, ResourceError> {
522        let usage_map = self.usage_tracker.read();
523        let allocations_map = self.allocations.read();
524
525        if let (Some(usage), Some(allocation)) =
526            (usage_map.get(&agent_id), allocations_map.get(&agent_id))
527        {
528            // Create limits from allocation for violation checking
529            let limits = ResourceLimits {
530                memory_mb: allocation.allocated_memory / (1024 * 1024),
531                cpu_cores: allocation.allocated_cpu_cores,
532                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
533                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
534                execution_timeout: Duration::from_secs(3600),
535                idle_timeout: Duration::from_secs(300),
536            };
537            Ok(Self::check_resource_violations(usage, &limits))
538        } else {
539            Err(ResourceError::AgentNotFound { agent_id })
540        }
541    }
542
543    async fn shutdown(&self) -> Result<(), ResourceError> {
544        tracing::info!("Shutting down resource manager");
545
546        *self.is_running.write() = false;
547        self.shutdown_notify.notify_waiters();
548
549        // Deallocate all resources
550        let agent_ids: Vec<AgentId> = self.allocations.read().keys().copied().collect();
551
552        for agent_id in agent_ids {
553            if let Err(e) = self.deallocate_resources(agent_id).await {
554                tracing::error!(
555                    "Failed to deallocate resources for agent {} during shutdown: {}",
556                    agent_id,
557                    e
558                );
559            }
560        }
561
562        Ok(())
563    }
564
565    async fn check_health(&self) -> Result<ComponentHealth, ResourceError> {
566        let is_running = *self.is_running.read();
567        if !is_running {
568            return Ok(ComponentHealth::unhealthy(
569                "Resource manager is shut down".to_string(),
570            ));
571        }
572
573        let system_status = self.get_system_status().await;
574        let allocations_count = self.allocations.read().len();
575
576        // Calculate resource utilization percentages
577        let memory_usage = if system_status.total_memory > 0 {
578            (system_status.total_memory - system_status.available_memory) as f64
579                / system_status.total_memory as f64
580        } else {
581            0.0
582        };
583
584        let cpu_usage = if system_status.total_cpu_cores > 0 {
585            (system_status.total_cpu_cores - system_status.available_cpu_cores) as f64
586                / system_status.total_cpu_cores as f64
587        } else {
588            0.0
589        };
590
591        let status = if memory_usage > 0.9 || cpu_usage > 0.9 {
592            ComponentHealth::unhealthy(format!(
593                "Critical resource usage - Memory: {:.1}%, CPU: {:.1}%",
594                memory_usage * 100.0,
595                cpu_usage * 100.0
596            ))
597        } else if memory_usage > 0.8 || cpu_usage > 0.8 {
598            ComponentHealth::degraded(format!(
599                "High resource usage - Memory: {:.1}%, CPU: {:.1}%",
600                memory_usage * 100.0,
601                cpu_usage * 100.0
602            ))
603        } else {
604            ComponentHealth::healthy(Some(format!(
605                "Resources available - Memory: {:.1}%, CPU: {:.1}%, {} active allocations",
606                memory_usage * 100.0,
607                cpu_usage * 100.0,
608                allocations_count
609            )))
610        };
611
612        Ok(status
613            .with_metric(
614                "memory_usage_percent".to_string(),
615                format!("{:.2}", memory_usage * 100.0),
616            )
617            .with_metric(
618                "cpu_usage_percent".to_string(),
619                format!("{:.2}", cpu_usage * 100.0),
620            )
621            .with_metric(
622                "active_allocations".to_string(),
623                allocations_count.to_string(),
624            )
625            .with_metric(
626                "available_memory_mb".to_string(),
627                (system_status.available_memory / (1024 * 1024)).to_string(),
628            )
629            .with_metric(
630                "available_cpu_cores".to_string(),
631                system_status.available_cpu_cores.to_string(),
632            ))
633    }
634}
635
/// System resources tracking
///
/// Book-keeps what remains available for new allocations and the headroom
/// withheld at construction per `resource_reservation_percentage`. Byte
/// quantities (memory, disk, network) use the same byte units as the
/// MB→byte math at the call sites.
#[derive(Debug, Clone)]
struct SystemResources {
    // Remaining capacity; decremented by `allocate`, restored by `deallocate`.
    available_memory: usize,
    available_cpu_cores: u32,
    available_disk_space: usize,
    available_network_bandwidth: usize,
    // Capacity reserved up-front and never handed out.
    reserved_memory: usize,
    reserved_cpu_cores: u32,
    reserved_disk_space: usize,
    reserved_network_bandwidth: usize,
}
648
649impl SystemResources {
650    fn new(config: &ResourceManagerConfig) -> Self {
651        let reservation_factor = config.resource_reservation_percentage;
652
653        // Use ceiling operation to ensure we always reserve at least 1 CPU when factor > 0
654        let reserved_cpu = if reservation_factor > 0.0 {
655            ((config.total_cpu_cores as f32 * reservation_factor).ceil() as u32).max(1)
656        } else {
657            0
658        };
659        let available_cpu = config.total_cpu_cores.saturating_sub(reserved_cpu);
660
661        Self {
662            available_memory: config.total_memory
663                - (config.total_memory as f32 * reservation_factor) as usize,
664            available_cpu_cores: available_cpu,
665            available_disk_space: config.total_disk_space
666                - (config.total_disk_space as f32 * reservation_factor) as usize,
667            available_network_bandwidth: config.total_network_bandwidth
668                - (config.total_network_bandwidth as f32 * reservation_factor) as usize,
669            reserved_memory: (config.total_memory as f32 * reservation_factor) as usize,
670            reserved_cpu_cores: reserved_cpu,
671            reserved_disk_space: (config.total_disk_space as f32 * reservation_factor) as usize,
672            reserved_network_bandwidth: (config.total_network_bandwidth as f32 * reservation_factor)
673                as usize,
674        }
675    }
676
677    fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
678        self.available_memory >= requirements.max_memory_mb * 1024 * 1024
679            && self.available_cpu_cores >= requirements.max_cpu_cores as u32
680            && self.available_disk_space >= requirements.disk_space_mb * 1024 * 1024
681            && self.available_network_bandwidth >= requirements.network_bandwidth_mbps * 1024 * 1024
682    }
683
684    fn allocate(&mut self, requirements: &ResourceRequirements) -> ResourceAllocation {
685        let memory_bytes = requirements.max_memory_mb * 1024 * 1024;
686        let disk_bytes = requirements.disk_space_mb * 1024 * 1024;
687        let network_bytes = requirements.network_bandwidth_mbps * 1024 * 1024;
688
689        self.available_memory -= memory_bytes;
690        self.available_cpu_cores -= requirements.max_cpu_cores as u32;
691        self.available_disk_space -= disk_bytes;
692        self.available_network_bandwidth -= network_bytes;
693
694        ResourceAllocation {
695            agent_id: AgentId::new(), // Will be set by caller
696            allocated_memory: memory_bytes,
697            allocated_cpu_cores: requirements.max_cpu_cores,
698            allocated_disk_io: disk_bytes,
699            allocated_network_io: network_bytes,
700            allocation_time: SystemTime::now(),
701        }
702    }
703
704    fn deallocate(&mut self, allocation: &ResourceAllocation) {
705        self.available_memory += allocation.allocated_memory;
706        self.available_cpu_cores += allocation.allocated_cpu_cores as u32;
707        self.available_disk_space += allocation.allocated_disk_io;
708        self.available_network_bandwidth += allocation.allocated_network_io;
709    }
710
711    fn update_usage(&mut self, _usage: &ResourceUsage) {
712        // In a real implementation, this would update current usage metrics
713        // For now, we just track allocations vs available resources
714    }
715
716    /// Get system resource information including reservations
717    fn get_resource_info(&self) -> ResourceInfo {
718        ResourceInfo {
719            available_memory: self.available_memory,
720            available_cpu_cores: self.available_cpu_cores,
721            available_disk_space: self.available_disk_space,
722            available_network_bandwidth: self.available_network_bandwidth,
723            reserved_memory: self.reserved_memory,
724            reserved_cpu_cores: self.reserved_cpu_cores,
725            reserved_disk_space: self.reserved_disk_space,
726            reserved_network_bandwidth: self.reserved_network_bandwidth,
727        }
728    }
729}
730
/// Resource system status
///
/// Point-in-time snapshot combining the configured totals with the
/// currently available capacity and the active allocation count.
#[derive(Debug, Clone)]
pub struct ResourceSystemStatus {
    /// Configured memory budget, in bytes.
    pub total_memory: usize,
    /// Memory still available for allocation, in bytes.
    pub available_memory: usize,
    /// Configured CPU core budget.
    pub total_cpu_cores: u32,
    /// CPU cores still available for allocation.
    pub available_cpu_cores: u32,
    /// Configured disk space budget, in bytes.
    pub total_disk_space: usize,
    /// Disk space still available for allocation, in bytes.
    pub available_disk_space: usize,
    /// Configured network bandwidth budget (units per config — see
    /// `ResourceManagerConfig::total_network_bandwidth`).
    pub total_network_bandwidth: usize,
    /// Network bandwidth still available for allocation.
    pub available_network_bandwidth: usize,
    /// Number of agents holding an allocation at snapshot time.
    pub active_allocations: usize,
    /// When this snapshot was taken.
    pub last_updated: SystemTime,
}
745
/// Resource information including reservations
///
/// Read-only view of `SystemResources` exposing both the allocatable
/// remainder and the reserved headroom.
#[derive(Debug, Clone)]
pub struct ResourceInfo {
    /// Memory still available for allocation, in bytes.
    pub available_memory: usize,
    /// CPU cores still available for allocation.
    pub available_cpu_cores: u32,
    /// Disk space still available for allocation, in bytes.
    pub available_disk_space: usize,
    /// Network bandwidth still available for allocation.
    pub available_network_bandwidth: usize,
    /// Memory withheld from allocation as reservation headroom, in bytes.
    pub reserved_memory: usize,
    /// CPU cores withheld from allocation.
    pub reserved_cpu_cores: u32,
    /// Disk space withheld from allocation, in bytes.
    pub reserved_disk_space: usize,
    /// Network bandwidth withheld from allocation.
    pub reserved_network_bandwidth: usize,
}
758
/// Resource violations
///
/// One variant per tracked resource dimension. `used` is the observed
/// value, `limit` the effective ceiling; byte quantities are compared in
/// bytes by the violation checker, CPU in cores.
#[derive(Debug, Clone)]
pub enum ResourceViolation {
    /// Memory usage exceeded the limit (both in bytes).
    Memory { used: usize, limit: usize },
    /// CPU utilization exceeded the allocated cores.
    Cpu { used: f32, limit: f32 },
    /// Disk I/O rate exceeded the limit (bytes — TODO confirm per-second).
    DiskIo { used: usize, limit: usize },
    /// Network I/O rate exceeded the limit (bytes — TODO confirm per-second).
    NetworkIo { used: usize, limit: usize },
}
767
/// Monitoring events for internal processing
///
/// Sent over the unbounded channel to the monitoring task, which is the
/// single writer for the allocation/usage maps and system accounting.
#[derive(Debug, Clone)]
enum MonitoringEvent {
    /// New usage sample reported for an agent.
    UsageUpdate {
        agent_id: AgentId,
        usage: ResourceUsage,
    },
    /// Ask the monitoring task to reserve resources for an agent.
    AllocationRequest {
        agent_id: AgentId,
        requirements: ResourceRequirements,
    },
    /// Ask the monitoring task to release an agent's resources.
    DeallocationRequest {
        agent_id: AgentId,
    },
    /// Emitted by the enforcement loop when an agent exceeds its limits;
    /// currently only logged by the monitoring task.
    LimitViolation {
        agent_id: AgentId,
        violations: Vec<ResourceViolation>,
    },
}
787
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal requirements (1 MB memory, 1 core, 1 MB disk/network) shared
    /// by the allocation tests.
    fn create_test_requirements() -> ResourceRequirements {
        ResourceRequirements {
            min_memory_mb: 1,
            max_memory_mb: 1,
            min_cpu_cores: 1.0,
            max_cpu_cores: 1.0,
            disk_space_mb: 1,
            network_bandwidth_mbps: 1,
        }
    }

    /// Allocation round-trip: the returned record reflects the requested
    /// quantities (1 MB memory in bytes, 1.0 cores).
    #[tokio::test]
    async fn test_resource_allocation() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        let allocation = manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();
        assert_eq!(allocation.allocated_memory, 1024 * 1024);
        assert_eq!(allocation.allocated_cpu_cores, 1.0);
    }

    /// Deallocation after a successful allocation succeeds.
    #[tokio::test]
    async fn test_resource_deallocation() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();
        let result = manager.deallocate_resources(agent_id).await;
        assert!(result.is_ok());
    }

    /// A usage sample pushed via `update_usage` can be read back via
    /// `get_usage` after the monitoring task processes it (hence the sleep).
    #[tokio::test]
    async fn test_usage_tracking() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();

        let usage = ResourceUsage {
            memory_used: 512 * 1024, // 512KB
            cpu_utilization: 0.5,
            disk_io_rate: 512 * 1024,
            network_io_rate: 512,
            uptime: Duration::from_secs(60),
        };

        manager.update_usage(agent_id, usage.clone()).await.unwrap();

        // Allow the async monitoring loop to apply the update.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let retrieved_usage = manager.get_usage(agent_id).await.unwrap();
        assert_eq!(retrieved_usage.memory_used, usage.memory_used);
        assert_eq!(retrieved_usage.cpu_utilization, usage.cpu_utilization);
    }

    /// System status invariants: totals are positive and availability never
    /// exceeds the configured total.
    #[tokio::test]
    async fn test_system_status() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let status = manager.get_system_status().await;

        assert!(status.total_memory > 0);
        assert!(status.available_memory <= status.total_memory);
        assert!(status.total_cpu_cores > 0);
        assert!(status.available_cpu_cores <= status.total_cpu_cores);
    }

    /// When usage exceeds every limit, all four violation variants are
    /// reported.
    #[test]
    fn test_resource_violations() {
        let usage = ResourceUsage {
            memory_used: 2 * 1024 * 1024, // 2MB
            cpu_utilization: 2.0,
            disk_io_rate: 2 * 1024 * 1024,
            network_io_rate: 2 * 1024 * 1024, // 2MB to exceed 1MB limit
            uptime: Duration::from_secs(60),
        };

        let limits = ResourceLimits {
            memory_mb: 1,
            cpu_cores: 1.0,
            disk_io_mbps: 1,
            network_io_mbps: 1,
            execution_timeout: Duration::from_secs(3600),
            idle_timeout: Duration::from_secs(300),
        };

        let violations = DefaultResourceManager::check_resource_violations(&usage, &limits);
        assert_eq!(violations.len(), 4); // All resources exceeded
    }
}