impl PlatformResourceController {
    /// Decide how to handle an incoming resource request given the current
    /// usage snapshot, the operation's priority, and its estimated memory
    /// footprint.
    ///
    /// Decision order: critical-priority fast path, projected memory,
    /// CPU utilization, then concurrent-operation count. The first limit
    /// that trips determines the action; if nothing trips, the request is
    /// allowed. `_op_type` is currently unused but kept for interface
    /// stability.
    async fn evaluate_resource_request(
        &self,
        current_usage: &ResourceUsage,
        _op_type: &OperationType,
        priority: &OperationPriority,
        estimated_memory_mb: f64,
    ) -> Result<ResourceAction> {
        // Critical operations bypass all limits — unless memory pressure is
        // itself critical, in which case everything must stop.
        if *priority == OperationPriority::Critical {
            if current_usage.memory_pressure == ResourcePressure::Critical {
                return Ok(ResourceAction::EmergencyStop);
            }
            return Ok(ResourceAction::Allow);
        }

        // Memory check: reject medium-or-lower priority outright; queue
        // higher-priority work until memory frees up.
        let projected_memory = current_usage.memory_mb + estimated_memory_mb;
        if projected_memory > self.limits.max_memory_mb {
            if *priority <= OperationPriority::Medium {
                return Ok(ResourceAction::Reject {
                    reason: format!(
                        "Memory limit exceeded: {:.1}MB + {:.1}MB > {:.1}MB",
                        current_usage.memory_mb, estimated_memory_mb, self.limits.max_memory_mb
                    ),
                });
            }
            let wait_time = self.estimate_resource_wait_time().await;
            return Ok(ResourceAction::Queue {
                estimated_wait_ms: wait_time,
            });
        }

        // CPU check: throttle high-priority work proportionally to the
        // overage (1ms per 0.1% over, capped at 5s); reject everything else.
        // NOTE(review): the Critical arm below is unreachable — Critical
        // priority always returns in the fast path above. Kept so the match
        // stays robust if that fast path ever changes.
        if current_usage.cpu_utilization > self.limits.max_cpu_utilization {
            match priority {
                OperationPriority::Critical | OperationPriority::High => {
                    let delay = ((current_usage.cpu_utilization
                        - self.limits.max_cpu_utilization)
                        * 1000.0) as u64;
                    return Ok(ResourceAction::Throttle {
                        delay_ms: delay.min(5000),
                    });
                }
                _ => {
                    return Ok(ResourceAction::Reject {
                        reason: format!(
                            "CPU utilization too high: {:.1}% > {:.1}%",
                            current_usage.cpu_utilization * 100.0,
                            self.limits.max_cpu_utilization * 100.0
                        ),
                    });
                }
            }
        }

        // Concurrency check: queue high-priority work; reject the rest.
        if current_usage.active_operations >= self.limits.max_concurrent_ops {
            if *priority >= OperationPriority::High {
                let wait_time = self.estimate_operation_wait_time().await;
                return Ok(ResourceAction::Queue {
                    estimated_wait_ms: wait_time,
                });
            }
            return Ok(ResourceAction::Reject {
                reason: format!(
                    "Too many concurrent operations: {} >= {}",
                    current_usage.active_operations, self.limits.max_concurrent_ops
                ),
            });
        }

        Ok(ResourceAction::Allow)
    }

