1use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::{mpsc, Notify};
11use tokio::time::interval;
12
13use crate::integrations::policy_engine::{
14 PolicyEnforcementFactory, PolicyEnforcementPoint, ResourceAccessConfig,
15 ResourceAllocationRequest,
16};
17use crate::types::*;
18
/// Contract for allocating, tracking, and enforcing per-agent resource budgets.
///
/// Implementations are expected to be shared behind `Arc` and called
/// concurrently, hence `&self` on every method.
#[async_trait]
pub trait ResourceManager {
    /// Reserve resources for `agent_id` according to `requirements`.
    ///
    /// Fails if the agent already holds an allocation, policy denies the
    /// request, or the system lacks capacity.
    async fn allocate_resources(
        &self,
        agent_id: AgentId,
        requirements: ResourceRequirements,
    ) -> Result<ResourceAllocation, ResourceError>;

    /// Release everything held by `agent_id` and return it to the pool.
    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError>;

    /// Report a fresh usage sample for `agent_id` (feeds limit enforcement).
    async fn update_usage(
        &self,
        agent_id: AgentId,
        usage: ResourceUsage,
    ) -> Result<(), ResourceError>;

    /// Latest recorded usage for `agent_id`, if any has been reported.
    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError>;

    /// Snapshot of total/available capacity and active allocation count.
    async fn get_system_status(&self) -> ResourceSystemStatus;

    /// Overwrite the limits backing an existing allocation for `agent_id`.
    async fn set_limits(
        &self,
        agent_id: AgentId,
        limits: ResourceLimits,
    ) -> Result<(), ResourceError>;

    /// `true` when the agent's latest usage is within its allocated limits.
    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError>;

    /// List every limit the agent's latest usage currently exceeds.
    async fn check_resource_violations(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<ResourceViolation>, ResourceError>;

    /// Stop background work and release all outstanding allocations.
    async fn shutdown(&self) -> Result<(), ResourceError>;

    /// Health probe: reports degraded/unhealthy under high resource pressure.
    async fn check_health(&self) -> Result<ComponentHealth, ResourceError>;
}
67
/// Tunables for [`DefaultResourceManager`].
#[derive(Debug, Clone)]
pub struct ResourceManagerConfig {
    /// Total memory budget in bytes.
    pub total_memory: usize,
    /// Total CPU cores available for allocation.
    pub total_cpu_cores: u32,
    /// Total disk space budget in bytes.
    pub total_disk_space: usize,
    /// Total network bandwidth budget in bytes/sec.
    pub total_network_bandwidth: usize,
    /// Period of the background enforcement loop.
    pub monitoring_interval: Duration,
    /// When false, violations are detected but never acted on.
    pub enforcement_enabled: bool,
    /// Reserved flag; no auto-scaling logic is implemented in this file.
    pub auto_scaling_enabled: bool,
    /// Fraction (0.0..=1.0) of each resource held back from allocation.
    pub resource_reservation_percentage: f32,
    /// Configuration forwarded to the policy enforcement point.
    pub policy_enforcement_config: ResourceAccessConfig,
    /// What to do when an agent exceeds its limits.
    pub violation_action: ViolationAction,
    /// Under `ViolationAction::Throttle`, escalate to a kill request after
    /// this many consecutive enforcement ticks in violation.
    pub kill_after_sustained_violations: u32,
}
93
/// Escalation policy applied when an agent exceeds its resource limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ViolationAction {
    /// Record the violation but take no action.
    LogOnly,
    /// Emit `ThrottleRequested`; escalates to a kill after sustained violations.
    Throttle,
    /// Emit `KillRequested` immediately on any violation.
    Kill,
}
107
108impl Default for ResourceManagerConfig {
109 fn default() -> Self {
110 Self {
111 total_memory: 16 * 1024 * 1024 * 1024, total_cpu_cores: 8,
113 total_disk_space: 1024 * 1024 * 1024 * 1024, total_network_bandwidth: 1000 * 1024 * 1024, monitoring_interval: Duration::from_secs(5),
116 enforcement_enabled: true,
117 auto_scaling_enabled: false,
118 resource_reservation_percentage: 0.1, policy_enforcement_config: ResourceAccessConfig::default(),
120 violation_action: ViolationAction::Throttle,
121 kill_after_sustained_violations: 5,
122 }
123 }
124}
125
/// Default [`ResourceManager`]: state lives behind `parking_lot` locks and is
/// mutated by two spawned tokio tasks (an event processor and a periodic
/// enforcement loop) fed through `monitoring_sender`.
pub struct DefaultResourceManager {
    config: ResourceManagerConfig,
    // Active allocations keyed by agent.
    allocations: Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
    // Latest usage sample reported per agent.
    usage_tracker: Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
    // Capacity accounting (available/reserved pools).
    system_resources: Arc<RwLock<SystemResources>>,
    // Commands into the background event-processing task.
    monitoring_sender: mpsc::UnboundedSender<MonitoringEvent>,
    // Fired once on shutdown; both background loops exit on it.
    shutdown_notify: Arc<Notify>,
    is_running: Arc<RwLock<bool>>,
    policy_enforcement: Arc<dyn PolicyEnforcementPoint>,
    // Per-agent count of consecutive enforcement ticks in violation;
    // cleared on a clean tick or after a kill request.
    consecutive_violations: Arc<RwLock<HashMap<AgentId, u32>>>,
}
141
impl DefaultResourceManager {
    /// Build a manager, construct its policy-enforcement point, and spawn the
    /// event-processing and enforcement background tasks.
    ///
    /// # Errors
    /// `ResourceError::PolicyError` if the enforcement point cannot be created.
    pub async fn new(config: ResourceManagerConfig) -> Result<Self, ResourceError> {
        let allocations = Arc::new(RwLock::new(HashMap::new()));
        let usage_tracker = Arc::new(RwLock::new(HashMap::new()));
        let system_resources = Arc::new(RwLock::new(SystemResources::new(&config)));
        let (monitoring_sender, monitoring_receiver) = mpsc::unbounded_channel();
        let shutdown_notify = Arc::new(Notify::new());
        let is_running = Arc::new(RwLock::new(true));

        // Allocation approval/denial/modification is delegated to this point.
        let policy_enforcement = PolicyEnforcementFactory::create_enforcement_point(
            config.policy_enforcement_config.clone(),
        )
        .await
        .map_err(|e| {
            ResourceError::PolicyError(format!("Failed to create policy enforcement: {}", e))
        })?;

        let manager = Self {
            config,
            allocations,
            usage_tracker,
            system_resources,
            monitoring_sender,
            shutdown_notify,
            is_running,
            policy_enforcement,
            consecutive_violations: Arc::new(RwLock::new(HashMap::new())),
        };

        // Both loops run until `shutdown_notify` fires (see `shutdown`).
        manager.start_monitoring_loop(monitoring_receiver).await;
        manager.start_enforcement_loop().await;

        Ok(manager)
    }

