use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tokio::time::{interval, sleep};
/// Category of work that can request resources from the controller.
///
/// `PlatformResourceController::measure_current_usage` uses the category to
/// pick a per-operation weight when it estimates CPU utilization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum OperationType {
/// Weighted 0.3 in the CPU estimate.
Analysis,
/// Weighted 0.1 in the CPU estimate.
Commit,
/// Weighted 0.2 in the CPU estimate.
Background,
/// Weighted 0.15 in the CPU estimate.
Storage,
/// Weighted 0.05 in the CPU estimate.
Cleanup,
}
/// Configurable ceilings and thresholds enforced by
/// [`PlatformResourceController`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
/// Memory ceiling in megabytes. Requests whose projected usage exceeds it
/// are queued or rejected, and measured usage above it is classified as
/// critical pressure.
pub max_memory_mb: f64,
/// CPU ceiling as a fraction (the default is 0.8). The measured estimate it
/// is compared against is capped at 1.0.
pub max_cpu_utilization: f64,
/// Number of permits in the operation semaphore, and the active-operation
/// count at which new requests are queued or rejected.
pub max_concurrent_ops: usize,
/// Fraction of `max_memory_mb` above which memory pressure is reported as
/// `Medium`.
pub memory_warning_threshold: f64,
/// Fraction of `max_cpu_utilization` above which CPU pressure is reported
/// as `Medium`.
pub cpu_warning_threshold: f64,
/// Period, in seconds, between usage measurements taken by the monitoring
/// task.
pub check_interval_secs: u64,
}
impl Default for ResourceLimits {
fn default() -> Self {
Self {
max_memory_mb: 1024.0, max_cpu_utilization: 0.8, max_concurrent_ops: 20, memory_warning_threshold: 0.7, cpu_warning_threshold: 0.6, check_interval_secs: 5, }
}
}
/// Snapshot of estimated resource consumption at one point in time.
///
/// Snapshots are produced by `PlatformResourceController::measure_current_usage`
/// from the registered operations; they are estimates, not OS-level readings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
/// When the snapshot was taken. Skipped by serde; a deserialized value gets
/// `Instant::now()` at deserialization time instead.
#[serde(skip, default = "Instant::now")]
pub timestamp: Instant,
/// Estimated memory in megabytes.
pub memory_mb: f64,
/// Estimated CPU utilization as a fraction.
pub cpu_utilization: f64,
/// Number of operations registered as active when the snapshot was taken.
pub active_operations: usize,
/// Memory estimate classified against the configured limits.
pub memory_pressure: ResourcePressure,
/// CPU estimate classified against the configured limits.
pub cpu_pressure: ResourcePressure,
}
/// Coarse classification of a measured value relative to its limit.
///
/// As assigned by `PlatformResourceController::measure_current_usage`:
/// `Critical` is above the limit, `High` is above 90% of the limit, `Medium`
/// is above the configured warning fraction of the limit, and `Low` is
/// everything else.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ResourcePressure {
Low, Medium, High, Critical, }
/// Decision returned by the controller for a single resource request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResourceAction {
/// The operation may proceed.
Allow,
/// The caller is delayed for `delay_ms` milliseconds and the request is
/// then evaluated again.
Throttle { delay_ms: u64 },
/// The caller waits `estimated_wait_ms` milliseconds and the request is
/// then evaluated again.
Queue { estimated_wait_ms: u64 },
/// The request is refused; `reason` is a human-readable explanation.
Reject { reason: String },
/// The request is refused and the controller's emergency cleanup runs.
EmergencyStop,
}
/// Admission controller that decides whether operations may run, based on
/// estimated memory, estimated CPU and a concurrency cap.
pub struct PlatformResourceController {
// Ceilings and thresholds used when evaluating requests.
limits: ResourceLimits,
// Latest usage snapshot; rewritten by the background monitoring task.
current_usage: Arc<RwLock<ResourceUsage>>,
// Caps concurrently admitted operations; one permit per allocation.
operation_semaphore: Arc<Semaphore>,
// Currently admitted operations, keyed by operation id.
active_operations: Arc<RwLock<HashMap<String, OperationContext>>>,
// Log of past decisions, trimmed by `log_enforcement_event`.
enforcement_history: Arc<RwLock<Vec<EnforcementEvent>>>,
// Flag polled by the monitoring task; `false` makes it exit on its next tick.
monitoring_active: Arc<RwLock<bool>>,
}
/// Bookkeeping record for one admitted operation.
#[derive(Debug, Clone)]
pub struct OperationContext {
/// Caller-supplied identifier; also the key in the active-operations map.
pub id: String,
/// Category of the operation.
pub operation_type: OperationType,
/// When the operation was admitted.
pub started_at: Instant,
/// Caller-supplied memory estimate in megabytes.
pub estimated_memory_mb: f64,
/// Priority the operation was requested with.
pub priority: OperationPriority,
}
/// Importance of an operation, from most to least important.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so
/// `Critical < High < Medium < Low`: a *more* important priority compares as
/// *smaller*. Keep this in mind when using `<`/`>` on priorities.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OperationPriority {
Critical, High, Medium, Low, }
/// One entry in the controller's enforcement history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnforcementEvent {
/// When the event was recorded. Skipped by serde; a deserialized value gets
/// `Instant::now()` at deserialization time instead.
#[serde(skip, default = "Instant::now")]
pub timestamp: Instant,
/// Identifier of the operation the decision applied to.
pub operation_id: String,
/// The decision that was taken.
pub action: ResourceAction,
/// Usage snapshot the decision was based on.
pub resource_usage: ResourceUsage,
/// Human-readable explanation of the decision.
pub reason: String,
}
impl PlatformResourceController {
/// Builds a controller enforcing `limits`.
///
/// The semaphore is sized from `limits.max_concurrent_ops`, the usage
/// snapshot starts idle (no memory, no CPU, no operations, low pressure),
/// and monitoring is off until `start_monitoring` is called.
#[must_use]
pub fn new(limits: ResourceLimits) -> Self {
    let permits = limits.max_concurrent_ops;
    let operation_semaphore = Arc::new(Semaphore::new(permits));

    // Nothing has been measured yet, so begin from an all-idle snapshot.
    let idle_snapshot = ResourceUsage {
        timestamp: Instant::now(),
        memory_mb: 0.0,
        cpu_utilization: 0.0,
        active_operations: 0,
        memory_pressure: ResourcePressure::Low,
        cpu_pressure: ResourcePressure::Low,
    };

    let current_usage = Arc::new(RwLock::new(idle_snapshot));
    let active_operations = Arc::new(RwLock::new(HashMap::new()));
    let enforcement_history = Arc::new(RwLock::new(Vec::new()));
    let monitoring_active = Arc::new(RwLock::new(false));

    Self {
        limits,
        current_usage,
        operation_semaphore,
        active_operations,
        enforcement_history,
        monitoring_active,
    }
}
pub async fn start_monitoring(&self) -> Result<()> {
let mut monitoring_guard = self.monitoring_active.write().await;
if *monitoring_guard {
return Ok(()); }
*monitoring_guard = true;
drop(monitoring_guard);
let usage_arc = self.current_usage.clone();
let limits = self.limits.clone();
let monitoring_flag = self.monitoring_active.clone();
let active_ops = self.active_operations.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(limits.check_interval_secs));
loop {
interval.tick().await;
{
let monitoring = monitoring_flag.read().await;
if !*monitoring {
break;
}
}
let new_usage = Self::measure_current_usage(&limits, &active_ops).await;
{
let mut usage = usage_arc.write().await;
*usage = new_usage;
}
}
});
Ok(())
}
/// Clears the monitoring flag.
///
/// The background task is not cancelled here; it exits the next time it
/// wakes up and reads the flag as `false`.
pub async fn stop_monitoring(&self) {
    *self.monitoring_active.write().await = false;
}
/// Requests permission to run an operation, waiting or failing according to
/// the controller's decision.
///
/// Each attempt clones the latest usage snapshot and asks
/// `evaluate_resource_request` for a decision:
///
/// * `Allow` – acquires a semaphore permit, registers the operation in the
///   active-operations map and returns a [`ResourceAllocation`] guard.
/// * `Throttle` / `Queue` – sleeps for the indicated time, then evaluates
///   again with a fresh snapshot.
/// * `Reject` – returns an error carrying the rejection reason.
/// * `EmergencyStop` – runs the emergency cleanup and returns an error.
///
/// Every decision, including throttled attempts, is appended to the
/// enforcement history so `get_enforcement_stats` can count it.
///
/// NOTE(review): there is no retry cap. While the decision stays
/// `Throttle`/`Queue` this future keeps waiting; callers that need a bound
/// should wrap the call in a timeout.
///
/// # Errors
///
/// Returns an error when the request is rejected, when an emergency stop is
/// triggered, when emergency cleanup fails, or when the semaphore has been
/// closed.
pub async fn request_resources(
    &self,
    operation_id: String,
    op_type: OperationType,
    priority: OperationPriority,
    estimated_memory_mb: f64,
) -> Result<ResourceAllocation> {
    // Retry in a loop instead of recursing through boxed futures, so a long
    // wait does not build an ever-growing chain of heap-allocated frames.
    loop {
        let current_usage = self.current_usage.read().await.clone();
        let action = self
            .evaluate_resource_request(&current_usage, &op_type, &priority, estimated_memory_mb)
            .await?;

        match action {
            ResourceAction::Allow => {
                let permit = Arc::clone(&self.operation_semaphore)
                    .acquire_owned()
                    .await?;
                let context = OperationContext {
                    id: operation_id.clone(),
                    operation_type: op_type,
                    started_at: Instant::now(),
                    estimated_memory_mb,
                    priority,
                };
                self.active_operations
                    .write()
                    .await
                    .insert(operation_id.clone(), context);
                self.log_enforcement_event(
                    operation_id.clone(),
                    ResourceAction::Allow,
                    current_usage,
                    "Operation allowed".to_string(),
                )
                .await;
                return Ok(ResourceAllocation::new(
                    operation_id,
                    permit,
                    Arc::clone(&self.active_operations),
                ));
            }
            ResourceAction::Throttle { delay_ms } => {
                // Record the throttle so the stats' throttled counter is real.
                self.log_enforcement_event(
                    operation_id.clone(),
                    ResourceAction::Throttle { delay_ms },
                    current_usage,
                    format!("Operation throttled, retrying in {delay_ms}ms"),
                )
                .await;
                sleep(Duration::from_millis(delay_ms)).await;
            }
            ResourceAction::Queue { estimated_wait_ms } => {
                self.log_enforcement_event(
                    operation_id.clone(),
                    ResourceAction::Queue { estimated_wait_ms },
                    current_usage,
                    format!("Operation queued, estimated wait: {estimated_wait_ms}ms"),
                )
                .await;
                sleep(Duration::from_millis(estimated_wait_ms)).await;
            }
            ResourceAction::Reject { reason } => {
                self.log_enforcement_event(
                    operation_id,
                    ResourceAction::Reject {
                        reason: reason.clone(),
                    },
                    current_usage,
                    reason.clone(),
                )
                .await;
                return Err(anyhow::anyhow!("Resource request rejected: {reason}"));
            }
            ResourceAction::EmergencyStop => {
                self.log_enforcement_event(
                    operation_id,
                    ResourceAction::EmergencyStop,
                    current_usage,
                    "Emergency resource stop triggered".to_string(),
                )
                .await;
                self.emergency_cleanup().await?;
                return Err(anyhow::anyhow!(
                    "Operation rejected due to emergency resource conditions"
                ));
            }
        }
    }
}
/// Decides what to do with a request given a usage snapshot.
///
/// Rules, applied in order:
///
/// 1. `Critical` priority is always allowed, unless memory pressure is
///    already `Critical`, in which case an emergency stop is requested.
/// 2. If the snapshot's memory plus `estimated_memory_mb` exceeds the memory
///    limit: `High` priority is queued, `Medium`/`Low` are rejected.
/// 3. If CPU utilization exceeds the CPU limit: `High` priority is throttled
///    for a delay proportional to the overshoot (capped at 5000 ms),
///    `Medium`/`Low` are rejected.
/// 4. If the snapshot's active-operation count has reached the concurrency
///    limit: `High` priority is queued, `Medium`/`Low` are rejected.
/// 5. Otherwise the request is allowed.
///
/// Priorities are matched by variant rather than compared with `<`/`>`:
/// the derived ordering on `OperationPriority` makes `Critical` the
/// *smallest* value, which is the opposite of what range comparisons such
/// as `priority >= High` read like.
///
/// `_op_type` is currently unused.
///
/// # Errors
///
/// Currently never returns an error.
async fn evaluate_resource_request(
    &self,
    current_usage: &ResourceUsage,
    _op_type: &OperationType,
    priority: &OperationPriority,
    estimated_memory_mb: f64,
) -> Result<ResourceAction> {
    if *priority == OperationPriority::Critical {
        if current_usage.memory_pressure == ResourcePressure::Critical {
            return Ok(ResourceAction::EmergencyStop);
        }
        return Ok(ResourceAction::Allow);
    }

    // From here on the priority is High, Medium or Low; only High may wait.
    let may_wait = matches!(
        priority,
        OperationPriority::Critical | OperationPriority::High
    );

    let projected_memory = current_usage.memory_mb + estimated_memory_mb;
    if projected_memory > self.limits.max_memory_mb {
        if may_wait {
            let wait_time = self.estimate_resource_wait_time().await;
            return Ok(ResourceAction::Queue {
                estimated_wait_ms: wait_time,
            });
        }
        return Ok(ResourceAction::Reject {
            reason: format!(
                "Memory limit exceeded: {:.1}MB + {:.1}MB > {:.1}MB",
                current_usage.memory_mb, estimated_memory_mb, self.limits.max_memory_mb
            ),
        });
    }

    if current_usage.cpu_utilization > self.limits.max_cpu_utilization {
        if may_wait {
            // Overshoot is a fraction, so this yields at most a few hundred
            // milliseconds in practice; the cap guards against odd limits.
            let delay = ((current_usage.cpu_utilization - self.limits.max_cpu_utilization)
                * 1000.0) as u64;
            return Ok(ResourceAction::Throttle {
                delay_ms: delay.min(5000),
            });
        }
        return Ok(ResourceAction::Reject {
            reason: format!(
                "CPU utilization too high: {:.1}% > {:.1}%",
                current_usage.cpu_utilization * 100.0,
                self.limits.max_cpu_utilization * 100.0
            ),
        });
    }

    if current_usage.active_operations >= self.limits.max_concurrent_ops {
        if may_wait {
            let wait_time = self.estimate_operation_wait_time().await;
            return Ok(ResourceAction::Queue {
                estimated_wait_ms: wait_time,
            });
        }
        return Ok(ResourceAction::Reject {
            reason: format!(
                "Too many concurrent operations: {} >= {}",
                current_usage.active_operations, self.limits.max_concurrent_ops
            ),
        });
    }

    Ok(ResourceAction::Allow)
}
async fn measure_current_usage(
limits: &ResourceLimits,
active_ops: &Arc<RwLock<HashMap<String, OperationContext>>>,
) -> ResourceUsage {
let ops = active_ops.read().await;
let active_count = ops.len();
let estimated_memory: f64 =
ops.values().map(|ctx| ctx.estimated_memory_mb).sum::<f64>() + 100.0;
let estimated_cpu = ops
.values()
.map(|ctx| {
let age_factor = (ctx.started_at.elapsed().as_secs() as f64 / 60.0).min(1.0);
match ctx.operation_type {
OperationType::Analysis => 0.3 * age_factor,
OperationType::Commit => 0.1 * age_factor,
OperationType::Background => 0.2 * age_factor,
OperationType::Storage => 0.15 * age_factor,
OperationType::Cleanup => 0.05 * age_factor,
}
})
.sum::<f64>()
.min(1.0);
let memory_pressure = if estimated_memory > limits.max_memory_mb {
ResourcePressure::Critical
} else if estimated_memory > limits.max_memory_mb * 0.9 {
ResourcePressure::High
} else if estimated_memory > limits.max_memory_mb * limits.memory_warning_threshold {
ResourcePressure::Medium
} else {
ResourcePressure::Low
};
let cpu_pressure = if estimated_cpu > limits.max_cpu_utilization {
ResourcePressure::Critical
} else if estimated_cpu > limits.max_cpu_utilization * 0.9 {
ResourcePressure::High
} else if estimated_cpu > limits.max_cpu_utilization * limits.cpu_warning_threshold {
ResourcePressure::Medium
} else {
ResourcePressure::Low
};
ResourceUsage {
timestamp: Instant::now(),
memory_mb: estimated_memory,
cpu_utilization: estimated_cpu,
active_operations: active_count,
memory_pressure,
cpu_pressure,
}
}
/// Estimates, in milliseconds, how long to wait for memory to free up.
///
/// Returns 100 when nothing is running. Otherwise takes half the age of the
/// longest-running non-critical operation (assuming 1000 ms if every
/// running operation is critical) and clamps it to 500..=30000.
async fn estimate_resource_wait_time(&self) -> u64 {
    let tracked = self.active_operations.read().await;
    if tracked.is_empty() {
        return 100;
    }

    let mut oldest_ms: Option<u64> = None;
    for ctx in tracked.values() {
        if ctx.priority == OperationPriority::Critical {
            continue;
        }
        let age_ms = ctx.started_at.elapsed().as_millis() as u64;
        oldest_ms = Some(match oldest_ms {
            Some(seen) if seen >= age_ms => seen,
            _ => age_ms,
        });
    }

    let half_age = oldest_ms.unwrap_or(1000) / 2;
    half_age.clamp(500, 30000)
}
/// Estimates, in milliseconds, how long to wait for a concurrency slot.
///
/// Returns 100 when the semaphore still has free permits. Otherwise the
/// average age of the registered operations is divided by their count and
/// the result is clamped to 1000..=15000.
///
/// When no permits are free but the active-operations map is empty (for
/// example a permit has been acquired but its operation is not registered
/// yet, or the limit is zero), a fixed 5000 ms is returned instead of
/// dividing by a zero operation count.
async fn estimate_operation_wait_time(&self) -> u64 {
    if self.operation_semaphore.available_permits() > 0 {
        return 100;
    }

    let ops = self.active_operations.read().await;
    let count = ops.len() as u64;
    if count == 0 {
        // No ages to average and nothing to divide by; use the fallback.
        return 5000;
    }

    let total_age: u64 = ops
        .values()
        .map(|ctx| ctx.started_at.elapsed().as_millis() as u64)
        .sum();
    let avg_age = total_age / count;

    // The second division by the count is kept from the original heuristic
    // (more running operations => one is expected to finish sooner).
    (avg_age / count).clamp(1000, 15000)
}
/// Reports how many low-priority operations would be cancelled.
///
/// This only counts the registered `Low` priority operations and prints a
/// message to stdout; nothing is actually cancelled.
///
/// # Errors
///
/// Currently never returns an error.
async fn emergency_cleanup(&self) -> Result<()> {
    let tracked = self.active_operations.read().await;

    let mut low_priority_count = 0_usize;
    for ctx in tracked.values() {
        if ctx.priority == OperationPriority::Low {
            low_priority_count += 1;
        }
    }

    println!(
        "EMERGENCY: Would cancel {low_priority_count} low-priority operations due to resource pressure"
    );
    Ok(())
}
/// Appends a decision to the enforcement history.
///
/// The history is bounded: once it grows past 1000 entries, the oldest 500
/// are discarded.
async fn log_enforcement_event(
    &self,
    operation_id: String,
    action: ResourceAction,
    usage: ResourceUsage,
    reason: String,
) {
    // Stamp the event before waiting for the lock.
    let recorded_at = Instant::now();
    let entry = EnforcementEvent {
        timestamp: recorded_at,
        operation_id,
        action,
        resource_usage: usage,
        reason,
    };

    let mut log = self.enforcement_history.write().await;
    log.push(entry);

    let over_capacity = log.len() > 1000;
    if over_capacity {
        log.drain(..500);
    }
}
pub async fn get_current_usage(&self) -> ResourceUsage {
self.current_usage.read().await.clone()
}
/// Summarizes enforcement decisions recorded during the last 300 seconds.
///
/// `total_requests` counts every recorded event in the window (including
/// emergency stops, which have no counter of their own); the remaining
/// counters break the events down by decision. The number of currently
/// registered operations is read at the time of the call.
pub async fn get_enforcement_stats(&self) -> ResourceEnforcementStats {
    let window = Duration::from_secs(300);
    let history = self.enforcement_history.read().await;

    let mut total_requests = 0_usize;
    let mut allowed_requests = 0_usize;
    let mut throttled_requests = 0_usize;
    let mut queued_requests = 0_usize;
    let mut rejected_requests = 0_usize;

    // Single pass over the events that fall inside the reporting window.
    for event in history.iter() {
        if event.timestamp.elapsed() >= window {
            continue;
        }
        total_requests += 1;
        match event.action {
            ResourceAction::Allow => allowed_requests += 1,
            ResourceAction::Throttle { .. } => throttled_requests += 1,
            ResourceAction::Queue { .. } => queued_requests += 1,
            ResourceAction::Reject { .. } => rejected_requests += 1,
            ResourceAction::EmergencyStop => {}
        }
    }

    let current_active_operations = self.active_operations.read().await.len();

    ResourceEnforcementStats {
        total_requests,
        allowed_requests,
        throttled_requests,
        queued_requests,
        rejected_requests,
        current_active_operations,
    }
}
/// Replaces the controller's limits.
///
/// If the concurrency cap changed, the operation semaphore is replaced by a
/// fresh one with the new number of permits. Permits already handed out
/// belong to the old semaphore and do not count against the new one.
///
/// A monitoring task that is already running keeps the copy of the limits
/// it was started with.
pub async fn update_limits(&mut self, new_limits: ResourceLimits) {
    // Compare against the *old* limits before overwriting them; comparing
    // afterwards would always see equal values and never resize.
    let concurrency_changed =
        new_limits.max_concurrent_ops != self.limits.max_concurrent_ops;
    if concurrency_changed {
        self.operation_semaphore = Arc::new(Semaphore::new(new_limits.max_concurrent_ops));
    }
    self.limits = new_limits;
}
}
/// Guard representing one admitted operation.
///
/// It owns the operation's semaphore permit; dropping the guard releases the
/// permit and removes the operation from the active-operations map.
pub struct ResourceAllocation {
// Key of this operation in `active_operations`.
operation_id: String,
// Never read; held only so the permit is released when the guard is dropped.
#[allow(dead_code)]
permit: tokio::sync::OwnedSemaphorePermit,
// Shared map the operation is removed from on drop.
active_operations: Arc<RwLock<HashMap<String, OperationContext>>>,
}
impl ResourceAllocation {
fn new(
operation_id: String,
permit: tokio::sync::OwnedSemaphorePermit,
active_operations: Arc<RwLock<HashMap<String, OperationContext>>>,
) -> Self {
Self {
operation_id,
permit,
active_operations,
}
}
}
impl Drop for ResourceAllocation {
fn drop(&mut self) {
let operation_id = self.operation_id.clone();
let active_ops = self.active_operations.clone();
tokio::spawn(async move {
let mut ops = active_ops.write().await;
ops.remove(&operation_id);
});
}
}
/// Counters summarizing recent enforcement decisions, as produced by
/// `PlatformResourceController::get_enforcement_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceEnforcementStats {
/// Number of recorded enforcement events in the reporting window.
pub total_requests: usize,
/// Events whose decision was `Allow`.
pub allowed_requests: usize,
/// Events whose decision was `Throttle`.
pub throttled_requests: usize,
/// Events whose decision was `Queue`.
pub queued_requests: usize,
/// Events whose decision was `Reject`.
pub rejected_requests: usize,
/// Number of operations registered as active when the stats were taken.
pub current_active_operations: usize,
}
impl ResourceEnforcementStats {
    /// Renders the counters as a multi-line, human-readable report.
    ///
    /// The success rate is the share of allowed requests among all recorded
    /// requests, shown with one decimal; it is reported as 100% when no
    /// requests were recorded.
    #[must_use]
    pub fn format_diagnostic(&self) -> String {
        let success_rate = match self.total_requests {
            0 => 100.0,
            total => (self.allowed_requests as f64 / total as f64) * 100.0,
        };

        let Self {
            total_requests,
            allowed_requests,
            throttled_requests,
            queued_requests,
            rejected_requests,
            current_active_operations,
        } = self;

        format!(
            "Resource Control Stats (5min window):\n\
            - Total requests: {}\n\
            - Success rate: {:.1}%\n\
            - Allowed: {}, Throttled: {}, Queued: {}, Rejected: {}\n\
            - Active operations: {}",
            total_requests,
            success_rate,
            allowed_requests,
            throttled_requests,
            queued_requests,
            rejected_requests,
            current_active_operations
        )
    }
}
/// Namespace for constructing controllers with preset limits.
pub struct ResourceControllerFactory;