    /// Build a `ResourceUsage` snapshot from the currently tracked
    /// operations.
    ///
    /// Memory and CPU are *estimates* derived from per-operation metadata,
    /// not OS measurements: memory is the sum of per-op estimates plus a
    /// 100MB baseline; CPU is a per-type weight scaled by operation age
    /// (ramping up over the first minute), capped at 1.0. Pressure levels
    /// are derived from the configured limits and warning thresholds.
    async fn measure_current_usage(
        limits: &ResourceLimits,
        active_ops: &Arc<RwLock<HashMap<String, OperationContext>>>,
    ) -> ResourceUsage {
        let ops = active_ops.read().await;
        let active_count = ops.len();

        // 100.0 MB baseline accounts for the process itself on top of
        // per-operation estimates.
        let estimated_memory: f64 =
            ops.values().map(|ctx| ctx.estimated_memory_mb).sum::<f64>() + 100.0;

        let estimated_cpu = ops
            .values()
            .map(|ctx| {
                // Operations ramp up to their full CPU weight over their
                // first 60 seconds.
                let age_factor = (ctx.started_at.elapsed().as_secs() as f64 / 60.0).min(1.0);
                match ctx.operation_type {
                    OperationType::Analysis => 0.3 * age_factor,
                    OperationType::Commit => 0.1 * age_factor,
                    OperationType::Background => 0.2 * age_factor,
                    OperationType::Storage => 0.15 * age_factor,
                    OperationType::Cleanup => 0.05 * age_factor,
                }
            })
            .sum::<f64>()
            .min(1.0);

        // Pressure tiers: Critical above the hard limit, High above 90% of
        // it, Medium above the configured warning threshold, Low otherwise.
        let memory_pressure = if estimated_memory > limits.max_memory_mb {
            ResourcePressure::Critical
        } else if estimated_memory > limits.max_memory_mb * 0.9 {
            ResourcePressure::High
        } else if estimated_memory > limits.max_memory_mb * limits.memory_warning_threshold {
            ResourcePressure::Medium
        } else {
            ResourcePressure::Low
        };

        let cpu_pressure = if estimated_cpu > limits.max_cpu_utilization {
            ResourcePressure::Critical
        } else if estimated_cpu > limits.max_cpu_utilization * 0.9 {
            ResourcePressure::High
        } else if estimated_cpu > limits.max_cpu_utilization * limits.cpu_warning_threshold {
            ResourcePressure::Medium
        } else {
            ResourcePressure::Low
        };

        ResourceUsage {
            timestamp: Instant::now(),
            memory_mb: estimated_memory,
            cpu_utilization: estimated_cpu,
            active_operations: active_count,
            memory_pressure,
            cpu_pressure,
        }
    }

    /// Estimate (in ms) how long a memory-queued request should expect to
    /// wait, based on the oldest non-critical in-flight operation: assume
    /// it is roughly halfway done, clamped to [500ms, 30s]. Returns a
    /// nominal 100ms when nothing is running.
    async fn estimate_resource_wait_time(&self) -> u64 {
        let ops = self.active_operations.read().await;
        if ops.is_empty() {
            return 100;
        }
        let oldest_age = ops
            .values()
            // Critical ops are excluded — they won't yield resources soon.
            .filter(|ctx| ctx.priority != OperationPriority::Critical)
            .map(|ctx| ctx.started_at.elapsed().as_millis() as u64)
            .max()
            .unwrap_or(1000);
        (oldest_age / 2).clamp(500, 30000)
    }

    /// Estimate (in ms) how long a queued operation will wait for a
    /// semaphore permit.
    ///
    /// Returns a nominal 100ms when a permit is already free; otherwise
    /// bases the estimate on the average age of in-flight operations,
    /// clamped to [1s, 15s].
    async fn estimate_operation_wait_time(&self) -> u64 {
        let available_permits = self.operation_semaphore.available_permits();
        if available_permits > 0 {
            return 100;
        }
        let ops = self.active_operations.read().await;
        let avg_age = if ops.is_empty() {
            // No permits free but no tracked ops (e.g. permit held during
            // registration): fall back to a default estimate.
            5000
        } else {
            let total_age: u64 = ops
                .values()
                .map(|ctx| ctx.started_at.elapsed().as_millis() as u64)
                .sum();
            total_age / ops.len() as u64
        };
        // BUG FIX: the previous code divided `avg_age` by `ops.len()` a
        // second time, which both skewed the estimate (avg / n instead of
        // avg) and panicked with divide-by-zero when `ops` was empty while
        // no permits were available.
        avg_age.clamp(1000, 15000)
    }