    /// Spawn the task that serially applies `MonitoringEvent`s to shared state.
    /// Serial processing is what keeps allocation check-then-insert atomic.
    async fn start_monitoring_loop(
        &self,
        mut monitoring_receiver: mpsc::UnboundedReceiver<MonitoringEvent>,
    ) {
        let usage_tracker = self.usage_tracker.clone();
        let allocations = self.allocations.clone();
        let system_resources = self.system_resources.clone();
        let shutdown_notify = self.shutdown_notify.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    event = monitoring_receiver.recv() => {
                        if let Some(event) = event {
                            Self::process_monitoring_event(event, &usage_tracker, &allocations, &system_resources).await;
                        } else {
                            // All senders dropped: nothing more can arrive.
                            break;
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

    /// Spawn the periodic task that compares each agent's latest usage sample
    /// against its allocation and escalates per the configured action.
    async fn start_enforcement_loop(&self) {
        let usage_tracker = self.usage_tracker.clone();
        let allocations = self.allocations.clone();
        let monitoring_sender = self.monitoring_sender.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let monitoring_interval = self.config.monitoring_interval;
        let enforcement_enabled = self.config.enforcement_enabled;
        let violation_action = self.config.violation_action;
        let kill_after_sustained = self.config.kill_after_sustained_violations;
        let consecutive_violations = self.consecutive_violations.clone();

        tokio::spawn(async move {
            let mut interval = interval(monitoring_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if !*is_running.read() {
                            break;
                        }

                        if enforcement_enabled {
                            Self::enforce_resource_limits(
                                &usage_tracker,
                                &allocations,
                                &monitoring_sender,
                                violation_action,
                                kill_after_sustained,
                                &consecutive_violations,
                            )
                            .await;
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

    /// Apply one event to the shared maps. Runs only on the single
    /// event-processing task, so events are totally ordered.
    ///
    /// NOTE(review): lock ordering differs between arms — AllocationRequest
    /// holds `system_resources` while taking `allocations`, while
    /// DeallocationRequest holds `allocations` (the temporary guard lives
    /// through the `if let`) while taking `system_resources`. Combined with
    /// readers elsewhere (e.g. `get_system_status` takes system→allocations),
    /// this looks like a potential lock-order deadlock — verify and unify
    /// the ordering.
    async fn process_monitoring_event(
        event: MonitoringEvent,
        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
        system_resources: &Arc<RwLock<SystemResources>>,
    ) {
        match event {
            MonitoringEvent::UsageUpdate { agent_id, usage } => {
                usage_tracker.write().insert(agent_id, usage.clone());

                // Currently a no-op in SystemResources::update_usage.
                system_resources.write().update_usage(&usage);

                tracing::debug!("Updated resource usage for agent {}: {:?}", agent_id, usage);
            }
            MonitoringEvent::AllocationRequest {
                agent_id,
                requirements,
            } => {
                // check-then-allocate is atomic because we hold the write
                // guard across both calls.
                let mut system = system_resources.write();
                if system.can_allocate(&requirements) {
                    let allocation = system.allocate(&requirements);
                    allocations.write().insert(agent_id, allocation.clone());

                    tracing::info!(
                        "Allocated resources for agent {}: {:?}",
                        agent_id,
                        allocation
                    );
                } else {
                    // Failure is only logged; the requester discovers it by
                    // finding no allocation entry (see allocate_resources).
                    tracing::warn!(
                        "Cannot allocate resources for agent {}: insufficient resources",
                        agent_id
                    );
                }
            }
            MonitoringEvent::DeallocationRequest { agent_id } => {
                if let Some(allocation) = allocations.write().remove(&agent_id) {
                    system_resources.write().deallocate(&allocation);
                    usage_tracker.write().remove(&agent_id);

                    tracing::info!("Deallocated resources for agent {}", agent_id);
                }
            }
            MonitoringEvent::LimitViolation {
                agent_id,
                violations,
            } => {
                // Informational only; enforcement decisions are taken in
                // enforce_resource_limits before this event is emitted.
                tracing::warn!(
                    "Resource limit violation detected for agent {}: {:?}",
                    agent_id,
                    violations
                );
            }
            MonitoringEvent::ThrottleRequested {
                agent_id,
                consecutive_violations,
                violations,
            } => {
                // No throttling is applied here; an external orchestrator is
                // expected to react to this log/event.
                tracing::warn!(
                    %agent_id,
                    consecutive_violations,
                    ?violations,
                    "ThrottleRequested event emitted — orchestrator should slow this agent"
                );
            }
            MonitoringEvent::KillRequested {
                agent_id,
                violations,
                reason,
            } => {
                // Likewise: termination itself is the orchestrator's job.
                tracing::error!(
                    %agent_id,
                    ?violations,
                    reason,
                    "KillRequested event emitted — orchestrator should terminate this agent"
                );
            }
        }
    }

    /// One enforcement tick: sample all (usage, allocation) pairs, detect
    /// violations, maintain per-agent consecutive-violation counters, and emit
    /// LimitViolation plus Throttle/Kill events per `violation_action`.
    async fn enforce_resource_limits(
        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
        monitoring_sender: &mpsc::UnboundedSender<MonitoringEvent>,
        violation_action: ViolationAction,
        kill_after_sustained: u32,
        consecutive_violations: &Arc<RwLock<HashMap<AgentId, u32>>>,
    ) {
        // Snapshot under short read locks so no guard is held while emitting.
        let samples: Vec<(AgentId, ResourceUsage, ResourceAllocation)> = {
            let usage_map = usage_tracker.read();
            let allocations_map = allocations.read();
            usage_map
                .iter()
                .filter_map(|(agent_id, usage)| {
                    allocations_map
                        .get(agent_id)
                        .map(|alloc| (*agent_id, usage.clone(), alloc.clone()))
                })
                .collect()
        };

        for (agent_id, usage, allocation) in samples {
            // Derive limits from the allocation; timeouts are fixed here
            // (duplicated in check_limits / check_resource_violations).
            let limits = ResourceLimits {
                memory_mb: allocation.allocated_memory / (1024 * 1024),
                cpu_cores: allocation.allocated_cpu_cores,
                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                execution_timeout: Duration::from_secs(3600),
                idle_timeout: Duration::from_secs(300),
            };
            let violations = Self::check_resource_violations(&usage, &limits);

            if violations.is_empty() {
                // A clean tick resets the escalation counter.
                consecutive_violations.write().remove(&agent_id);
                continue;
            }

            let new_count = {
                let mut counters = consecutive_violations.write();
                let entry = counters.entry(agent_id).or_insert(0);
                *entry += 1;
                *entry
            };

            tracing::warn!(
                %agent_id,
                violations = ?violations,
                consecutive = new_count,
                action = ?violation_action,
                "Agent violated resource limits"
            );

            // Always record the violation, regardless of configured action.
            let _ = monitoring_sender.send(MonitoringEvent::LimitViolation {
                agent_id,
                violations: violations.clone(),
            });

            match violation_action {
                ViolationAction::LogOnly => { }
                ViolationAction::Kill => {
                    let _ = monitoring_sender.send(MonitoringEvent::KillRequested {
                        agent_id,
                        violations: violations.clone(),
                        reason: "ViolationAction::Kill configured",
                    });
                }
                ViolationAction::Throttle => {
                    if new_count >= kill_after_sustained {
                        let _ = monitoring_sender.send(MonitoringEvent::KillRequested {
                            agent_id,
                            violations: violations.clone(),
                            reason: "sustained limit violations exceeded threshold",
                        });
                        // Reset so a surviving agent is not re-killed every tick.
                        consecutive_violations.write().remove(&agent_id);
                    } else {
                        let _ = monitoring_sender.send(MonitoringEvent::ThrottleRequested {
                            agent_id,
                            consecutive_violations: new_count,
                            violations,
                        });
                    }
                }
            }
        }
    }

    /// Pure comparison of one usage sample against limits (MB/Mbps limits are
    /// scaled to bytes before comparing). Returns every exceeded dimension.
    fn check_resource_violations(
        usage: &ResourceUsage,
        limits: &ResourceLimits,
    ) -> Vec<ResourceViolation> {
        let mut violations = Vec::new();

        if usage.memory_used > limits.memory_mb * 1024 * 1024 {
            violations.push(ResourceViolation::Memory {
                used: usage.memory_used,
                limit: limits.memory_mb * 1024 * 1024,
            });
        }

        if usage.cpu_utilization > limits.cpu_cores {
            violations.push(ResourceViolation::Cpu {
                used: usage.cpu_utilization,
                limit: limits.cpu_cores,
            });
        }

        if usage.disk_io_rate > limits.disk_io_mbps * 1024 * 1024 {
            violations.push(ResourceViolation::DiskIo {
                used: usage.disk_io_rate,
                limit: limits.disk_io_mbps * 1024 * 1024,
            });
        }

        if usage.network_io_rate > limits.network_io_mbps * 1024 * 1024 {
            violations.push(ResourceViolation::NetworkIo {
                used: usage.network_io_rate,
                limit: limits.network_io_mbps * 1024 * 1024,
            });
        }

        violations
    }

    /// Forward an event to the background processor; fails only if the
    /// processing task has exited (receiver dropped).
    fn send_monitoring_event(&self, event: MonitoringEvent) -> Result<(), ResourceError> {
        self.monitoring_sender.send(event).map_err(|_| {
            ResourceError::MonitoringFailed("Failed to send monitoring event".to_string())
        })
    }
}
500
#[async_trait]
impl ResourceManager for DefaultResourceManager {
    /// Allocate via policy validation + an AllocationRequest event, then read
    /// the result back from the allocations map.
    ///
    /// NOTE(review): the fixed 10 ms sleep is a race-prone handshake with the
    /// event-processing task — under load the allocation may land after the
    /// read-back and be misreported as InsufficientResources. A reply channel
    /// would make this deterministic; confirm before relying on this path.
    async fn allocate_resources(
        &self,
        agent_id: AgentId,
        requirements: ResourceRequirements,
    ) -> Result<ResourceAllocation, ResourceError> {
        if !*self.is_running.read() {
            return Err(ResourceError::ShuttingDown);
        }

        // One allocation per agent.
        if self.allocations.read().contains_key(&agent_id) {
            return Err(ResourceError::AllocationExists { agent_id });
        }

        let allocation_request = ResourceAllocationRequest {
            agent_id,
            requirements: requirements.clone(),
            priority: Priority::Normal,
            justification: None,
            max_duration: None,
            timestamp: SystemTime::now(),
        };

        let policy_decision = self
            .policy_enforcement
            .validate_resource_allocation(agent_id, &allocation_request)
            .await
            .map_err(|e| ResourceError::PolicyError(format!("Policy validation failed: {}", e)))?;

        // Map every policy outcome; only Approve/Modified proceed.
        let final_requirements = match policy_decision.decision {
            crate::integrations::policy_engine::AllocationResult::Approve => requirements,
            crate::integrations::policy_engine::AllocationResult::Deny => {
                return Err(ResourceError::PolicyViolation {
                    reason: policy_decision.reason.into(),
                });
            }
            crate::integrations::policy_engine::AllocationResult::Modified => {
                // Fall back to the original requirements if the policy engine
                // said Modified but supplied none.
                policy_decision
                    .modified_requirements
                    .unwrap_or(requirements)
            }
            crate::integrations::policy_engine::AllocationResult::Queued => {
                return Err(ResourceError::AllocationQueued {
                    reason: policy_decision.reason.into(),
                });
            }
            crate::integrations::policy_engine::AllocationResult::Escalate => {
                return Err(ResourceError::EscalationRequired {
                    reason: policy_decision.reason.into(),
                });
            }
        };

        self.send_monitoring_event(MonitoringEvent::AllocationRequest {
            agent_id,
            requirements: final_requirements.clone(),
        })?;

        // Give the event processor a chance to apply the request.
        tokio::time::sleep(Duration::from_millis(10)).await;

        self.allocations.read().get(&agent_id).cloned().ok_or(
            ResourceError::InsufficientResources {
                requirements: "Insufficient system resources".into(),
            },
        )
    }

    /// Fire-and-forget deallocation: returns Ok even if the agent holds no
    /// allocation (the event is silently a no-op in that case).
    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError> {
        self.send_monitoring_event(MonitoringEvent::DeallocationRequest { agent_id })?;

        // Same handshake caveat as allocate_resources.
        tokio::time::sleep(Duration::from_millis(10)).await;

        Ok(())
    }

    /// Queue a usage sample; applied asynchronously by the event processor.
    async fn update_usage(
        &self,
        agent_id: AgentId,
        usage: ResourceUsage,
    ) -> Result<(), ResourceError> {
        self.send_monitoring_event(MonitoringEvent::UsageUpdate { agent_id, usage })?;
        Ok(())
    }

    /// Latest usage sample, or AgentNotFound if none was ever reported.
    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError> {
        self.usage_tracker
            .read()
            .get(&agent_id)
            .cloned()
            .ok_or(ResourceError::AgentNotFound { agent_id })
    }

    /// Snapshot capacity from SystemResources plus the allocation count.
    /// NOTE(review): acquires system_resources then allocations — opposite
    /// order to the DeallocationRequest event arm; see the lock-ordering note
    /// on process_monitoring_event.
    async fn get_system_status(&self) -> ResourceSystemStatus {
        let system = self.system_resources.read();
        let allocations_count = self.allocations.read().len();

        let resource_info = system.get_resource_info();

        ResourceSystemStatus {
            total_memory: self.config.total_memory,
            available_memory: resource_info.available_memory,
            total_cpu_cores: self.config.total_cpu_cores,
            available_cpu_cores: resource_info.available_cpu_cores,
            total_disk_space: self.config.total_disk_space,
            available_disk_space: resource_info.available_disk_space,
            total_network_bandwidth: self.config.total_network_bandwidth,
            available_network_bandwidth: resource_info.available_network_bandwidth,
            active_allocations: allocations_count,
            last_updated: SystemTime::now(),
        }
    }

    /// Overwrite the allocation record's limits in place.
    ///
    /// NOTE(review): this does not adjust SystemResources accounting — the
    /// original amounts were subtracted at allocation time, but deallocate
    /// will later return the *new* amounts, so repeated set_limits can drift
    /// the available pools. Confirm whether that is intended.
    async fn set_limits(
        &self,
        agent_id: AgentId,
        limits: ResourceLimits,
    ) -> Result<(), ResourceError> {
        let mut allocations = self.allocations.write();
        if let Some(allocation) = allocations.get_mut(&agent_id) {
            // MB/Mbps limits are stored as bytes on the allocation.
            allocation.allocated_memory = limits.memory_mb * 1024 * 1024;
            allocation.allocated_cpu_cores = limits.cpu_cores;
            allocation.allocated_disk_io = limits.disk_io_mbps * 1024 * 1024;
            allocation.allocated_network_io = limits.network_io_mbps * 1024 * 1024;
            Ok(())
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// `true` if the latest usage is within limits derived from the
    /// allocation; an agent with no usage sample yet counts as compliant.
    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError> {
        let usage_map = self.usage_tracker.read();
        let allocations_map = self.allocations.read();

        if let Some(allocation) = allocations_map.get(&agent_id) {
            if let Some(usage) = usage_map.get(&agent_id) {
                // Same derivation as the enforcement loop (kept in sync by hand).
                let limits = ResourceLimits {
                    memory_mb: allocation.allocated_memory / (1024 * 1024),
                    cpu_cores: allocation.allocated_cpu_cores,
                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                    execution_timeout: Duration::from_secs(3600),
                    idle_timeout: Duration::from_secs(300),
                };
                let violations = Self::check_resource_violations(usage, &limits);
                Ok(violations.is_empty())
            } else {
                // No usage reported yet: nothing to violate.
                Ok(true)
            }
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// Full violation list; requires both an allocation and a usage sample.
    async fn check_resource_violations(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<ResourceViolation>, ResourceError> {
        let usage_map = self.usage_tracker.read();
        let allocations_map = self.allocations.read();

        if let (Some(usage), Some(allocation)) =
            (usage_map.get(&agent_id), allocations_map.get(&agent_id))
        {
            let limits = ResourceLimits {
                memory_mb: allocation.allocated_memory / (1024 * 1024),
                cpu_cores: allocation.allocated_cpu_cores,
                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
                execution_timeout: Duration::from_secs(3600),
                idle_timeout: Duration::from_secs(300),
            };
            Ok(Self::check_resource_violations(usage, &limits))
        } else {
            Err(ResourceError::AgentNotFound { agent_id })
        }
    }

    /// Stop background loops, then best-effort release every allocation.
    ///
    /// NOTE(review): `notify_waiters` stops the event-processing loop before
    /// the DeallocationRequest events below are consumed, so they may never be
    /// applied — verify the intended shutdown ordering.
    async fn shutdown(&self) -> Result<(), ResourceError> {
        tracing::info!("Shutting down resource manager");

        *self.is_running.write() = false;
        self.shutdown_notify.notify_waiters();

        let agent_ids: Vec<AgentId> = self.allocations.read().keys().copied().collect();

        for agent_id in agent_ids {
            if let Err(e) = self.deallocate_resources(agent_id).await {
                tracing::error!(
                    "Failed to deallocate resources for agent {} during shutdown: {}",
                    agent_id,
                    e
                );
            }
        }

        Ok(())
    }

    /// Health: unhealthy when shut down or >90% memory/CPU in use,
    /// degraded above 80%, healthy otherwise; metrics attached in all cases.
    async fn check_health(&self) -> Result<ComponentHealth, ResourceError> {
        let is_running = *self.is_running.read();
        if !is_running {
            return Ok(ComponentHealth::unhealthy(
                "Resource manager is shut down".to_string(),
            ));
        }

        let system_status = self.get_system_status().await;
        let allocations_count = self.allocations.read().len();

        // Usage ratios computed from configured totals vs. available pools.
        let memory_usage = if system_status.total_memory > 0 {
            (system_status.total_memory - system_status.available_memory) as f64
                / system_status.total_memory as f64
        } else {
            0.0
        };

        let cpu_usage = if system_status.total_cpu_cores > 0 {
            (system_status.total_cpu_cores - system_status.available_cpu_cores) as f64
                / system_status.total_cpu_cores as f64
        } else {
            0.0
        };

        let status = if memory_usage > 0.9 || cpu_usage > 0.9 {
            ComponentHealth::unhealthy(format!(
                "Critical resource usage - Memory: {:.1}%, CPU: {:.1}%",
                memory_usage * 100.0,
                cpu_usage * 100.0
            ))
        } else if memory_usage > 0.8 || cpu_usage > 0.8 {
            ComponentHealth::degraded(format!(
                "High resource usage - Memory: {:.1}%, CPU: {:.1}%",
                memory_usage * 100.0,
                cpu_usage * 100.0
            ))
        } else {
            ComponentHealth::healthy(Some(format!(
                "Resources available - Memory: {:.1}%, CPU: {:.1}%, {} active allocations",
                memory_usage * 100.0,
                cpu_usage * 100.0,
                allocations_count
            )))
        };

        Ok(status
            .with_metric(
                "memory_usage_percent".to_string(),
                format!("{:.2}", memory_usage * 100.0),
            )
            .with_metric(
                "cpu_usage_percent".to_string(),
                format!("{:.2}", cpu_usage * 100.0),
            )
            .with_metric(
                "active_allocations".to_string(),
                allocations_count.to_string(),
            )
            .with_metric(
                "available_memory_mb".to_string(),
                (system_status.available_memory / (1024 * 1024)).to_string(),
            )
            .with_metric(
                "available_cpu_cores".to_string(),
                system_status.available_cpu_cores.to_string(),
            ))
    }
}
784
/// Internal capacity ledger: what is still available for allocation and what
/// was held back at startup as a reservation. All byte quantities; CPU in
/// whole cores.
#[derive(Debug, Clone)]
struct SystemResources {
    available_memory: usize,
    available_cpu_cores: u32,
    available_disk_space: usize,
    available_network_bandwidth: usize,
    // Reservations are fixed at construction and never allocated out.
    reserved_memory: usize,
    reserved_cpu_cores: u32,
    reserved_disk_space: usize,
    reserved_network_bandwidth: usize,
}
797
798impl SystemResources {
799 fn new(config: &ResourceManagerConfig) -> Self {
800 let reservation_factor = config.resource_reservation_percentage;
801
802 let reserved_cpu = if reservation_factor > 0.0 {
804 ((config.total_cpu_cores as f32 * reservation_factor).ceil() as u32).max(1)
805 } else {
806 0
807 };
808 let available_cpu = config.total_cpu_cores.saturating_sub(reserved_cpu);
809
810 Self {
811 available_memory: config.total_memory
812 - (config.total_memory as f32 * reservation_factor) as usize,
813 available_cpu_cores: available_cpu,
814 available_disk_space: config.total_disk_space
815 - (config.total_disk_space as f32 * reservation_factor) as usize,
816 available_network_bandwidth: config.total_network_bandwidth
817 - (config.total_network_bandwidth as f32 * reservation_factor) as usize,
818 reserved_memory: (config.total_memory as f32 * reservation_factor) as usize,
819 reserved_cpu_cores: reserved_cpu,
820 reserved_disk_space: (config.total_disk_space as f32 * reservation_factor) as usize,
821 reserved_network_bandwidth: (config.total_network_bandwidth as f32 * reservation_factor)
822 as usize,
823 }
824 }
825
826 fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
827 self.available_memory >= requirements.max_memory_mb * 1024 * 1024
828 && self.available_cpu_cores >= requirements.max_cpu_cores as u32
829 && self.available_disk_space >= requirements.disk_space_mb * 1024 * 1024
830 && self.available_network_bandwidth >= requirements.network_bandwidth_mbps * 1024 * 1024
831 }
832
833 fn allocate(&mut self, requirements: &ResourceRequirements) -> ResourceAllocation {
834 let memory_bytes = requirements.max_memory_mb * 1024 * 1024;
835 let disk_bytes = requirements.disk_space_mb * 1024 * 1024;
836 let network_bytes = requirements.network_bandwidth_mbps * 1024 * 1024;
837
838 self.available_memory -= memory_bytes;
839 self.available_cpu_cores -= requirements.max_cpu_cores as u32;
840 self.available_disk_space -= disk_bytes;
841 self.available_network_bandwidth -= network_bytes;
842
843 ResourceAllocation {
844 agent_id: AgentId::new(), allocated_memory: memory_bytes,
846 allocated_cpu_cores: requirements.max_cpu_cores,
847 allocated_disk_io: disk_bytes,
848 allocated_network_io: network_bytes,
849 allocation_time: SystemTime::now(),
850 }
851 }
852
853 fn deallocate(&mut self, allocation: &ResourceAllocation) {
854 self.available_memory += allocation.allocated_memory;
855 self.available_cpu_cores += allocation.allocated_cpu_cores as u32;
856 self.available_disk_space += allocation.allocated_disk_io;
857 self.available_network_bandwidth += allocation.allocated_network_io;
858 }
859
860 fn update_usage(&mut self, _usage: &ResourceUsage) {
861 }
864
865 fn get_resource_info(&self) -> ResourceInfo {
867 ResourceInfo {
868 available_memory: self.available_memory,
869 available_cpu_cores: self.available_cpu_cores,
870 available_disk_space: self.available_disk_space,
871 available_network_bandwidth: self.available_network_bandwidth,
872 reserved_memory: self.reserved_memory,
873 reserved_cpu_cores: self.reserved_cpu_cores,
874 reserved_disk_space: self.reserved_disk_space,
875 reserved_network_bandwidth: self.reserved_network_bandwidth,
876 }
877 }
878}
879
/// Point-in-time snapshot of system capacity returned by
/// [`ResourceManager::get_system_status`]. Byte quantities except CPU cores.
#[derive(Debug, Clone)]
pub struct ResourceSystemStatus {
    pub total_memory: usize,
    pub available_memory: usize,
    pub total_cpu_cores: u32,
    pub available_cpu_cores: u32,
    pub total_disk_space: usize,
    pub available_disk_space: usize,
    pub total_network_bandwidth: usize,
    pub available_network_bandwidth: usize,
    /// Number of agents currently holding an allocation.
    pub active_allocations: usize,
    /// When this snapshot was taken.
    pub last_updated: SystemTime,
}
894
/// Plain copy of [`SystemResources`]' available and reserved pools, used to
/// export figures without exposing the private ledger type.
#[derive(Debug, Clone)]
pub struct ResourceInfo {
    pub available_memory: usize,
    pub available_cpu_cores: u32,
    pub available_disk_space: usize,
    pub available_network_bandwidth: usize,
    pub reserved_memory: usize,
    pub reserved_cpu_cores: u32,
    pub reserved_disk_space: usize,
    pub reserved_network_bandwidth: usize,
}
907
/// One exceeded limit: observed value vs. the limit it broke.
/// Memory/IO values are bytes (or bytes/sec); CPU is in cores.
#[derive(Debug, Clone)]
pub enum ResourceViolation {
    Memory { used: usize, limit: usize },
    Cpu { used: f32, limit: f32 },
    DiskIo { used: usize, limit: usize },
    NetworkIo { used: usize, limit: usize },
}
916
/// Commands and notifications flowing through the unbounded channel into the
/// single event-processing task (which serializes all state mutations).
#[derive(Debug, Clone)]
enum MonitoringEvent {
    /// New usage sample for an agent.
    UsageUpdate {
        agent_id: AgentId,
        usage: ResourceUsage,
    },
    /// Ask the processor to reserve resources for an agent.
    AllocationRequest {
        agent_id: AgentId,
        requirements: ResourceRequirements,
    },
    /// Ask the processor to release an agent's resources (no-op if none held).
    DeallocationRequest {
        agent_id: AgentId,
    },
    /// An agent exceeded its limits (informational; always emitted).
    LimitViolation {
        agent_id: AgentId,
        violations: Vec<ResourceViolation>,
    },
    /// Enforcement asks the orchestrator to slow the agent down.
    ThrottleRequested {
        agent_id: AgentId,
        consecutive_violations: u32,
        violations: Vec<ResourceViolation>,
    },
    /// Enforcement asks the orchestrator to terminate the agent.
    KillRequested {
        agent_id: AgentId,
        violations: Vec<ResourceViolation>,
        // Static human-readable cause logged alongside the request.
        reason: &'static str,
    },
}
954
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal requirements (1 MB / 1 core / 1 MB disk / 1 Mbps) so any
    // default-configured manager can satisfy them.
    fn create_test_requirements() -> ResourceRequirements {
        ResourceRequirements {
            min_memory_mb: 1,
            max_memory_mb: 1,
            min_cpu_cores: 1.0,
            max_cpu_cores: 1.0,
            disk_space_mb: 1,
            network_bandwidth_mbps: 1,
        }
    }

    // Allocation round-trip: requirements in MB come back as bytes/cores.
    #[tokio::test]
    async fn test_resource_allocation() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        let allocation = manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();
        assert_eq!(allocation.allocated_memory, 1024 * 1024);
        assert_eq!(allocation.allocated_cpu_cores, 1.0);
    }

    // Deallocation after a successful allocation reports success.
    #[tokio::test]
    async fn test_resource_deallocation() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();
        let result = manager.deallocate_resources(agent_id).await;
        assert!(result.is_ok());
    }

    // A reported usage sample is retrievable once the background task
    // has applied it (hence the 20 ms grace sleep).
    #[tokio::test]
    async fn test_usage_tracking() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let requirements = create_test_requirements();

        manager
            .allocate_resources(agent_id, requirements)
            .await
            .unwrap();

        let usage = ResourceUsage {
            memory_used: 512 * 1024,
            cpu_utilization: 0.5,
            disk_io_rate: 512 * 1024,
            network_io_rate: 512,
            uptime: Duration::from_secs(60),
        };

        manager.update_usage(agent_id, usage.clone()).await.unwrap();

        // Let the event-processing task apply the UsageUpdate event.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let retrieved_usage = manager.get_usage(agent_id).await.unwrap();
        assert_eq!(retrieved_usage.memory_used, usage.memory_used);
        assert_eq!(retrieved_usage.cpu_utilization, usage.cpu_utilization);
    }

    // Sanity bounds on the capacity snapshot of a fresh manager.
    #[tokio::test]
    async fn test_system_status() {
        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
            .await
            .unwrap();
        let status = manager.get_system_status().await;

        assert!(status.total_memory > 0);
        assert!(status.available_memory <= status.total_memory);
        assert!(status.total_cpu_cores > 0);
        assert!(status.available_cpu_cores <= status.total_cpu_cores);
    }

    // Usage at double every 1-unit limit must trip all four dimensions.
    #[test]
    fn test_resource_violations() {
        let usage = ResourceUsage {
            memory_used: 2 * 1024 * 1024,
            cpu_utilization: 2.0,
            disk_io_rate: 2 * 1024 * 1024,
            network_io_rate: 2 * 1024 * 1024,
            uptime: Duration::from_secs(60),
        };

        let limits = ResourceLimits {
            memory_mb: 1,
            cpu_cores: 1.0,
            disk_io_mbps: 1,
            network_io_mbps: 1,
            execution_timeout: Duration::from_secs(3600),
            idle_timeout: Duration::from_secs(300),
        };

        let violations = DefaultResourceManager::check_resource_violations(&usage, &limits);
        assert_eq!(violations.len(), 4);
    }
}
1067}