impl ResourceControllerFactory {
    /// Controller using `ResourceLimits::default()`.
    #[must_use]
    pub fn create_default() -> PlatformResourceController {
        let defaults = ResourceLimits::default();
        PlatformResourceController::new(defaults)
    }

    /// Controller for development: 512 MB, 5 concurrent operations, usage
    /// measured every 2 seconds; other limits are the defaults.
    #[must_use]
    pub fn create_dev_optimized() -> PlatformResourceController {
        PlatformResourceController::new(ResourceLimits {
            max_memory_mb: 512.0,
            max_concurrent_ops: 5,
            check_interval_secs: 2,
            ..ResourceLimits::default()
        })
    }

    /// Controller for production: 2048 MB, 50 concurrent operations, usage
    /// measured every 10 seconds, warning thresholds lowered to 0.5 (CPU)
    /// and 0.6 (memory); other limits are the defaults.
    #[must_use]
    pub fn create_prod_optimized() -> PlatformResourceController {
        PlatformResourceController::new(ResourceLimits {
            max_memory_mb: 2048.0,
            max_concurrent_ops: 50,
            check_interval_secs: 10,
            cpu_warning_threshold: 0.5,
            memory_warning_threshold: 0.6,
            ..ResourceLimits::default()
        })
    }

    /// Controller for CI: 1024 MB, CPU limit 0.9, 10 concurrent operations,
    /// usage measured every 5 seconds, both warning thresholds at 0.8.
    #[must_use]
    pub fn create_ci_optimized() -> PlatformResourceController {
        // Every field is spelled out, so this preset does not follow changes
        // to the defaults.
        PlatformResourceController::new(ResourceLimits {
            max_memory_mb: 1024.0,
            max_cpu_utilization: 0.9,
            max_concurrent_ops: 10,
            memory_warning_threshold: 0.8,
            cpu_warning_threshold: 0.8,
            check_interval_secs: 5,
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::sleep;
/// A freshly constructed controller reports the idle snapshot built by `new`:
/// zero active operations and low memory pressure.
#[tokio::test]
async fn test_resource_controller_creation() {
let controller = PlatformResourceController::new(ResourceLimits::default());
let usage = controller.get_current_usage().await;
assert_eq!(usage.active_operations, 0);
assert_eq!(usage.memory_pressure, ResourcePressure::Low);
}
/// A high-priority request under default limits is granted, and the usage
/// snapshot is expected to show one active operation.
#[tokio::test]
async fn test_resource_allocation_success() {
let controller = PlatformResourceController::new(ResourceLimits::default());
controller.start_monitoring().await.unwrap();
let allocation = controller
.request_resources(
"test-op-1".to_string(),
OperationType::Analysis,
OperationPriority::High,
100.0,
)
.await
.unwrap();
// NOTE(review): the snapshot is only rewritten by the monitoring task on its
// interval (5 s with default limits); `request_resources` does not refresh
// it. Confirm a tick is guaranteed between the request and this read.
let usage = controller.get_current_usage().await;
assert_eq!(usage.active_operations, 1);
drop(allocation);
// Give the guard's clean-up a moment to run before stopping.
sleep(Duration::from_millis(100)).await;
controller.stop_monitoring().await;
}
/// A low-priority request for 300 MB against a 200 MB limit is expected to
/// fail.
// NOTE(review): this relies on low-priority requests being rejected (not
// queued) when the memory limit is exceeded; a queued request would retry
// instead of returning an error.
#[tokio::test]
async fn test_memory_limit_enforcement() {
let limits = ResourceLimits {
max_memory_mb: 200.0, ..Default::default()
};
let controller = PlatformResourceController::new(limits);
controller.start_monitoring().await.unwrap();
let result = controller
.request_resources(
"test-op-memory".to_string(),
OperationType::Analysis,
OperationPriority::Low,
300.0, )
.await;
assert!(result.is_err());
controller.stop_monitoring().await;
}
/// A critical-priority request is granted even though its 150 MB estimate
/// exceeds the 100 MB memory limit.
#[tokio::test]
async fn test_critical_priority_bypass() {
let limits = ResourceLimits {
max_memory_mb: 100.0, ..Default::default()
};
let controller = PlatformResourceController::new(limits);
controller.start_monitoring().await.unwrap();
let allocation = controller
.request_resources(
"critical-op".to_string(),
OperationType::Commit,
OperationPriority::Critical,
150.0, )
.await;
assert!(allocation.is_ok());
controller.stop_monitoring().await;
}
/// With a concurrency limit of 2 and two allocations held, a third,
/// low-priority request is expected to fail.
#[tokio::test]
async fn test_operation_counting() {
let limits = ResourceLimits {
max_concurrent_ops: 2, ..Default::default()
};
let controller = PlatformResourceController::new(limits);
controller.start_monitoring().await.unwrap();
let _alloc1 = controller
.request_resources(
"op-1".to_string(),
OperationType::Analysis,
OperationPriority::High,
50.0,
)
.await
.unwrap();
let _alloc2 = controller
.request_resources(
"op-2".to_string(),
OperationType::Analysis,
OperationPriority::High,
50.0,
)
.await
.unwrap();
// NOTE(review): the concurrency check reads `active_operations` from the usage
// snapshot, which only the monitoring task updates on its interval. If the
// snapshot still shows fewer than 2 operations here, the request is allowed
// and then waits for a semaphore permit instead of failing — confirm.
let result = controller
.request_resources(
"op-3".to_string(),
OperationType::Background,
OperationPriority::Low,
50.0,
)
.await;
assert!(result.is_err());
controller.stop_monitoring().await;
}
/// After one granted request, the enforcement stats report at least one
/// request in total and at least one allowed request.
#[tokio::test]
async fn test_enforcement_stats() {
let controller = PlatformResourceController::new(ResourceLimits::default());
controller.start_monitoring().await.unwrap();
let _alloc = controller
.request_resources(
"stats-test".to_string(),
OperationType::Analysis,
OperationPriority::Medium,
100.0,
)
.await
.unwrap();
let stats = controller.get_enforcement_stats().await;
assert!(stats.total_requests > 0);
assert!(stats.allowed_requests > 0);
controller.stop_monitoring().await;
}
/// Every factory preset yields a controller whose monitoring can be started
/// and stopped; the default and dev presets also grant a small request.
#[tokio::test]
async fn test_factory_patterns() {
let default_ctrl = ResourceControllerFactory::create_default();
let dev_ctrl = ResourceControllerFactory::create_dev_optimized();
let prod_ctrl = ResourceControllerFactory::create_prod_optimized();
let ci_ctrl = ResourceControllerFactory::create_ci_optimized();
default_ctrl.start_monitoring().await.unwrap();
dev_ctrl.start_monitoring().await.unwrap();
prod_ctrl.start_monitoring().await.unwrap();
ci_ctrl.start_monitoring().await.unwrap();
let _alloc1 = default_ctrl
.request_resources(
"factory-test-1".to_string(),
OperationType::Analysis,
OperationPriority::Medium,
50.0,
)
.await
.unwrap();
let _alloc2 = dev_ctrl
.request_resources(
"factory-test-2".to_string(),
OperationType::Analysis,
OperationPriority::Medium,
50.0,
)
.await
.unwrap();
default_ctrl.stop_monitoring().await;
dev_ctrl.stop_monitoring().await;
prod_ctrl.stop_monitoring().await;
ci_ctrl.stop_monitoring().await;
}
/// Monitoring can be started twice in a row (the second call is a no-op),
/// stopped, restarted and stopped again without errors.
// NOTE(review): only the absence of errors/panics is checked; the test does
// not verify how many monitoring tasks are running after the restart.
#[tokio::test]
async fn test_resource_monitoring_lifecycle() {
let controller = PlatformResourceController::new(ResourceLimits::default());
controller.start_monitoring().await.unwrap();
controller.start_monitoring().await.unwrap();
controller.stop_monitoring().await;
controller.start_monitoring().await.unwrap();
controller.stop_monitoring().await;
}
/// With a 1000 MB limit and a 0.7 warning threshold, memory pressure is
/// expected to be `Low` after a 500 MB allocation and `Medium` after a
/// further 250 MB.
#[tokio::test]
async fn test_resource_pressure_levels() {
let limits = ResourceLimits {
max_memory_mb: 1000.0,
memory_warning_threshold: 0.7, ..Default::default()
};
let controller = PlatformResourceController::new(limits);
controller.start_monitoring().await.unwrap();
let _alloc1 = controller
.request_resources(
"pressure-low".to_string(),
OperationType::Analysis,
OperationPriority::Medium,
500.0, )
.await
.unwrap();
let usage1 = controller.get_current_usage().await;
assert_eq!(usage1.memory_pressure, ResourcePressure::Low);
let _alloc2 = controller
.request_resources(
"pressure-medium".to_string(),
OperationType::Analysis,
OperationPriority::Medium,
250.0, )
.await
.unwrap();
// NOTE(review): the pressure level is recomputed only when the monitoring task
// ticks (every 5 s with the default interval); confirm a measurement is
// guaranteed to have happened after the second allocation before this read.
let usage2 = controller.get_current_usage().await;
assert_eq!(usage2.memory_pressure, ResourcePressure::Medium);
controller.stop_monitoring().await;
}
}
// Placeholder property tests. Neither case calls anything from this module:
// the first asserts a constant `true`, the second only re-checks the bounds of
// its own generated input.
#[cfg(test)]
mod property_tests {
use proptest::prelude::*;
proptest! {
// Accepts any string and always passes.
#[test]
fn basic_property_stability(_input in ".*") {
prop_assert!(true);
}
// Inputs are drawn from 0..1000, so the assertion holds by construction.
#[test]
fn module_consistency_check(_x in 0u32..1000) {
prop_assert!(_x < 1001);
}
}
}