    /// Emergency pressure response. Currently only *reports* how many
    /// low-priority operations would be cancelled; actual cancellation is
    /// not implemented here.
    async fn emergency_cleanup(&self) -> Result<()> {
        let ops = self.active_operations.read().await;
        let low_priority_count = ops
            .values()
            .filter(|ctx| ctx.priority == OperationPriority::Low)
            .count();
        // NOTE(review): println! used for emergency visibility; consider
        // routing through the project's logging facility if one exists.
        println!(
            "EMERGENCY: Would cancel {low_priority_count} low-priority operations due to resource pressure"
        );
        Ok(())
    }

    /// Record an enforcement decision in the rolling history buffer.
    ///
    /// The buffer is trimmed to keep memory bounded: once it exceeds 1000
    /// events, the oldest 500 are dropped in one batch (amortizing the
    /// drain cost instead of trimming on every push).
    async fn log_enforcement_event(
        &self,
        operation_id: String,
        action: ResourceAction,
        usage: ResourceUsage,
        reason: String,
    ) {
        let event = EnforcementEvent {
            timestamp: Instant::now(),
            operation_id,
            action,
            resource_usage: usage,
            reason,
        };
        let mut history = self.enforcement_history.write().await;
        history.push(event);
        if history.len() > 1000 {
            history.drain(..500);
        }
    }

    /// Return a clone of the most recent resource usage snapshot.
    pub async fn get_current_usage(&self) -> ResourceUsage {
        self.current_usage.read().await.clone()
    }

    /// Summarize enforcement decisions from the last 5 minutes into
    /// per-action counts, plus the current number of active operations.
    pub async fn get_enforcement_stats(&self) -> ResourceEnforcementStats {
        let history = self.enforcement_history.read().await;
        // Only events from the last 300 seconds count toward the stats.
        let recent_events: Vec<_> = history
            .iter()
            .filter(|e| e.timestamp.elapsed() < Duration::from_secs(300))
            .collect();
        let total_requests = recent_events.len();
        let allowed = recent_events
            .iter()
            .filter(|e| matches!(e.action, ResourceAction::Allow))
            .count();
        let throttled = recent_events
            .iter()
            .filter(|e| matches!(e.action, ResourceAction::Throttle { .. }))
            .count();
        let queued = recent_events
            .iter()
            .filter(|e| matches!(e.action, ResourceAction::Queue { .. }))
            .count();
        let rejected = recent_events
            .iter()
            .filter(|e| matches!(e.action, ResourceAction::Reject { .. }))
            .count();
        ResourceEnforcementStats {
            total_requests,
            allowed_requests: allowed,
            throttled_requests: throttled,
            queued_requests: queued,
            rejected_requests: rejected,
            current_active_operations: {
                let ops = self.active_operations.read().await;
                ops.len()
            },
        }
    }

    /// Replace the active resource limits, rebuilding the operation
    /// semaphore when the concurrency cap changes.
    ///
    /// BUG FIX: the previous code assigned `self.limits` *before* comparing
    /// `new_limits.max_concurrent_ops` against it, so the comparison was
    /// always false and the semaphore was never resized. Compare first,
    /// then move the new limits in (also avoids a needless clone).
    /// NOTE(review): replacing the semaphore resets outstanding permits;
    /// in-flight operations holding permits on the old semaphore are
    /// unaffected, but the new cap applies only to new acquisitions.
    pub async fn update_limits(&mut self, new_limits: ResourceLimits) {
        if new_limits.max_concurrent_ops != self.limits.max_concurrent_ops {
            self.operation_semaphore = Arc::new(Semaphore::new(new_limits.max_concurrent_ops));
        }
        self.limits = new_limits;
    }
}