1use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::{mpsc, Notify};
11use tokio::time::interval;
12
13use crate::integrations::policy_engine::{
14 PolicyEnforcementFactory, PolicyEnforcementPoint, ResourceAccessConfig,
15 ResourceAllocationRequest,
16};
17use crate::types::*;
18
/// Contract for per-agent resource lifecycle management: allocation,
/// usage tracking, limit checking, and health/shutdown control.
#[async_trait]
pub trait ResourceManager {
    /// Reserve resources for `agent_id` according to `requirements`.
    ///
    /// # Errors
    /// Implementations may fail when the agent already holds an allocation,
    /// when policy denies the request, or when capacity is insufficient.
    async fn allocate_resources(
        &self,
        agent_id: AgentId,
        requirements: ResourceRequirements,
    ) -> Result<ResourceAllocation, ResourceError>;

    /// Release any resources held by `agent_id`.
    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError>;

    /// Record the latest observed resource usage for `agent_id`.
    async fn update_usage(
        &self,
        agent_id: AgentId,
        usage: ResourceUsage,
    ) -> Result<(), ResourceError>;

    /// Return the most recently recorded usage for `agent_id`.
    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError>;

    /// Snapshot of system-wide totals, availability, and allocation count.
    async fn get_system_status(&self) -> ResourceSystemStatus;

    /// Replace the resource limits associated with `agent_id`'s allocation.
    async fn set_limits(
        &self,
        agent_id: AgentId,
        limits: ResourceLimits,
    ) -> Result<(), ResourceError>;

    /// True when `agent_id`'s current usage is within its limits.
    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError>;

    /// List every limit the agent's current usage exceeds (empty = compliant).
    async fn check_resource_violations(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<ResourceViolation>, ResourceError>;

    /// Stop background work and release all outstanding allocations.
    async fn shutdown(&self) -> Result<(), ResourceError>;

    /// Report the component's health for supervision/monitoring.
    async fn check_health(&self) -> Result<ComponentHealth, ResourceError>;
}
67
/// Tunables for `DefaultResourceManager`: pool sizes, monitoring cadence,
/// and enforcement/scaling switches.
#[derive(Debug, Clone)]
pub struct ResourceManagerConfig {
    /// Total managed memory, in bytes (see the `* 1024 * 1024` MB conversions below).
    pub total_memory: usize,
    /// Total managed logical CPU cores.
    pub total_cpu_cores: u32,
    /// Total managed disk space, in bytes.
    pub total_disk_space: usize,
    /// Total managed network bandwidth — presumably bytes/sec, matching the
    /// `*_mbps * 1024 * 1024` conversions elsewhere in this file; TODO confirm.
    pub total_network_bandwidth: usize,
    /// Period of the enforcement loop's limit checks.
    pub monitoring_interval: Duration,
    /// When true, the enforcement loop checks usage and reports violations.
    pub enforcement_enabled: bool,
    /// Auto-scaling switch. NOTE(review): not read anywhere in this file.
    pub auto_scaling_enabled: bool,
    /// Fraction (0.0–1.0) of each resource withheld from allocation for the host.
    pub resource_reservation_percentage: f32,
    /// Configuration handed to the policy enforcement point at construction.
    pub policy_enforcement_config: ResourceAccessConfig,
}
81
82impl Default for ResourceManagerConfig {
83 fn default() -> Self {
84 Self {
85 total_memory: 16 * 1024 * 1024 * 1024, total_cpu_cores: 8,
87 total_disk_space: 1024 * 1024 * 1024 * 1024, total_network_bandwidth: 1000 * 1024 * 1024, monitoring_interval: Duration::from_secs(5),
90 enforcement_enabled: true,
91 auto_scaling_enabled: false,
92 resource_reservation_percentage: 0.1, policy_enforcement_config: ResourceAccessConfig::default(),
94 }
95 }
96}
97
/// Channel-driven implementation of `ResourceManager`.
///
/// State is shared between the public API and two spawned background tasks —
/// a monitoring loop that applies all state changes and an enforcement loop
/// that periodically checks usage against limits — via `Arc<RwLock<...>>`
/// maps and an unbounded mpsc channel.
pub struct DefaultResourceManager {
    /// Static configuration captured at construction time.
    config: ResourceManagerConfig,
    /// Current allocation per agent; written by the monitoring task.
    allocations: Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
    /// Latest reported usage per agent; written by the monitoring task.
    usage_tracker: Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
    /// Aggregate available/reserved pool, debited/credited on (de)allocation.
    system_resources: Arc<RwLock<SystemResources>>,
    /// Producer side of the event channel consumed by the monitoring task.
    monitoring_sender: mpsc::UnboundedSender<MonitoringEvent>,
    /// Signals both background tasks to exit.
    shutdown_notify: Arc<Notify>,
    /// Cleared by `shutdown`; gates new allocations.
    is_running: Arc<RwLock<bool>>,
    /// Validates allocation requests against policy before they are applied.
    policy_enforcement: Arc<dyn PolicyEnforcementPoint>,
}
109
impl DefaultResourceManager {
    /// Build a manager, create its policy enforcement point, and spawn the
    /// two background tasks (monitoring + enforcement).
    ///
    /// # Errors
    /// `ResourceError::PolicyError` if the enforcement point cannot be created.
    pub async fn new(config: ResourceManagerConfig) -> Result<Self, ResourceError> {
        let allocations = Arc::new(RwLock::new(HashMap::new()));
        let usage_tracker = Arc::new(RwLock::new(HashMap::new()));
        let system_resources = Arc::new(RwLock::new(SystemResources::new(&config)));
        let (monitoring_sender, monitoring_receiver) = mpsc::unbounded_channel();
        let shutdown_notify = Arc::new(Notify::new());
        let is_running = Arc::new(RwLock::new(true));

        let policy_enforcement = PolicyEnforcementFactory::create_enforcement_point(
            config.policy_enforcement_config.clone(),
        )
        .await
        .map_err(|e| {
            ResourceError::PolicyError(format!("Failed to create policy enforcement: {}", e))
        })?;

        let manager = Self {
            config,
            allocations,
            usage_tracker,
            system_resources,
            monitoring_sender,
            shutdown_notify,
            is_running,
            policy_enforcement,
        };

        // Both loops run until `shutdown_notify` fires (see `shutdown`).
        manager.start_monitoring_loop(monitoring_receiver).await;
        manager.start_enforcement_loop().await;

        Ok(manager)
    }

