Skip to main content

symbi_runtime/resource/
mod.rs

1//! Agent Resource Manager
2//!
3//! Manages resource allocation, monitoring, and enforcement for agents
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::{mpsc, Notify};
11use tokio::time::interval;
12
13use crate::integrations::policy_engine::{
14    PolicyEnforcementFactory, PolicyEnforcementPoint, ResourceAccessConfig,
15    ResourceAllocationRequest,
16};
17use crate::types::*;
18
19/// Resource manager trait
20#[async_trait]
21pub trait ResourceManager {
22    /// Allocate resources for an agent
23    async fn allocate_resources(
24        &self,
25        agent_id: AgentId,
26        requirements: ResourceRequirements,
27    ) -> Result<ResourceAllocation, ResourceError>;
28
29    /// Deallocate resources for an agent
30    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError>;
31
32    /// Update resource usage for an agent
33    async fn update_usage(
34        &self,
35        agent_id: AgentId,
36        usage: ResourceUsage,
37    ) -> Result<(), ResourceError>;
38
39    /// Get current resource usage for an agent
40    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError>;
41
42    /// Get system resource status
43    async fn get_system_status(&self) -> ResourceSystemStatus;
44
45    /// Set resource limits for an agent
46    async fn set_limits(
47        &self,
48        agent_id: AgentId,
49        limits: ResourceLimits,
50    ) -> Result<(), ResourceError>;
51
52    /// Check if agent is within resource limits
53    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError>;
54
55    /// Check resource access violations for an agent
56    async fn check_resource_violations(
57        &self,
58        agent_id: AgentId,
59    ) -> Result<Vec<ResourceViolation>, ResourceError>;
60
61    /// Shutdown the resource manager
62    async fn shutdown(&self) -> Result<(), ResourceError>;
63
64    /// Check the health of the resource manager
65    async fn check_health(&self) -> Result<ComponentHealth, ResourceError>;
66}
67
68/// Resource manager configuration
69#[derive(Debug, Clone)]
70pub struct ResourceManagerConfig {
71    pub total_memory: usize,
72    pub total_cpu_cores: u32,
73    pub total_disk_space: usize,
74    pub total_network_bandwidth: usize,
75    pub monitoring_interval: Duration,
76    pub enforcement_enabled: bool,
77    pub auto_scaling_enabled: bool,
78    pub resource_reservation_percentage: f32,
79    pub policy_enforcement_config: ResourceAccessConfig,
80    /// Action taken when an agent breaches its allocated resource limits.
81    ///
82    /// Defaults to [`ViolationAction::Throttle`] so a single noisy agent is
83    /// backed off rather than killed outright; operators who prefer fail-fast
84    /// semantics can swap in [`ViolationAction::Kill`]. `LogOnly` preserves
85    /// the pre-enforcement behaviour (emit a monitoring event, do nothing)
86    /// for debugging.
87    pub violation_action: ViolationAction,
88    /// Number of consecutive sampling intervals an agent must breach limits
89    /// before the manager escalates to [`ViolationAction::Kill`]. Only
90    /// consulted when `violation_action == Throttle`.
91    pub kill_after_sustained_violations: u32,
92}
93
94/// Action the resource manager takes when an agent exceeds its allocation.
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96pub enum ViolationAction {
97    /// Emit a `LimitViolation` monitoring event and do nothing else. Useful
98    /// for early rollouts where the operator wants visibility without the
99    /// risk of killing misbehaving-but-important agents.
100    LogOnly,
101    /// Throttle the agent (emit `ThrottleRequested`) and escalate to `Kill`
102    /// after `kill_after_sustained_violations` consecutive breaches.
103    Throttle,
104    /// Immediately emit `KillRequested` on any breach.
105    Kill,
106}
107
108impl Default for ResourceManagerConfig {
109    fn default() -> Self {
110        Self {
111            total_memory: 16 * 1024 * 1024 * 1024, // 16GB
112            total_cpu_cores: 8,
113            total_disk_space: 1024 * 1024 * 1024 * 1024, // 1TB
114            total_network_bandwidth: 1000 * 1024 * 1024, // 1Gbps
115            monitoring_interval: Duration::from_secs(5),
116            enforcement_enabled: true,
117            auto_scaling_enabled: false,
118            resource_reservation_percentage: 0.1, // 10% reserved
119            policy_enforcement_config: ResourceAccessConfig::default(),
120            violation_action: ViolationAction::Throttle,
121            kill_after_sustained_violations: 5,
122        }
123    }
124}
125
126/// Default implementation of the resource manager
127pub struct DefaultResourceManager {
128    config: ResourceManagerConfig,
129    allocations: Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
130    usage_tracker: Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
131    system_resources: Arc<RwLock<SystemResources>>,
132    monitoring_sender: mpsc::UnboundedSender<MonitoringEvent>,
133    shutdown_notify: Arc<Notify>,
134    is_running: Arc<RwLock<bool>>,
135    policy_enforcement: Arc<dyn PolicyEnforcementPoint>,
136    /// Per-agent counter of consecutive sampling intervals during which
137    /// resource usage exceeded the agent's allocation. Cleared on the first
138    /// sample that comes back within limits.
139    consecutive_violations: Arc<RwLock<HashMap<AgentId, u32>>>,
140}
141
142impl DefaultResourceManager {
143    /// Create a new resource manager
144    pub async fn new(config: ResourceManagerConfig) -> Result<Self, ResourceError> {
145        let allocations = Arc::new(RwLock::new(HashMap::new()));
146        let usage_tracker = Arc::new(RwLock::new(HashMap::new()));
147        let system_resources = Arc::new(RwLock::new(SystemResources::new(&config)));
148        let (monitoring_sender, monitoring_receiver) = mpsc::unbounded_channel();
149        let shutdown_notify = Arc::new(Notify::new());
150        let is_running = Arc::new(RwLock::new(true));
151
152        // Create policy enforcement point
153        let policy_enforcement = PolicyEnforcementFactory::create_enforcement_point(
154            config.policy_enforcement_config.clone(),
155        )
156        .await
157        .map_err(|e| {
158            ResourceError::PolicyError(format!("Failed to create policy enforcement: {}", e))
159        })?;
160
161        let manager = Self {
162            config,
163            allocations,
164            usage_tracker,
165            system_resources,
166            monitoring_sender,
167            shutdown_notify,
168            is_running,
169            policy_enforcement,
170            consecutive_violations: Arc::new(RwLock::new(HashMap::new())),
171        };
172
173        // Start background tasks
174        manager.start_monitoring_loop(monitoring_receiver).await;
175        manager.start_enforcement_loop().await;
176
177        Ok(manager)
178    }
179
180    /// Start the resource monitoring loop
181    async fn start_monitoring_loop(
182        &self,
183        mut monitoring_receiver: mpsc::UnboundedReceiver<MonitoringEvent>,
184    ) {
185        let usage_tracker = self.usage_tracker.clone();
186        let allocations = self.allocations.clone();
187        let system_resources = self.system_resources.clone();
188        let shutdown_notify = self.shutdown_notify.clone();
189
190        tokio::spawn(async move {
191            loop {
192                tokio::select! {
193                    event = monitoring_receiver.recv() => {
194                        if let Some(event) = event {
195                            Self::process_monitoring_event(event, &usage_tracker, &allocations, &system_resources).await;
196                        } else {
197                            break;
198                        }
199                    }
200                    _ = shutdown_notify.notified() => {
201                        break;
202                    }
203                }
204            }
205        });
206    }
207
208    /// Start the resource enforcement loop
209    async fn start_enforcement_loop(&self) {
210        let usage_tracker = self.usage_tracker.clone();
211        let allocations = self.allocations.clone();
212        let monitoring_sender = self.monitoring_sender.clone();
213        let shutdown_notify = self.shutdown_notify.clone();
214        let is_running = self.is_running.clone();
215        let monitoring_interval = self.config.monitoring_interval;
216        let enforcement_enabled = self.config.enforcement_enabled;
217        let violation_action = self.config.violation_action;
218        let kill_after_sustained = self.config.kill_after_sustained_violations;
219        let consecutive_violations = self.consecutive_violations.clone();
220
221        tokio::spawn(async move {
222            let mut interval = interval(monitoring_interval);
223
224            loop {
225                tokio::select! {
226                    _ = interval.tick() => {
227                        if !*is_running.read() {
228                            break;
229                        }
230
231                        if enforcement_enabled {
232                            Self::enforce_resource_limits(
233                                &usage_tracker,
234                                &allocations,
235                                &monitoring_sender,
236                                violation_action,
237                                kill_after_sustained,
238                                &consecutive_violations,
239                            )
240                            .await;
241                        }
242                    }
243                    _ = shutdown_notify.notified() => {
244                        break;
245                    }
246                }
247            }
248        });
249    }
250
251    /// Process a monitoring event
252    async fn process_monitoring_event(
253        event: MonitoringEvent,
254        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
255        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
256        system_resources: &Arc<RwLock<SystemResources>>,
257    ) {
258        match event {
259            MonitoringEvent::UsageUpdate { agent_id, usage } => {
260                usage_tracker.write().insert(agent_id, usage.clone());
261
262                // Update system resource usage
263                system_resources.write().update_usage(&usage);
264
265                tracing::debug!("Updated resource usage for agent {}: {:?}", agent_id, usage);
266            }
267            MonitoringEvent::AllocationRequest {
268                agent_id,
269                requirements,
270            } => {
271                let mut system = system_resources.write();
272                if system.can_allocate(&requirements) {
273                    let allocation = system.allocate(&requirements);
274                    allocations.write().insert(agent_id, allocation.clone());
275
276                    tracing::info!(
277                        "Allocated resources for agent {}: {:?}",
278                        agent_id,
279                        allocation
280                    );
281                } else {
282                    tracing::warn!(
283                        "Cannot allocate resources for agent {}: insufficient resources",
284                        agent_id
285                    );
286                }
287            }
288            MonitoringEvent::DeallocationRequest { agent_id } => {
289                if let Some(allocation) = allocations.write().remove(&agent_id) {
290                    system_resources.write().deallocate(&allocation);
291                    usage_tracker.write().remove(&agent_id);
292
293                    tracing::info!("Deallocated resources for agent {}", agent_id);
294                }
295            }
296            MonitoringEvent::LimitViolation {
297                agent_id,
298                violations,
299            } => {
300                // Handle limit violation event - this is typically sent by the enforcement loop
301                // and processed by external systems, so we just log it here
302                tracing::warn!(
303                    "Resource limit violation detected for agent {}: {:?}",
304                    agent_id,
305                    violations
306                );
307            }
308            MonitoringEvent::ThrottleRequested {
309                agent_id,
310                consecutive_violations,
311                violations,
312            } => {
313                // Throttle enforcement belongs in the sandbox/orchestrator
314                // layer (e.g. tightening the Docker CPU quota). At the
315                // resource-manager layer we surface the signal for observers.
316                tracing::warn!(
317                    %agent_id,
318                    consecutive_violations,
319                    ?violations,
320                    "ThrottleRequested event emitted — orchestrator should slow this agent"
321                );
322            }
323            MonitoringEvent::KillRequested {
324                agent_id,
325                violations,
326                reason,
327            } => {
328                // Same story — actual termination is the orchestrator's
329                // responsibility. Log loudly so operators see the escalation
330                // regardless of whether a consumer is listening.
331                tracing::error!(
332                    %agent_id,
333                    ?violations,
334                    reason,
335                    "KillRequested event emitted — orchestrator should terminate this agent"
336                );
337            }
338        }
339    }
340
341    /// Enforce resource limits.
342    ///
343    /// For each agent with a known allocation, compares the most recent usage
344    /// sample against the allocation's ceiling and dispatches the configured
345    /// [`ViolationAction`]:
346    ///
347    /// - `LogOnly`: always emits a `LimitViolation` event for visibility.
348    /// - `Throttle`: emits `ThrottleRequested` on breach and escalates to
349    ///   `KillRequested` after `kill_after_sustained_violations` consecutive
350    ///   breaches. Sampling intervals with no breach reset the counter.
351    /// - `Kill`: emits `KillRequested` on any breach.
352    ///
353    /// The resource manager does not own the agent process handle, so actual
354    /// enforcement (sending SIGTERM, updating cgroup quotas, etc.) is the
355    /// responsibility of the orchestrator that consumes these monitoring
356    /// events. That separation keeps the manager pluggable across Docker,
357    /// E2B, and native-host backends.
358    async fn enforce_resource_limits(
359        usage_tracker: &Arc<RwLock<HashMap<AgentId, ResourceUsage>>>,
360        allocations: &Arc<RwLock<HashMap<AgentId, ResourceAllocation>>>,
361        monitoring_sender: &mpsc::UnboundedSender<MonitoringEvent>,
362        violation_action: ViolationAction,
363        kill_after_sustained: u32,
364        consecutive_violations: &Arc<RwLock<HashMap<AgentId, u32>>>,
365    ) {
366        // Snapshot the read-locked maps; we release the locks before sending
367        // events so a slow receiver can't stall the enforcement loop.
368        let samples: Vec<(AgentId, ResourceUsage, ResourceAllocation)> = {
369            let usage_map = usage_tracker.read();
370            let allocations_map = allocations.read();
371            usage_map
372                .iter()
373                .filter_map(|(agent_id, usage)| {
374                    allocations_map
375                        .get(agent_id)
376                        .map(|alloc| (*agent_id, usage.clone(), alloc.clone()))
377                })
378                .collect()
379        };
380
381        for (agent_id, usage, allocation) in samples {
382            let limits = ResourceLimits {
383                memory_mb: allocation.allocated_memory / (1024 * 1024),
384                cpu_cores: allocation.allocated_cpu_cores,
385                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
386                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
387                execution_timeout: Duration::from_secs(3600),
388                idle_timeout: Duration::from_secs(300),
389            };
390            let violations = Self::check_resource_violations(&usage, &limits);
391
392            if violations.is_empty() {
393                // Agent came back within limits; reset its consecutive-violation
394                // counter so a long-quiet period doesn't linger as "almost
395                // killed".
396                consecutive_violations.write().remove(&agent_id);
397                continue;
398            }
399
400            // Bump the consecutive-violation counter under a write lock, then
401            // emit the appropriate monitoring event after releasing the lock.
402            let new_count = {
403                let mut counters = consecutive_violations.write();
404                let entry = counters.entry(agent_id).or_insert(0);
405                *entry += 1;
406                *entry
407            };
408
409            tracing::warn!(
410                %agent_id,
411                violations = ?violations,
412                consecutive = new_count,
413                action = ?violation_action,
414                "Agent violated resource limits"
415            );
416
417            // Always emit the raw violation event so dashboards keep working.
418            let _ = monitoring_sender.send(MonitoringEvent::LimitViolation {
419                agent_id,
420                violations: violations.clone(),
421            });
422
423            match violation_action {
424                ViolationAction::LogOnly => { /* nothing further */ }
425                ViolationAction::Kill => {
426                    let _ = monitoring_sender.send(MonitoringEvent::KillRequested {
427                        agent_id,
428                        violations: violations.clone(),
429                        reason: "ViolationAction::Kill configured",
430                    });
431                }
432                ViolationAction::Throttle => {
433                    if new_count >= kill_after_sustained {
434                        let _ = monitoring_sender.send(MonitoringEvent::KillRequested {
435                            agent_id,
436                            violations: violations.clone(),
437                            reason: "sustained limit violations exceeded threshold",
438                        });
439                        // Reset the counter so we don't emit repeated kills for
440                        // the same escalation; the orchestrator will either
441                        // act on this event or the usage will recover.
442                        consecutive_violations.write().remove(&agent_id);
443                    } else {
444                        let _ = monitoring_sender.send(MonitoringEvent::ThrottleRequested {
445                            agent_id,
446                            consecutive_violations: new_count,
447                            violations,
448                        });
449                    }
450                }
451            }
452        }
453    }
454
455    /// Check for resource limit violations
456    fn check_resource_violations(
457        usage: &ResourceUsage,
458        limits: &ResourceLimits,
459    ) -> Vec<ResourceViolation> {
460        let mut violations = Vec::new();
461
462        if usage.memory_used > limits.memory_mb * 1024 * 1024 {
463            violations.push(ResourceViolation::Memory {
464                used: usage.memory_used,
465                limit: limits.memory_mb * 1024 * 1024,
466            });
467        }
468
469        if usage.cpu_utilization > limits.cpu_cores {
470            violations.push(ResourceViolation::Cpu {
471                used: usage.cpu_utilization,
472                limit: limits.cpu_cores,
473            });
474        }
475
476        if usage.disk_io_rate > limits.disk_io_mbps * 1024 * 1024 {
477            violations.push(ResourceViolation::DiskIo {
478                used: usage.disk_io_rate,
479                limit: limits.disk_io_mbps * 1024 * 1024,
480            });
481        }
482
483        if usage.network_io_rate > limits.network_io_mbps * 1024 * 1024 {
484            violations.push(ResourceViolation::NetworkIo {
485                used: usage.network_io_rate,
486                limit: limits.network_io_mbps * 1024 * 1024,
487            });
488        }
489
490        violations
491    }
492
493    /// Send a monitoring event
494    fn send_monitoring_event(&self, event: MonitoringEvent) -> Result<(), ResourceError> {
495        self.monitoring_sender.send(event).map_err(|_| {
496            ResourceError::MonitoringFailed("Failed to send monitoring event".to_string())
497        })
498    }
499}
500
501#[async_trait]
502impl ResourceManager for DefaultResourceManager {
503    async fn allocate_resources(
504        &self,
505        agent_id: AgentId,
506        requirements: ResourceRequirements,
507    ) -> Result<ResourceAllocation, ResourceError> {
508        if !*self.is_running.read() {
509            return Err(ResourceError::ShuttingDown);
510        }
511
512        // Check if agent already has allocation
513        if self.allocations.read().contains_key(&agent_id) {
514            return Err(ResourceError::AllocationExists { agent_id });
515        }
516
517        // Check policy for resource allocation
518        let allocation_request = ResourceAllocationRequest {
519            agent_id,
520            requirements: requirements.clone(),
521            priority: Priority::Normal,
522            justification: None,
523            max_duration: None,
524            timestamp: SystemTime::now(),
525        };
526
527        let policy_decision = self
528            .policy_enforcement
529            .validate_resource_allocation(agent_id, &allocation_request)
530            .await
531            .map_err(|e| ResourceError::PolicyError(format!("Policy validation failed: {}", e)))?;
532
533        let final_requirements = match policy_decision.decision {
534            crate::integrations::policy_engine::AllocationResult::Approve => requirements,
535            crate::integrations::policy_engine::AllocationResult::Deny => {
536                return Err(ResourceError::PolicyViolation {
537                    reason: policy_decision.reason.into(),
538                });
539            }
540            crate::integrations::policy_engine::AllocationResult::Modified => {
541                // Use modified requirements if provided
542                policy_decision
543                    .modified_requirements
544                    .unwrap_or(requirements)
545            }
546            crate::integrations::policy_engine::AllocationResult::Queued => {
547                return Err(ResourceError::AllocationQueued {
548                    reason: policy_decision.reason.into(),
549                });
550            }
551            crate::integrations::policy_engine::AllocationResult::Escalate => {
552                return Err(ResourceError::EscalationRequired {
553                    reason: policy_decision.reason.into(),
554                });
555            }
556        };
557
558        // Send allocation request
559        self.send_monitoring_event(MonitoringEvent::AllocationRequest {
560            agent_id,
561            requirements: final_requirements.clone(),
562        })?;
563
564        // Give the monitoring loop time to process
565        tokio::time::sleep(Duration::from_millis(10)).await;
566
567        // Check if allocation was successful
568        self.allocations.read().get(&agent_id).cloned().ok_or(
569            ResourceError::InsufficientResources {
570                requirements: "Insufficient system resources".into(),
571            },
572        )
573    }
574
575    async fn deallocate_resources(&self, agent_id: AgentId) -> Result<(), ResourceError> {
576        self.send_monitoring_event(MonitoringEvent::DeallocationRequest { agent_id })?;
577
578        // Give the monitoring loop time to process
579        tokio::time::sleep(Duration::from_millis(10)).await;
580
581        Ok(())
582    }
583
584    async fn update_usage(
585        &self,
586        agent_id: AgentId,
587        usage: ResourceUsage,
588    ) -> Result<(), ResourceError> {
589        self.send_monitoring_event(MonitoringEvent::UsageUpdate { agent_id, usage })?;
590        Ok(())
591    }
592
593    async fn get_usage(&self, agent_id: AgentId) -> Result<ResourceUsage, ResourceError> {
594        self.usage_tracker
595            .read()
596            .get(&agent_id)
597            .cloned()
598            .ok_or(ResourceError::AgentNotFound { agent_id })
599    }
600
601    async fn get_system_status(&self) -> ResourceSystemStatus {
602        let system = self.system_resources.read();
603        let allocations_count = self.allocations.read().len();
604
605        // Use the resource info to access reserved fields
606        let resource_info = system.get_resource_info();
607
608        ResourceSystemStatus {
609            total_memory: self.config.total_memory,
610            available_memory: resource_info.available_memory,
611            total_cpu_cores: self.config.total_cpu_cores,
612            available_cpu_cores: resource_info.available_cpu_cores,
613            total_disk_space: self.config.total_disk_space,
614            available_disk_space: resource_info.available_disk_space,
615            total_network_bandwidth: self.config.total_network_bandwidth,
616            available_network_bandwidth: resource_info.available_network_bandwidth,
617            active_allocations: allocations_count,
618            last_updated: SystemTime::now(),
619        }
620    }
621
622    async fn set_limits(
623        &self,
624        agent_id: AgentId,
625        limits: ResourceLimits,
626    ) -> Result<(), ResourceError> {
627        let mut allocations = self.allocations.write();
628        if let Some(allocation) = allocations.get_mut(&agent_id) {
629            // Update allocation fields based on limits
630            allocation.allocated_memory = limits.memory_mb * 1024 * 1024;
631            allocation.allocated_cpu_cores = limits.cpu_cores;
632            allocation.allocated_disk_io = limits.disk_io_mbps * 1024 * 1024;
633            allocation.allocated_network_io = limits.network_io_mbps * 1024 * 1024;
634            Ok(())
635        } else {
636            Err(ResourceError::AgentNotFound { agent_id })
637        }
638    }
639
640    async fn check_limits(&self, agent_id: AgentId) -> Result<bool, ResourceError> {
641        let usage_map = self.usage_tracker.read();
642        let allocations_map = self.allocations.read();
643
644        // Agent must have allocation to check limits
645        if let Some(allocation) = allocations_map.get(&agent_id) {
646            // If there's usage data, check for violations
647            if let Some(usage) = usage_map.get(&agent_id) {
648                let limits = ResourceLimits {
649                    memory_mb: allocation.allocated_memory / (1024 * 1024),
650                    cpu_cores: allocation.allocated_cpu_cores,
651                    disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
652                    network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
653                    execution_timeout: Duration::from_secs(3600),
654                    idle_timeout: Duration::from_secs(300),
655                };
656                let violations = Self::check_resource_violations(usage, &limits);
657                Ok(violations.is_empty())
658            } else {
659                // No usage data yet - assume within limits since no actual usage recorded
660                Ok(true)
661            }
662        } else {
663            Err(ResourceError::AgentNotFound { agent_id })
664        }
665    }
666
667    async fn check_resource_violations(
668        &self,
669        agent_id: AgentId,
670    ) -> Result<Vec<ResourceViolation>, ResourceError> {
671        let usage_map = self.usage_tracker.read();
672        let allocations_map = self.allocations.read();
673
674        if let (Some(usage), Some(allocation)) =
675            (usage_map.get(&agent_id), allocations_map.get(&agent_id))
676        {
677            // Create limits from allocation for violation checking
678            let limits = ResourceLimits {
679                memory_mb: allocation.allocated_memory / (1024 * 1024),
680                cpu_cores: allocation.allocated_cpu_cores,
681                disk_io_mbps: allocation.allocated_disk_io / (1024 * 1024),
682                network_io_mbps: allocation.allocated_network_io / (1024 * 1024),
683                execution_timeout: Duration::from_secs(3600),
684                idle_timeout: Duration::from_secs(300),
685            };
686            Ok(Self::check_resource_violations(usage, &limits))
687        } else {
688            Err(ResourceError::AgentNotFound { agent_id })
689        }
690    }
691
692    async fn shutdown(&self) -> Result<(), ResourceError> {
693        tracing::info!("Shutting down resource manager");
694
695        *self.is_running.write() = false;
696        self.shutdown_notify.notify_waiters();
697
698        // Deallocate all resources
699        let agent_ids: Vec<AgentId> = self.allocations.read().keys().copied().collect();
700
701        for agent_id in agent_ids {
702            if let Err(e) = self.deallocate_resources(agent_id).await {
703                tracing::error!(
704                    "Failed to deallocate resources for agent {} during shutdown: {}",
705                    agent_id,
706                    e
707                );
708            }
709        }
710
711        Ok(())
712    }
713
714    async fn check_health(&self) -> Result<ComponentHealth, ResourceError> {
715        let is_running = *self.is_running.read();
716        if !is_running {
717            return Ok(ComponentHealth::unhealthy(
718                "Resource manager is shut down".to_string(),
719            ));
720        }
721
722        let system_status = self.get_system_status().await;
723        let allocations_count = self.allocations.read().len();
724
725        // Calculate resource utilization percentages
726        let memory_usage = if system_status.total_memory > 0 {
727            (system_status.total_memory - system_status.available_memory) as f64
728                / system_status.total_memory as f64
729        } else {
730            0.0
731        };
732
733        let cpu_usage = if system_status.total_cpu_cores > 0 {
734            (system_status.total_cpu_cores - system_status.available_cpu_cores) as f64
735                / system_status.total_cpu_cores as f64
736        } else {
737            0.0
738        };
739
740        let status = if memory_usage > 0.9 || cpu_usage > 0.9 {
741            ComponentHealth::unhealthy(format!(
742                "Critical resource usage - Memory: {:.1}%, CPU: {:.1}%",
743                memory_usage * 100.0,
744                cpu_usage * 100.0
745            ))
746        } else if memory_usage > 0.8 || cpu_usage > 0.8 {
747            ComponentHealth::degraded(format!(
748                "High resource usage - Memory: {:.1}%, CPU: {:.1}%",
749                memory_usage * 100.0,
750                cpu_usage * 100.0
751            ))
752        } else {
753            ComponentHealth::healthy(Some(format!(
754                "Resources available - Memory: {:.1}%, CPU: {:.1}%, {} active allocations",
755                memory_usage * 100.0,
756                cpu_usage * 100.0,
757                allocations_count
758            )))
759        };
760
761        Ok(status
762            .with_metric(
763                "memory_usage_percent".to_string(),
764                format!("{:.2}", memory_usage * 100.0),
765            )
766            .with_metric(
767                "cpu_usage_percent".to_string(),
768                format!("{:.2}", cpu_usage * 100.0),
769            )
770            .with_metric(
771                "active_allocations".to_string(),
772                allocations_count.to_string(),
773            )
774            .with_metric(
775                "available_memory_mb".to_string(),
776                (system_status.available_memory / (1024 * 1024)).to_string(),
777            )
778            .with_metric(
779                "available_cpu_cores".to_string(),
780                system_status.available_cpu_cores.to_string(),
781            ))
782    }
783}
784
785/// System resources tracking
786#[derive(Debug, Clone)]
787struct SystemResources {
788    available_memory: usize,
789    available_cpu_cores: u32,
790    available_disk_space: usize,
791    available_network_bandwidth: usize,
792    reserved_memory: usize,
793    reserved_cpu_cores: u32,
794    reserved_disk_space: usize,
795    reserved_network_bandwidth: usize,
796}
797
798impl SystemResources {
799    fn new(config: &ResourceManagerConfig) -> Self {
800        let reservation_factor = config.resource_reservation_percentage;
801
802        // Use ceiling operation to ensure we always reserve at least 1 CPU when factor > 0
803        let reserved_cpu = if reservation_factor > 0.0 {
804            ((config.total_cpu_cores as f32 * reservation_factor).ceil() as u32).max(1)
805        } else {
806            0
807        };
808        let available_cpu = config.total_cpu_cores.saturating_sub(reserved_cpu);
809
810        Self {
811            available_memory: config.total_memory
812                - (config.total_memory as f32 * reservation_factor) as usize,
813            available_cpu_cores: available_cpu,
814            available_disk_space: config.total_disk_space
815                - (config.total_disk_space as f32 * reservation_factor) as usize,
816            available_network_bandwidth: config.total_network_bandwidth
817                - (config.total_network_bandwidth as f32 * reservation_factor) as usize,
818            reserved_memory: (config.total_memory as f32 * reservation_factor) as usize,
819            reserved_cpu_cores: reserved_cpu,
820            reserved_disk_space: (config.total_disk_space as f32 * reservation_factor) as usize,
821            reserved_network_bandwidth: (config.total_network_bandwidth as f32 * reservation_factor)
822                as usize,
823        }
824    }
825
826    fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
827        self.available_memory >= requirements.max_memory_mb * 1024 * 1024
828            && self.available_cpu_cores >= requirements.max_cpu_cores as u32
829            && self.available_disk_space >= requirements.disk_space_mb * 1024 * 1024
830            && self.available_network_bandwidth >= requirements.network_bandwidth_mbps * 1024 * 1024
831    }
832
833    fn allocate(&mut self, requirements: &ResourceRequirements) -> ResourceAllocation {
834        let memory_bytes = requirements.max_memory_mb * 1024 * 1024;
835        let disk_bytes = requirements.disk_space_mb * 1024 * 1024;
836        let network_bytes = requirements.network_bandwidth_mbps * 1024 * 1024;
837
838        self.available_memory -= memory_bytes;
839        self.available_cpu_cores -= requirements.max_cpu_cores as u32;
840        self.available_disk_space -= disk_bytes;
841        self.available_network_bandwidth -= network_bytes;
842
843        ResourceAllocation {
844            agent_id: AgentId::new(), // Will be set by caller
845            allocated_memory: memory_bytes,
846            allocated_cpu_cores: requirements.max_cpu_cores,
847            allocated_disk_io: disk_bytes,
848            allocated_network_io: network_bytes,
849            allocation_time: SystemTime::now(),
850        }
851    }
852
853    fn deallocate(&mut self, allocation: &ResourceAllocation) {
854        self.available_memory += allocation.allocated_memory;
855        self.available_cpu_cores += allocation.allocated_cpu_cores as u32;
856        self.available_disk_space += allocation.allocated_disk_io;
857        self.available_network_bandwidth += allocation.allocated_network_io;
858    }
859
860    fn update_usage(&mut self, _usage: &ResourceUsage) {
861        // In a real implementation, this would update current usage metrics
862        // For now, we just track allocations vs available resources
863    }
864
865    /// Get system resource information including reservations
866    fn get_resource_info(&self) -> ResourceInfo {
867        ResourceInfo {
868            available_memory: self.available_memory,
869            available_cpu_cores: self.available_cpu_cores,
870            available_disk_space: self.available_disk_space,
871            available_network_bandwidth: self.available_network_bandwidth,
872            reserved_memory: self.reserved_memory,
873            reserved_cpu_cores: self.reserved_cpu_cores,
874            reserved_disk_space: self.reserved_disk_space,
875            reserved_network_bandwidth: self.reserved_network_bandwidth,
876        }
877    }
878}
879
880/// Resource system status
881#[derive(Debug, Clone)]
882pub struct ResourceSystemStatus {
883    pub total_memory: usize,
884    pub available_memory: usize,
885    pub total_cpu_cores: u32,
886    pub available_cpu_cores: u32,
887    pub total_disk_space: usize,
888    pub available_disk_space: usize,
889    pub total_network_bandwidth: usize,
890    pub available_network_bandwidth: usize,
891    pub active_allocations: usize,
892    pub last_updated: SystemTime,
893}
894
895/// Resource information including reservations
896#[derive(Debug, Clone)]
897pub struct ResourceInfo {
898    pub available_memory: usize,
899    pub available_cpu_cores: u32,
900    pub available_disk_space: usize,
901    pub available_network_bandwidth: usize,
902    pub reserved_memory: usize,
903    pub reserved_cpu_cores: u32,
904    pub reserved_disk_space: usize,
905    pub reserved_network_bandwidth: usize,
906}
907
908/// Resource violations
909#[derive(Debug, Clone)]
910pub enum ResourceViolation {
911    Memory { used: usize, limit: usize },
912    Cpu { used: f32, limit: f32 },
913    DiskIo { used: usize, limit: usize },
914    NetworkIo { used: usize, limit: usize },
915}
916
917/// Monitoring events for internal processing
918#[derive(Debug, Clone)]
919enum MonitoringEvent {
920    UsageUpdate {
921        agent_id: AgentId,
922        usage: ResourceUsage,
923    },
924    AllocationRequest {
925        agent_id: AgentId,
926        requirements: ResourceRequirements,
927    },
928    DeallocationRequest {
929        agent_id: AgentId,
930    },
931    LimitViolation {
932        agent_id: AgentId,
933        violations: Vec<ResourceViolation>,
934    },
935    /// Request the orchestrator (or sandbox backend) to throttle this agent:
936    /// reduce its CPU quota, pause it briefly, or otherwise slow it down.
937    /// The resource manager itself does not have the process handle, so
938    /// consumers of this event own the enforcement mechanism.
939    ThrottleRequested {
940        agent_id: AgentId,
941        consecutive_violations: u32,
942        violations: Vec<ResourceViolation>,
943    },
944    /// Request the orchestrator to kill this agent outright. Emitted either
945    /// immediately (`ViolationAction::Kill`) or after
946    /// `kill_after_sustained_violations` consecutive breaches under
947    /// `ViolationAction::Throttle`.
948    KillRequested {
949        agent_id: AgentId,
950        violations: Vec<ResourceViolation>,
951        reason: &'static str,
952    },
953}
954
955#[cfg(test)]
956mod tests {
957    use super::*;
958
959    fn create_test_requirements() -> ResourceRequirements {
960        ResourceRequirements {
961            min_memory_mb: 1,
962            max_memory_mb: 1,
963            min_cpu_cores: 1.0,
964            max_cpu_cores: 1.0,
965            disk_space_mb: 1,
966            network_bandwidth_mbps: 1,
967        }
968    }
969
970    #[tokio::test]
971    async fn test_resource_allocation() {
972        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
973            .await
974            .unwrap();
975        let agent_id = AgentId::new();
976        let requirements = create_test_requirements();
977
978        let allocation = manager
979            .allocate_resources(agent_id, requirements)
980            .await
981            .unwrap();
982        assert_eq!(allocation.allocated_memory, 1024 * 1024);
983        assert_eq!(allocation.allocated_cpu_cores, 1.0);
984    }
985
986    #[tokio::test]
987    async fn test_resource_deallocation() {
988        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
989            .await
990            .unwrap();
991        let agent_id = AgentId::new();
992        let requirements = create_test_requirements();
993
994        manager
995            .allocate_resources(agent_id, requirements)
996            .await
997            .unwrap();
998        let result = manager.deallocate_resources(agent_id).await;
999        assert!(result.is_ok());
1000    }
1001
1002    #[tokio::test]
1003    async fn test_usage_tracking() {
1004        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
1005            .await
1006            .unwrap();
1007        let agent_id = AgentId::new();
1008        let requirements = create_test_requirements();
1009
1010        manager
1011            .allocate_resources(agent_id, requirements)
1012            .await
1013            .unwrap();
1014
1015        let usage = ResourceUsage {
1016            memory_used: 512 * 1024, // 512KB
1017            cpu_utilization: 0.5,
1018            disk_io_rate: 512 * 1024,
1019            network_io_rate: 512,
1020            uptime: Duration::from_secs(60),
1021        };
1022
1023        manager.update_usage(agent_id, usage.clone()).await.unwrap();
1024
1025        tokio::time::sleep(Duration::from_millis(20)).await;
1026
1027        let retrieved_usage = manager.get_usage(agent_id).await.unwrap();
1028        assert_eq!(retrieved_usage.memory_used, usage.memory_used);
1029        assert_eq!(retrieved_usage.cpu_utilization, usage.cpu_utilization);
1030    }
1031
1032    #[tokio::test]
1033    async fn test_system_status() {
1034        let manager = DefaultResourceManager::new(ResourceManagerConfig::default())
1035            .await
1036            .unwrap();
1037        let status = manager.get_system_status().await;
1038
1039        assert!(status.total_memory > 0);
1040        assert!(status.available_memory <= status.total_memory);
1041        assert!(status.total_cpu_cores > 0);
1042        assert!(status.available_cpu_cores <= status.total_cpu_cores);
1043    }
1044
1045    #[test]
1046    fn test_resource_violations() {
1047        let usage = ResourceUsage {
1048            memory_used: 2 * 1024 * 1024, // 2MB
1049            cpu_utilization: 2.0,
1050            disk_io_rate: 2 * 1024 * 1024,
1051            network_io_rate: 2 * 1024 * 1024, // 2MB to exceed 1MB limit
1052            uptime: Duration::from_secs(60),
1053        };
1054
1055        let limits = ResourceLimits {
1056            memory_mb: 1,
1057            cpu_cores: 1.0,
1058            disk_io_mbps: 1,
1059            network_io_mbps: 1,
1060            execution_timeout: Duration::from_secs(3600),
1061            idle_timeout: Duration::from_secs(300),
1062        };
1063
1064        let violations = DefaultResourceManager::check_resource_violations(&usage, &limits);
1065        assert_eq!(violations.len(), 4); // All resources exceeded
1066    }
1067}