    /// Spawn the task that owns all state transitions: every allocation,
    /// deallocation, and usage update is applied here, serialized through
    /// the unbounded monitoring channel.
    async fn start_monitoring_loop(
        &self,
        mut monitoring_receiver: mpsc::UnboundedReceiver<MonitoringEvent>,
    ) {
        let usage_tracker = self.usage_tracker.clone();
        let allocations = self.allocations.clone();
        let system_resources = self.system_resources.clone();
        let shutdown_notify = self.shutdown_notify.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    event = monitoring_receiver.recv() => {
                        if let Some(event) = event {
                            Self::process_monitoring_event(event, &usage_tracker, &allocations, &system_resources).await;
                        } else {
                            // All senders dropped: no more events can arrive.
                            break;
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        // NOTE(review): events still queued in the channel at
                        // this point are dropped unprocessed — confirm intended.
                        break;
                    }
                }
            }
        });
    }

    /// Spawn the periodic task that compares reported usage against each
    /// agent's allocation and emits `LimitViolation` events.
    async fn start_enforcement_loop(&self) {
        let usage_tracker = self.usage_tracker.clone();
        let allocations = self.allocations.clone();
        let monitoring_sender = self.monitoring_sender.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let monitoring_interval = self.config.monitoring_interval;
        let enforcement_enabled = self.config.enforcement_enabled;

        tokio::spawn(async move {
            let mut interval = interval(monitoring_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Exit promptly once `shutdown` clears the flag.
                        if !*is_running.read() {
                            break;
                        }

                        if enforcement_enabled {
                            Self::enforce_resource_limits(&usage_tracker, &allocations, &monitoring_sender).await;
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

    /// Apply one monitoring event to the shared state. Runs only on the
    /// monitoring task, so events are applied strictly in send order.
    async fn process_monitoring_event(
        event: MonitoringEvent,
        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
        system_resources: &Arc<RwLock<SystemResources>>,
    ) {
        match event {
            MonitoringEvent::UsageUpdate { agent_id, usage } => {
                usage_tracker.write().insert(agent_id, usage.clone());

                system_resources.write().update_usage(&usage);

                tracing::debug!("Updated resource usage for agent {}: {:?}", agent_id, usage);
            }
            MonitoringEvent::AllocationRequest {
                agent_id,
                requirements,
            } => {
                // Check-and-debit under a single write guard so concurrent
                // requests cannot both pass `can_allocate`.
                let mut system = system_resources.write();
                if system.can_allocate(&requirements) {
                    let allocation = system.allocate(&requirements);
                    // NOTE(review): `allocation.agent_id` is freshly generated
                    // inside `SystemResources::allocate` and does not match the
                    // map key `agent_id` used here — confirm intended.
                    allocations.write().insert(agent_id, allocation.clone());

                    tracing::info!(
                        "Allocated resources for agent {}: {:?}",
                        agent_id,
                        allocation
                    );
                } else {
                    // Rejection is only logged; the requester detects failure
                    // by the absence of an entry in `allocations`.
                    tracing::warn!(
                        "Cannot allocate resources for agent {}: insufficient resources",
                        agent_id
                    );
                }
            }
            MonitoringEvent::DeallocationRequest { agent_id } => {
                if let Some(allocation) = allocations.write().remove(&agent_id) {
                    // Credit the pool and drop the stale usage record.
                    system_resources.write().deallocate(&allocation);
                    usage_tracker.write().remove(&agent_id);

                    tracing::info!("Deallocated resources for agent {}", agent_id);
                }
            }
            MonitoringEvent::LimitViolation {
                agent_id,
                violations,
            } => {
                // Enforcement is currently report-only: violations are logged
                // but no throttling or termination is performed here.
                tracing::warn!(
                    "Resource limit violation detected for agent {}: {:?}",
                    agent_id,
                    violations
                );
            }
        }
    }

    /// One enforcement sweep: derive limits from each agent's allocation,
    /// compare against its last reported usage, and emit a `LimitViolation`
    /// event for any agent over budget. No `.await` occurs while the read
    /// guards are held.
    async fn enforce_resource_limits(
        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
        monitoring_sender: &mpsc::UnboundedSender<MonitoringEvent>,
    ) {
        let usage_map = usage_tracker.read();
        let allocations_map = allocations.read();

        for (agent_id, usage) in usage_map.iter() {
            if let Some(allocation) = allocations_map.get(agent_id) {
                // Limits are re-derived from the byte-denominated allocation;
                // the timeouts are fixed placeholders (1 h exec, 5 min idle).
                let limits = ResourceLimits {
                    memory_mb: allocation.allocated_memory / (1024 * 1024),
                    cpu_cores: allocation.allocated_cpu_cores,
                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                    execution_timeout: Duration::from_secs(3600),
                    idle_timeout: Duration::from_secs(300),
                };
                let violations = Self::check_resource_violations(usage, &limits);

                if !violations.is_empty() {
                    tracing::warn!(
                        "Agent {} violated resource limits: {:?}",
                        agent_id,
                        violations
                    );

                    // Send failure only happens after shutdown; safe to ignore.
                    let _ = monitoring_sender.send(MonitoringEvent::LimitViolation {
                        agent_id: *agent_id,
                        violations,
                    });
                }
            }
        }
    }

    /// Compare usage against limits and list every exceeded dimension.
    /// Limits are MB/Mbps-denominated and converted to bytes (`* 1024 * 1024`)
    /// to match the byte-denominated usage fields.
    fn check_resource_violations(
        usage: &ResourceUsage,
        limits: &ResourceLimits,
    ) -> Vec<ResourceViolation> {
        let mut violations = Vec::new();

        if usage.memory_used > limits.memory_mb * 1024 * 1024 {
            violations.push(ResourceViolation::Memory {
                used: usage.memory_used,
                limit: limits.memory_mb * 1024 * 1024,
            });
        }

        if usage.cpu_utilization > limits.cpu_cores {
            violations.push(ResourceViolation::Cpu {
                used: usage.cpu_utilization,
                limit: limits.cpu_cores,
            });
        }

        if usage.disk_io_rate > limits.disk_io_mbps * 1024 * 1024 {
            violations.push(ResourceViolation::DiskIo {
                used: usage.disk_io_rate,
                limit: limits.disk_io_mbps * 1024 * 1024,
            });
        }

        if usage.network_io_rate > limits.network_io_mbps * 1024 * 1024 {
            violations.push(ResourceViolation::NetworkIo {
                used: usage.network_io_rate,
                limit: limits.network_io_mbps * 1024 * 1024,
            });
        }

        violations
    }

    /// Forward an event to the monitoring task; fails only when the
    /// receiver (the monitoring loop) has already exited.
    fn send_monitoring_event(&self, event: MonitoringEvent) -> Result<(), ResourceError> {
        self.monitoring_sender.send(event).map_err(|_| {
            ResourceError::MonitoringFailed("Failed to send monitoring event".to_string())
        })
    }
}
351
#[async_trait]
impl ResourceManager for DefaultResourceManager {
    /// Validate the request against policy, submit it to the monitoring task,
    /// then read the resulting allocation back.
    ///
    /// # Errors
    /// `ShuttingDown`, `AllocationExists`, policy outcomes (`PolicyViolation`,
    /// `AllocationQueued`, `EscalationRequired`, `PolicyError`), or
    /// `InsufficientResources` when no allocation materializes.
    async fn allocate_resources(
        &self,
        agent_id: AgentId,
        requirements: ResourceRequirements,
    ) -> Result<ResourceAllocation, ResourceError> {
        if !*self.is_running.read() {
            return Err(ResourceError::ShuttingDown);
        }

        // Reject duplicates up front (the monitoring task would otherwise
        // silently overwrite the existing entry).
        if self.allocations.read().contains_key(&agent_id) {
            return Err(ResourceError::AllocationExists { agent_id });
        }

        let allocation_request = ResourceAllocationRequest {
            agent_id,
            requirements: requirements.clone(),
            priority: Priority::Normal,
            justification: None,
            max_duration: None,
            timestamp: SystemTime::now(),
        };

        let policy_decision = self
            .policy_enforcement
            .validate_resource_allocation(agent_id, &allocation_request)
            .await
            .map_err(|e| ResourceError::PolicyError(format!("Policy validation failed: {}", e)))?;

        // Map the policy verdict onto the requirements actually allocated;
        // only Approve/Modified proceed.
        let final_requirements = match policy_decision.decision {
            crate::integrations::policy_engine::AllocationResult::Approve => requirements,
            crate::integrations::policy_engine::AllocationResult::Deny => {
                return Err(ResourceError::PolicyViolation {
                    reason: policy_decision.reason.into(),
                });
            }
            crate::integrations::policy_engine::AllocationResult::Modified => {
                // Fall back to the original request if the policy engine did
                // not actually attach modified requirements.
                policy_decision
                    .modified_requirements
                    .unwrap_or(requirements)
            }
            crate::integrations::policy_engine::AllocationResult::Queued => {
                return Err(ResourceError::AllocationQueued {
                    reason: policy_decision.reason.into(),
                });
            }
            crate::integrations::policy_engine::AllocationResult::Escalate => {
                return Err(ResourceError::EscalationRequired {
                    reason: policy_decision.reason.into(),
                });
            }
        };

        self.send_monitoring_event(MonitoringEvent::AllocationRequest {
            agent_id,
            requirements: final_requirements.clone(),
        })?;

        // NOTE(review): sleep-based synchronization — if the monitoring task
        // has not processed the request within 10 ms, a successful allocation
        // is misreported as InsufficientResources. Confirm this race is
        // acceptable or replace with a reply channel.
        tokio::time::sleep(Duration::from_millis(10)).await;

        self.allocations.read().get(&agent_id).cloned().ok_or(
            ResourceError::InsufficientResources {
                requirements: "Insufficient system resources".into(),
            },
        )
    }

    /// Submit a deallocation to the monitoring task.
    /// Returns Ok even when the agent has no allocation (the monitoring task
    /// treats an unknown agent as a no-op).
    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError> {
        self.send_monitoring_event(MonitoringEvent::DeallocationRequest { agent_id })?;

        // Give the monitoring task a moment to apply the removal.
        tokio::time::sleep(Duration::from_millis(10)).await;

        Ok(())
    }

    /// Forward a usage report to the monitoring task (applied asynchronously).
    async fn update_usage(
        &self,
        agent_id: AgentId,
        usage: ResourceUsage,
    ) -> Result<(), ResourceError> {
        self.send_monitoring_event(MonitoringEvent::UsageUpdate { agent_id, usage })?;
        Ok(())
    }

    /// Return the last usage applied by the monitoring task for `agent_id`.
    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError> {
        self.usage_tracker
            .read()
            .get(&agent_id)
            .cloned()
            .ok_or(ResourceError::AgentNotFound { agent_id })
    }

    /// Snapshot totals from config and availability from the live pool.
    async fn get_system_status(&self) -> ResourceSystemStatus {
        let system = self.system_resources.read();
        let allocations_count = self.allocations.read().len();

        let resource_info = system.get_resource_info();

        ResourceSystemStatus {
            total_memory: self.config.total_memory,
            available_memory: resource_info.available_memory,
            total_cpu_cores: self.config.total_cpu_cores,
            available_cpu_cores: resource_info.available_cpu_cores,
            total_disk_space: self.config.total_disk_space,
            available_disk_space: resource_info.available_disk_space,
            total_network_bandwidth: self.config.total_network_bandwidth,
            available_network_bandwidth: resource_info.available_network_bandwidth,
            active_allocations: allocations_count,
            last_updated: SystemTime::now(),
        }
    }

    /// Overwrite an existing allocation's budget from MB/Mbps-denominated
    /// limits.
    ///
    /// NOTE(review): mutates `allocations` directly, bypassing the monitoring
    /// channel, and never adjusts the `SystemResources` pool totals — the
    /// available counters will no longer reflect the changed allocation.
    /// Confirm intended.
    async fn set_limits(
        &self,
        agent_id: AgentId,
        limits: ResourceLimits,
    ) -> Result<(), ResourceError> {
        let mut allocations = self.allocations.write();
        if let Some(allocation) = allocations.get_mut(&agent_id) {
            allocation.allocated_memory = limits.memory_mb * 1024 * 1024;
            allocation.allocated_cpu_cores = limits.cpu_cores;
            allocation.allocated_disk_io = limits.disk_io_mbps * 1024 * 1024;
            allocation.allocated_network_io = limits.network_io_mbps * 1024 * 1024;
            Ok(())
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// True when the agent's usage is within its allocation-derived limits.
    /// An allocated agent with no reported usage yet is considered compliant.
    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError> {
        let usage_map = self.usage_tracker.read();
        let allocations_map = self.allocations.read();

        if let Some(allocation) = allocations_map.get(&agent_id) {
            if let Some(usage) = usage_map.get(&agent_id) {
                // Same limit derivation as the enforcement loop; timeouts are
                // fixed placeholders (1 h exec, 5 min idle).
                let limits = ResourceLimits {
                    memory_mb: allocation.allocated_memory / (1024 * 1024),
                    cpu_cores: allocation.allocated_cpu_cores,
                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                    execution_timeout: Duration::from_secs(3600),
                    idle_timeout: Duration::from_secs(300),
                };
                let violations = Self::check_resource_violations(usage, &limits);
                Ok(violations.is_empty())
            } else {
                // No usage reported yet: nothing can be in violation.
                Ok(true)
            }
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// List current violations; errors unless the agent has both an
    /// allocation and at least one reported usage sample.
    async fn check_resource_violations(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<ResourceViolation>, ResourceError> {
        let usage_map = self.usage_tracker.read();
        let allocations_map = self.allocations.read();

        if let (Some(usage), Some(allocation)) =
            (usage_map.get(&agent_id), allocations_map.get(&agent_id))
        {
            // Same limit derivation as `check_limits` / the enforcement loop.
            let limits = ResourceLimits {
                memory_mb: allocation.allocated_memory / (1024 * 1024),
                cpu_cores: allocation.allocated_cpu_cores,
                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                execution_timeout: Duration::from_secs(3600),
                idle_timeout: Duration::from_secs(300),
            };
            Ok(Self::check_resource_violations(usage, &limits))
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// Stop background tasks, then attempt to release every allocation.
    ///
    /// NOTE(review): `shutdown_notify` is fired before the deallocation
    /// events are sent, so the monitoring task may already have exited and
    /// never process them — confirm the ordering is intended.
    async fn shutdown(&self) -> Result<(), ResourceError> {
        tracing::info!("Shutting down resource manager");

        *self.is_running.write() = false;
        self.shutdown_notify.notify_waiters();

        // Copy the keys first so no lock is held across the awaits below.
        let agent_ids: Vec<AgentId> = self.allocations.read().keys().copied().collect();

        for agent_id in agent_ids {
            if let Err(e) = self.deallocate_resources(agent_id).await {
                tracing::error!(
                    "Failed to deallocate resources for agent {} during shutdown: {}",
                    agent_id,
                    e
                );
            }
        }

        Ok(())
    }

    /// Derive health from aggregate memory/CPU utilization:
    /// >90% → unhealthy, >80% → degraded, otherwise healthy.
    async fn check_health(&self) -> Result<ComponentHealth, ResourceError> {
        let is_running = *self.is_running.read();
        if !is_running {
            return Ok(ComponentHealth::unhealthy(
                "Resource manager is shut down".to_string(),
            ));
        }

        let system_status = self.get_system_status().await;
        let allocations_count = self.allocations.read().len();

        // Utilization as a 0.0–1.0 fraction; guard against zero totals.
        let memory_usage = if system_status.total_memory > 0 {
            (system_status.total_memory - system_status.available_memory) as f64
                / system_status.total_memory as f64
        } else {
            0.0
        };

        let cpu_usage = if system_status.total_cpu_cores > 0 {
            (system_status.total_cpu_cores - system_status.available_cpu_cores) as f64
                / system_status.total_cpu_cores as f64
        } else {
            0.0
        };

        let status = if memory_usage > 0.9 || cpu_usage > 0.9 {
            ComponentHealth::unhealthy(format!(
                "Critical resource usage - Memory: {:.1}%, CPU: {:.1}%",
                memory_usage * 100.0,
                cpu_usage * 100.0
            ))
        } else if memory_usage > 0.8 || cpu_usage > 0.8 {
            ComponentHealth::degraded(format!(
                "High resource usage - Memory: {:.1}%, CPU: {:.1}%",
                memory_usage * 100.0,
                cpu_usage * 100.0
            ))
        } else {
            ComponentHealth::healthy(Some(format!(
                "Resources available - Memory: {:.1}%, CPU: {:.1}%, {} active allocations",
                memory_usage * 100.0,
                cpu_usage * 100.0,
                allocations_count
            )))
        };

        // Attach machine-readable metrics alongside the human-readable status.
        Ok(status
            .with_metric(
                "memory_usage_percent".to_string(),
                format!("{:.2}", memory_usage * 100.0),
            )
            .with_metric(
                "cpu_usage_percent".to_string(),
                format!("{:.2}", cpu_usage * 100.0),
            )
            .with_metric(
                "active_allocations".to_string(),
                allocations_count.to_string(),
            )
            .with_metric(
                "available_memory_mb".to_string(),
                (system_status.available_memory / (1024 * 1024)).to_string(),
            )
            .with_metric(
                "available_cpu_cores".to_string(),
                system_status.available_cpu_cores.to_string(),
            ))
    }
}
635
/// Aggregate resource pool: what remains available for allocation and what
/// was carved out as a host reservation at construction time.
/// Memory/disk/network are byte-denominated; CPU is a whole-core count.
#[derive(Debug, Clone)]
struct SystemResources {
    // Remaining capacity, debited by `allocate` and credited by `deallocate`.
    available_memory: usize,
    available_cpu_cores: u32,
    available_disk_space: usize,
    available_network_bandwidth: usize,
    // Fixed reservation computed once in `new`; reported but never allocated.
    reserved_memory: usize,
    reserved_cpu_cores: u32,
    reserved_disk_space: usize,
    reserved_network_bandwidth: usize,
}
648
649impl SystemResources {
650 fn new(config: &ResourceManagerConfig) -> Self {
651 let reservation_factor = config.resource_reservation_percentage;
652
653 let reserved_cpu = if reservation_factor > 0.0 {
655 ((config.total_cpu_cores as f32 * reservation_factor).ceil() as u32).max(1)
656 } else {
657 0
658 };
659 let available_cpu = config.total_cpu_cores.saturating_sub(reserved_cpu);
660
661 Self {
662 available_memory: config.total_memory
663 - (config.total_memory as f32 * reservation_factor) as usize,
664 available_cpu_cores: available_cpu,
665 available_disk_space: config.total_disk_space
666 - (config.total_disk_space as f32 * reservation_factor) as usize,
667 available_network_bandwidth: config.total_network_bandwidth
668 - (config.total_network_bandwidth as f32 * reservation_factor) as usize,
669 reserved_memory: (config.total_memory as f32 * reservation_factor) as usize,
670 reserved_cpu_cores: reserved_cpu,
671 reserved_disk_space: (config.total_disk_space as f32 * reservation_factor) as usize,
672 reserved_network_bandwidth: (config.total_network_bandwidth as f32 * reservation_factor)
673 as usize,
674 }
675 }
676
677 fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
678 self.available_memory >= requirements.max_memory_mb * 1024 * 1024
679 && self.available_cpu_cores >= requirements.max_cpu_cores as u32
680 && self.available_disk_space >= requirements.disk_space_mb * 1024 * 1024
681 && self.available_network_bandwidth >= requirements.network_bandwidth_mbps * 1024 * 1024
682 }
683
684 fn allocate(&mut self, requirements: &ResourceRequirements) -> ResourceAllocation {
685 let memory_bytes = requirements.max_memory_mb * 1024 * 1024;
686 let disk_bytes = requirements.disk_space_mb * 1024 * 1024;
687 let network_bytes = requirements.network_bandwidth_mbps * 1024 * 1024;
688
689 self.available_memory -= memory_bytes;
690 self.available_cpu_cores -= requirements.max_cpu_cores as u32;
691 self.available_disk_space -= disk_bytes;
692 self.available_network_bandwidth -= network_bytes;
693
694 ResourceAllocation {
695 agent_id: AgentId::new(), allocated_memory: memory_bytes,
697 allocated_cpu_cores: requirements.max_cpu_cores,
698 allocated_disk_io: disk_bytes,
699 allocated_network_io: network_bytes,
700 allocation_time: SystemTime::now(),
701 }
702 }
703
704 fn deallocate(&mut self, allocation: &ResourceAllocation) {
705 self.available_memory += allocation.allocated_memory;
706 self.available_cpu_cores += allocation.allocated_cpu_cores as u32;
707 self.available_disk_space += allocation.allocated_disk_io;
708 self.available_network_bandwidth += allocation.allocated_network_io;
709 }
710
711 fn update_usage(&mut self, _usage: &ResourceUsage) {
712 }
715
716 fn get_resource_info(&self) -> ResourceInfo {
718 ResourceInfo {
719 available_memory: self.available_memory,
720 available_cpu_cores: self.available_cpu_cores,
721 available_disk_space: self.available_disk_space,
722 available_network_bandwidth: self.available_network_bandwidth,
723 reserved_memory: self.reserved_memory,
724 reserved_cpu_cores: self.reserved_cpu_cores,
725 reserved_disk_space: self.reserved_disk_space,
726 reserved_network_bandwidth: self.reserved_network_bandwidth,
727 }
728 }
729}
730
/// Point-in-time snapshot of system capacity returned by
/// `ResourceManager::get_system_status`. Totals come from configuration;
/// available figures from the live `SystemResources` pool.
#[derive(Debug, Clone)]
pub struct ResourceSystemStatus {
    /// Configured total memory, in bytes.
    pub total_memory: usize,
    /// Memory currently available for allocation, in bytes.
    pub available_memory: usize,
    /// Configured total logical CPU cores.
    pub total_cpu_cores: u32,
    /// Cores currently available for allocation.
    pub available_cpu_cores: u32,
    /// Configured total disk space, in bytes.
    pub total_disk_space: usize,
    /// Disk space currently available for allocation, in bytes.
    pub available_disk_space: usize,
    /// Configured total network bandwidth (same unit as the config field).
    pub total_network_bandwidth: usize,
    /// Network bandwidth currently available for allocation.
    pub available_network_bandwidth: usize,
    /// Number of agents currently holding an allocation.
    pub active_allocations: usize,
    /// When this snapshot was taken.
    pub last_updated: SystemTime,
}
745
/// Internal availability/reservation snapshot produced by
/// `SystemResources::get_resource_info`.
#[derive(Debug, Clone)]
pub struct ResourceInfo {
    /// Memory available for allocation, in bytes.
    pub available_memory: usize,
    /// Logical CPU cores available for allocation.
    pub available_cpu_cores: u32,
    /// Disk space available for allocation, in bytes.
    pub available_disk_space: usize,
    /// Network bandwidth available for allocation.
    pub available_network_bandwidth: usize,
    /// Memory withheld as the host reservation, in bytes.
    pub reserved_memory: usize,
    /// Cores withheld as the host reservation.
    pub reserved_cpu_cores: u32,
    /// Disk space withheld as the host reservation, in bytes.
    pub reserved_disk_space: usize,
    /// Network bandwidth withheld as the host reservation.
    pub reserved_network_bandwidth: usize,
}
758
/// One exceeded resource limit, carrying the observed value and the limit
/// it was compared against (bytes or byte-rates, except CPU which is a
/// fractional core count).
#[derive(Debug, Clone)]
pub enum ResourceViolation {
    Memory { used: usize, limit: usize },
    Cpu { used: f32, limit: f32 },
    DiskIo { used: usize, limit: usize },
    NetworkIo { used: usize, limit: usize },
}
767
/// Messages sent to the monitoring task, which applies all state changes
/// in the order received.
#[derive(Debug, Clone)]
enum MonitoringEvent {
    /// Record an agent's latest usage sample.
    UsageUpdate {
        agent_id: AgentId,
        usage: ResourceUsage,
    },
    /// Attempt to allocate resources for an agent (outcome is observed by
    /// polling the allocations map, not by a reply).
    AllocationRequest {
        agent_id: AgentId,
        requirements: ResourceRequirements,
    },
    /// Release the agent's allocation, if any.
    DeallocationRequest {
        agent_id: AgentId,
    },
    /// Emitted by the enforcement loop; currently only logged.
    LimitViolation {
        agent_id: AgentId,
        violations: Vec<ResourceViolation>,
    },
}
787
#[cfg(test)]
mod tests {
    use super::*;

    /// Smallest meaningful request: 1 MB memory/disk, one core, 1 Mbps network.
    fn create_test_requirements() -> ResourceRequirements {
        ResourceRequirements {
            min_memory_mb: 1,
            max_memory_mb: 1,
            min_cpu_cores: 1.0,
            max_cpu_cores: 1.0,
            disk_space_mb: 1,
            network_bandwidth_mbps: 1,
        }
    }

    /// Construct a manager with default configuration.
    async fn new_manager() -> DefaultResourceManager {
        DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_resource_allocation() {
        let mgr = new_manager().await;
        let agent = AgentId::new();

        let allocation = mgr
            .allocate_resources(agent, create_test_requirements())
            .await
            .unwrap();

        // A 1 MB / 1-core request becomes a byte-denominated allocation.
        assert_eq!(allocation.allocated_memory, 1024 * 1024);
        assert_eq!(allocation.allocated_cpu_cores, 1.0);
    }

    #[tokio::test]
    async fn test_resource_deallocation() {
        let mgr = new_manager().await;
        let agent = AgentId::new();

        mgr.allocate_resources(agent, create_test_requirements())
            .await
            .unwrap();

        assert!(mgr.deallocate_resources(agent).await.is_ok());
    }

    #[tokio::test]
    async fn test_usage_tracking() {
        let mgr = new_manager().await;
        let agent = AgentId::new();

        mgr.allocate_resources(agent, create_test_requirements())
            .await
            .unwrap();

        let usage = ResourceUsage {
            memory_used: 512 * 1024,
            cpu_utilization: 0.5,
            disk_io_rate: 512 * 1024,
            network_io_rate: 512,
            uptime: Duration::from_secs(60),
        };
        mgr.update_usage(agent, usage.clone()).await.unwrap();

        // Give the background monitoring task time to apply the update.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let reported = mgr.get_usage(agent).await.unwrap();
        assert_eq!(reported.memory_used, usage.memory_used);
        assert_eq!(reported.cpu_utilization, usage.cpu_utilization);
    }

    #[tokio::test]
    async fn test_system_status() {
        let status = new_manager().await.get_system_status().await;

        assert!(status.total_memory > 0);
        assert!(status.available_memory <= status.total_memory);
        assert!(status.total_cpu_cores > 0);
        assert!(status.available_cpu_cores <= status.total_cpu_cores);
    }

    #[test]
    fn test_resource_violations() {
        // Usage doubles every limit, so all four violation kinds must fire.
        let usage = ResourceUsage {
            memory_used: 2 * 1024 * 1024,
            cpu_utilization: 2.0,
            disk_io_rate: 2 * 1024 * 1024,
            network_io_rate: 2 * 1024 * 1024,
            uptime: Duration::from_secs(60),
        };
        let limits = ResourceLimits {
            memory_mb: 1,
            cpu_cores: 1.0,
            disk_io_mbps: 1,
            network_io_mbps: 1,
            execution_timeout: Duration::from_secs(3600),
            idle_timeout: Duration::from_secs(300),
        };

        let violations = DefaultResourceManager::check_resource_violations(&usage, &limits);
        assert_eq!(violations.len(), 4);
    }
}