use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// CPU limits that form the `cpu` part of a resource quota.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CpuQuota {
    /// Maximum execution time, in microseconds.
    pub max_execution_time_us: u64,
    /// CPU time allowed within one second, in microseconds.
    pub max_time_per_second_us: u64,
    /// Maximum number of instructions.
    pub max_instructions: u64,
    /// Priority value; [`CpuQuota::with_priority`] caps it at 100.
    pub priority: u8,
}

impl CpuQuota {
    /// Quota with no execution-time or instruction cap and a full
    /// 1_000_000 us of CPU per second, at priority 50.
    pub fn unlimited() -> Self {
        Self::with_limits(u64::MAX, 1_000_000)
    }

    /// Quota with the given time limits, no instruction cap, priority 50.
    pub fn with_limits(execution_time_us: u64, time_per_second_us: u64) -> Self {
        Self {
            priority: 50,
            max_instructions: u64::MAX,
            max_time_per_second_us: time_per_second_us,
            max_execution_time_us: execution_time_us,
        }
    }

    /// Returns the quota with `priority` applied, capped at 100.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = if priority > 100 { 100 } else { priority };
        self
    }

    /// The execution-time limit as a [`Duration`].
    pub fn max_execution_duration(&self) -> Duration {
        let micros = self.max_execution_time_us;
        Duration::from_micros(micros)
    }

    /// Per-second CPU budget as a percentage of one second, capped at 100.
    pub fn utilization_percent(&self) -> u8 {
        let fraction_of_second = self.max_time_per_second_us as f64 / 1_000_000.0;
        let percent = fraction_of_second * 100.0;
        percent.min(100.0) as u8
    }
}
/// Bundles the usage history, anomaly detector and predictor of one agent.
#[derive(Debug)]
pub struct ResourceMonitor {
    agent_id: [u8; 16],
    history: ResourceHistory,
    detector: AnomalyDetector,
    predictor: ResourcePredictor,
}

impl ResourceMonitor {
    /// Creates a monitor using the default history and anomaly configuration.
    pub fn new(agent_id: [u8; 16]) -> Self {
        Self::with_config(agent_id, HistoryConfig::default(), AnomalyConfig::default())
    }

    /// Creates a monitor with explicit history and anomaly configuration.
    pub fn with_config(
        agent_id: [u8; 16],
        history_config: HistoryConfig,
        anomaly_config: AnomalyConfig,
    ) -> Self {
        let history = ResourceHistory::new(agent_id, history_config);
        let detector = AnomalyDetector::new(anomaly_config);
        let predictor = ResourcePredictor::new();
        Self {
            agent_id,
            history,
            detector,
            predictor,
        }
    }

    /// Identifier of the monitored agent.
    pub fn agent_id(&self) -> &[u8; 16] {
        &self.agent_id
    }

    /// Recorded snapshot history.
    pub fn history(&self) -> &ResourceHistory {
        &self.history
    }

    /// Anomaly detector and the anomalies it has retained.
    pub fn detector(&self) -> &AnomalyDetector {
        &self.detector
    }

    /// Predictor used by [`ResourceMonitor::predict`].
    pub fn predictor(&self) -> &ResourcePredictor {
        &self.predictor
    }

    /// Records `usage` unconditionally, then runs anomaly detection and
    /// returns the anomalies found by this run.
    pub fn record_and_analyze(
        &mut self,
        usage: &ResourceUsage,
        current_time_us: u64,
    ) -> Vec<Anomaly> {
        self.history.record(usage, current_time_us);
        let history = &self.history;
        self.detector.analyze(history, current_time_us)
    }

    /// Like [`ResourceMonitor::record_and_analyze`], but only analyzes when
    /// the history accepted the sample; otherwise returns an empty list.
    pub fn maybe_record_and_analyze(
        &mut self,
        usage: &ResourceUsage,
        current_time_us: u64,
    ) -> Vec<Anomaly> {
        let recorded = self.history.maybe_record(usage, current_time_us);
        if !recorded {
            return Vec::new();
        }
        self.detector.analyze(&self.history, current_time_us)
    }

    /// Predicts usage `horizon_us` microseconds after the latest snapshot.
    pub fn predict(&self, horizon_us: u64) -> Option<ResourcePrediction> {
        let history = &self.history;
        self.predictor.predict(history, horizon_us)
    }

    /// Delegates to the predictor's quota-breach estimate for `quota`.
    pub fn predict_quota_breach(&self, quota: &ResourceQuota) -> Option<u64> {
        let history = &self.history;
        self.predictor.predict_quota_breach(history, quota)
    }

    /// Builds a summary of the current history and retained anomalies.
    ///
    /// Averages are reported as 0 when the history has no average, and an
    /// anomaly counts as high severity from 0.7 upward.
    pub fn summary(&self) -> MonitorSummary {
        let peak_memory_bytes = self.history.peak_memory();
        let peak_cpu_time_us = self.history.peak_cpu();
        let (average_memory_bytes, average_cpu_time_us) = match self.history.average() {
            Some(mean) => (mean.memory_bytes, mean.cpu_time_us),
            None => (0, 0),
        };
        let latest_snapshot = self.history.latest().cloned();

        MonitorSummary {
            agent_id: self.agent_id,
            snapshot_count: self.history.len(),
            peak_memory_bytes,
            peak_cpu_time_us,
            average_memory_bytes,
            average_cpu_time_us,
            latest_snapshot,
            anomaly_count: self.detector.anomalies().len(),
            high_severity_anomalies: self.detector.high_severity_anomalies(0.7).len(),
        }
    }

    /// Clears the history and the retained anomalies.
    pub fn clear(&mut self) {
        self.history.clear();
        self.detector.clear();
    }
}
/// Tracks one agent's [`ResourceUsage`] and compares it with a
/// [`ResourceQuota`].
///
/// Per-second counters (CPU time, bytes sent/received, messages) are zeroed
/// once at least one second has elapsed since the previous reset; the reset
/// is evaluated lazily by the methods that touch those counters.
#[derive(Debug)]
pub struct QuotaEnforcer {
    quota: ResourceQuota,
    usage: ResourceUsage,
    last_second_reset: Instant,
    violation_count: u64,
}

impl QuotaEnforcer {
    /// Creates an enforcer for `quota` with zeroed usage.
    pub fn new(quota: ResourceQuota) -> Self {
        Self {
            quota,
            usage: ResourceUsage::default(),
            last_second_reset: Instant::now(),
            violation_count: 0,
        }
    }

    /// The quota currently being enforced.
    pub fn quota(&self) -> &ResourceQuota {
        &self.quota
    }

    /// Replaces the quota; recorded usage is left untouched.
    pub fn set_quota(&mut self, quota: ResourceQuota) {
        self.quota = quota;
    }

    /// The usage recorded so far.
    pub fn usage(&self) -> &ResourceUsage {
        &self.usage
    }

    /// Returns `true` when `current + extra` stays within `limit`.
    ///
    /// A sum that does not fit in `u64` is treated as exceeding the limit
    /// instead of panicking (debug) or wrapping to a small value (release).
    fn fits_within(current: u64, extra: u64, limit: u64) -> bool {
        match current.checked_add(extra) {
            Some(total) => total <= limit,
            None => false,
        }
    }

    /// Zeroes the per-second counters once a second or more has elapsed.
    fn maybe_reset_second_counters(&mut self) {
        if self.last_second_reset.elapsed() >= Duration::from_secs(1) {
            self.usage.cpu_time_this_second_us = 0;
            self.usage.bytes_sent_this_second = 0;
            self.usage.bytes_received_this_second = 0;
            self.usage.messages_this_second = 0;
            self.last_second_reset = Instant::now();
        }
    }

    /// Compares the recorded usage against every limit of the quota.
    ///
    /// Returns an all-clear result when the quota is not enforced. Hard
    /// violations bump the violation counter once per call; the memory soft
    /// limit is reported as a warning only.
    pub fn check(&mut self) -> QuotaCheckResult {
        self.maybe_reset_second_counters();
        let mut result = QuotaCheckResult::ok();
        if !self.quota.enforced {
            return result;
        }

        let usage = &self.usage;
        let quota = &self.quota;
        // Order matters: it is the order in which violations are reported.
        let checks = [
            (
                usage.heap_bytes > quota.memory.max_heap_bytes,
                QuotaViolation::HeapMemoryExceeded,
            ),
            (
                usage.stack_bytes > quota.memory.max_stack_bytes,
                QuotaViolation::StackMemoryExceeded,
            ),
            (
                usage.total_memory_bytes > quota.memory.max_total_bytes,
                QuotaViolation::TotalMemoryExceeded,
            ),
            (
                usage.total_memory_bytes > quota.memory.total_soft_limit(),
                QuotaViolation::MemorySoftLimitReached,
            ),
            (
                usage.cpu_time_this_second_us > quota.cpu.max_time_per_second_us,
                QuotaViolation::CpuRateLimitExceeded,
            ),
            (
                usage.instructions_executed > quota.cpu.max_instructions,
                QuotaViolation::InstructionLimitExceeded,
            ),
            (
                usage.bytes_sent_this_second > quota.network.max_send_bytes_per_sec,
                QuotaViolation::SendBandwidthExceeded,
            ),
            (
                usage.bytes_received_this_second > quota.network.max_recv_bytes_per_sec,
                QuotaViolation::RecvBandwidthExceeded,
            ),
            (
                usage.network_bytes_this_second() > quota.network.max_total_bytes_per_sec,
                QuotaViolation::TotalBandwidthExceeded,
            ),
            (
                usage.active_connections > quota.network.max_connections,
                QuotaViolation::ConnectionLimitExceeded,
            ),
            (
                usage.messages_this_second > quota.network.max_messages_per_sec,
                QuotaViolation::MessageRateExceeded,
            ),
            (
                usage.persistent_storage_bytes > quota.storage.max_persistent_bytes,
                QuotaViolation::PersistentStorageExceeded,
            ),
            (
                usage.temp_storage_bytes > quota.storage.max_temp_bytes,
                QuotaViolation::TempStorageExceeded,
            ),
            (
                usage.file_count > quota.storage.max_files,
                QuotaViolation::FileCountExceeded,
            ),
        ];

        for (exceeded, violation) in checks.iter() {
            if *exceeded {
                result = result.with_violation(*violation);
            }
        }

        if !result.within_limits {
            self.violation_count = self.violation_count.saturating_add(1);
        }
        result
    }

    /// Whether `bytes` more heap memory fits both the heap and total limits.
    pub fn can_allocate_memory(&self, bytes: u64) -> bool {
        if !self.quota.enforced {
            return true;
        }
        let memory = &self.quota.memory;
        Self::fits_within(self.usage.heap_bytes, bytes, memory.max_heap_bytes)
            && Self::fits_within(self.usage.total_memory_bytes, bytes, memory.max_total_bytes)
    }

    /// Adds `bytes` to the heap and total memory counters (saturating).
    pub fn record_allocation(&mut self, bytes: u64) {
        self.usage.heap_bytes = self.usage.heap_bytes.saturating_add(bytes);
        self.usage.total_memory_bytes = self.usage.total_memory_bytes.saturating_add(bytes);
    }

    /// Subtracts `bytes` from the heap and total memory counters, stopping at 0.
    pub fn record_deallocation(&mut self, bytes: u64) {
        self.usage.heap_bytes = self.usage.heap_bytes.saturating_sub(bytes);
        self.usage.total_memory_bytes = self.usage.total_memory_bytes.saturating_sub(bytes);
    }

    /// Whether `time_us` more CPU time fits the current second's budget.
    pub fn can_use_cpu(&mut self, time_us: u64) -> bool {
        self.maybe_reset_second_counters();
        if !self.quota.enforced {
            return true;
        }
        Self::fits_within(
            self.usage.cpu_time_this_second_us,
            time_us,
            self.quota.cpu.max_time_per_second_us,
        )
    }

    /// Adds `time_us` to the per-second and total CPU counters (saturating).
    pub fn record_cpu_time(&mut self, time_us: u64) {
        self.maybe_reset_second_counters();
        self.usage.cpu_time_this_second_us =
            self.usage.cpu_time_this_second_us.saturating_add(time_us);
        self.usage.total_cpu_time_us = self.usage.total_cpu_time_us.saturating_add(time_us);
    }

    /// Whether sending `bytes` fits the current second's send budget.
    pub fn can_send(&mut self, bytes: u64) -> bool {
        self.maybe_reset_second_counters();
        if !self.quota.enforced {
            return true;
        }
        Self::fits_within(
            self.usage.bytes_sent_this_second,
            bytes,
            self.quota.network.max_send_bytes_per_sec,
        )
    }

    /// Adds `bytes` to the per-second and total sent counters (saturating).
    pub fn record_send(&mut self, bytes: u64) {
        self.maybe_reset_second_counters();
        self.usage.bytes_sent_this_second =
            self.usage.bytes_sent_this_second.saturating_add(bytes);
        self.usage.total_bytes_sent = self.usage.total_bytes_sent.saturating_add(bytes);
    }

    /// Whether receiving `bytes` fits the current second's receive budget.
    pub fn can_receive(&mut self, bytes: u64) -> bool {
        self.maybe_reset_second_counters();
        if !self.quota.enforced {
            return true;
        }
        Self::fits_within(
            self.usage.bytes_received_this_second,
            bytes,
            self.quota.network.max_recv_bytes_per_sec,
        )
    }

    /// Adds `bytes` to the per-second and total received counters (saturating).
    pub fn record_receive(&mut self, bytes: u64) {
        self.maybe_reset_second_counters();
        self.usage.bytes_received_this_second =
            self.usage.bytes_received_this_second.saturating_add(bytes);
        self.usage.total_bytes_received = self.usage.total_bytes_received.saturating_add(bytes);
    }

    /// Whether one more message fits the current second's message budget.
    pub fn can_send_message(&mut self) -> bool {
        self.maybe_reset_second_counters();
        if !self.quota.enforced {
            return true;
        }
        self.usage.messages_this_second < self.quota.network.max_messages_per_sec
    }

    /// Counts one message in the current second (saturating).
    pub fn record_message(&mut self) {
        self.maybe_reset_second_counters();
        self.usage.messages_this_second = self.usage.messages_this_second.saturating_add(1);
    }

    /// Whether one more connection stays below the connection limit.
    pub fn can_open_connection(&self) -> bool {
        if !self.quota.enforced {
            return true;
        }
        self.usage.active_connections < self.quota.network.max_connections
    }

    /// Counts one newly opened connection (saturating).
    pub fn record_connection_open(&mut self) {
        self.usage.active_connections = self.usage.active_connections.saturating_add(1);
    }

    /// Counts one closed connection, stopping at 0.
    pub fn record_connection_close(&mut self) {
        self.usage.active_connections = self.usage.active_connections.saturating_sub(1);
    }

    /// Whether `bytes` more storage fits the persistent or temporary limit.
    pub fn can_use_storage(&self, bytes: u64, persistent: bool) -> bool {
        if !self.quota.enforced {
            return true;
        }
        let (current, limit) = if persistent {
            (
                self.usage.persistent_storage_bytes,
                self.quota.storage.max_persistent_bytes,
            )
        } else {
            (
                self.usage.temp_storage_bytes,
                self.quota.storage.max_temp_bytes,
            )
        };
        Self::fits_within(current, bytes, limit)
    }

    /// Adds `bytes` to the persistent or temporary storage counter (saturating).
    pub fn record_storage(&mut self, bytes: u64, persistent: bool) {
        let counter = if persistent {
            &mut self.usage.persistent_storage_bytes
        } else {
            &mut self.usage.temp_storage_bytes
        };
        *counter = counter.saturating_add(bytes);
    }

    /// Subtracts `bytes` from the persistent or temporary counter, stopping at 0.
    pub fn record_storage_freed(&mut self, bytes: u64, persistent: bool) {
        let counter = if persistent {
            &mut self.usage.persistent_storage_bytes
        } else {
            &mut self.usage.temp_storage_bytes
        };
        *counter = counter.saturating_sub(bytes);
    }

    /// Whether a new file of `size` bytes respects the file-count and
    /// per-file size limits.
    pub fn can_create_file(&self, size: u64) -> bool {
        if !self.quota.enforced {
            return true;
        }
        self.usage.file_count < self.quota.storage.max_files
            && size <= self.quota.storage.max_file_size
    }

    /// Counts one created file (saturating).
    pub fn record_file_created(&mut self) {
        self.usage.file_count = self.usage.file_count.saturating_add(1);
    }

    /// Counts one deleted file, stopping at 0.
    pub fn record_file_deleted(&mut self) {
        self.usage.file_count = self.usage.file_count.saturating_sub(1);
    }

    /// Number of `check` calls that found at least one hard violation.
    pub fn violation_count(&self) -> u64 {
        self.violation_count
    }

    /// Zeroes all usage counters and restarts the one-second window.
    /// The violation counter is kept.
    pub fn reset_usage(&mut self) {
        self.usage = ResourceUsage::default();
        self.last_second_reset = Instant::now();
    }
}
/// Point-in-time copy of the totals from a [`ResourceUsage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceSnapshot {
    /// Time the snapshot was taken, in microseconds.
    pub timestamp_us: u64,
    /// Total memory in use, in bytes.
    pub memory_bytes: u64,
    /// Total CPU time consumed, in microseconds.
    pub cpu_time_us: u64,
    /// Total bytes sent.
    pub network_sent_bytes: u64,
    /// Total bytes received.
    pub network_recv_bytes: u64,
    /// Persistent plus temporary storage, in bytes.
    pub storage_bytes: u64,
}

impl ResourceSnapshot {
    /// Captures the totals of `usage` at `timestamp_us`.
    pub fn from_usage(usage: &ResourceUsage, timestamp_us: u64) -> Self {
        Self {
            timestamp_us,
            memory_bytes: usage.total_memory_bytes,
            cpu_time_us: usage.total_cpu_time_us,
            network_sent_bytes: usage.total_bytes_sent,
            network_recv_bytes: usage.total_bytes_received,
            storage_bytes: usage.total_storage_bytes(),
        }
    }

    /// Computes per-second rates of change from the earlier snapshot `other`
    /// to `self`.
    ///
    /// Returns `None` when `other` is not strictly older than `self`.
    /// Memory and storage rates are signed. The CPU and network counters are
    /// treated as cumulative: if one of them went backwards (for example
    /// after usage was reset) its delta is clamped to zero instead of
    /// underflowing.
    pub fn rate_from(&self, other: &ResourceSnapshot) -> Option<ResourceRate> {
        let elapsed_us = self
            .timestamp_us
            .checked_sub(other.timestamp_us)
            .filter(|&delta| delta != 0)?;
        let elapsed_secs = elapsed_us as f64 / 1_000_000.0;

        // Gauges may shrink, so their difference is taken in floating point.
        let gauge_rate = |now: u64, before: u64| (now as f64 - before as f64) / elapsed_secs;
        // Counters only grow; a negative delta is clamped to zero.
        let counter_rate = |now: u64, before: u64| now.saturating_sub(before) as f64 / elapsed_secs;

        let cpu_delta_us = self.cpu_time_us.saturating_sub(other.cpu_time_us);

        Some(ResourceRate {
            memory_bytes_per_sec: gauge_rate(self.memory_bytes, other.memory_bytes),
            cpu_percent: (cpu_delta_us as f64 / elapsed_us as f64) * 100.0,
            network_sent_bytes_per_sec: counter_rate(
                self.network_sent_bytes,
                other.network_sent_bytes,
            ),
            network_recv_bytes_per_sec: counter_rate(
                self.network_recv_bytes,
                other.network_recv_bytes,
            ),
            storage_bytes_per_sec: gauge_rate(self.storage_bytes, other.storage_bytes),
        })
    }
}
/// Retention and sampling settings for a snapshot history.
#[derive(Debug, Clone, Copy)]
pub struct HistoryConfig {
    /// Maximum number of snapshots to keep.
    pub max_snapshots: usize,
    /// Minimum spacing between throttled snapshots, in microseconds.
    pub min_interval_us: u64,
}

impl HistoryConfig {
    /// Preset keeping up to 10 000 snapshots spaced at least 100 ms apart.
    pub fn high_resolution() -> Self {
        let (max_snapshots, min_interval_us) = (10_000, 100_000);
        Self {
            max_snapshots,
            min_interval_us,
        }
    }

    /// Preset keeping up to 100 snapshots spaced at least 60 s apart.
    pub fn low_resolution() -> Self {
        let (max_snapshots, min_interval_us) = (100, 60_000_000);
        Self {
            max_snapshots,
            min_interval_us,
        }
    }
}
/// Memory limits that form the `memory` part of a resource quota.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MemoryQuota {
    /// Maximum heap size, in bytes.
    pub max_heap_bytes: u64,
    /// Maximum stack size, in bytes.
    pub max_stack_bytes: u64,
    /// Maximum total memory, in bytes.
    pub max_total_bytes: u64,
    /// Percentage of a hard limit at which the soft limit sits.
    pub soft_limit_percent: u8,
}

impl MemoryQuota {
    /// Quota with every limit at `u64::MAX` and the soft limit at 100 %.
    pub fn unlimited() -> Self {
        Self {
            max_heap_bytes: u64::MAX,
            max_stack_bytes: u64::MAX,
            max_total_bytes: u64::MAX,
            soft_limit_percent: 100,
        }
    }

    /// Quota with the given limits and the soft limit at 80 %.
    pub fn with_limits(heap: u64, stack: u64, total: u64) -> Self {
        Self {
            max_heap_bytes: heap,
            max_stack_bytes: stack,
            max_total_bytes: total,
            soft_limit_percent: 80,
        }
    }

    /// Computes `limit * percent / 100` without intermediate overflow.
    ///
    /// The multiplication is done in `u128` because `limit * percent` does
    /// not fit in `u64` for large limits (e.g. `u64::MAX` from
    /// [`MemoryQuota::unlimited`]). A result above `u64::MAX` (only possible
    /// for percentages over 100) is clamped to `u64::MAX`.
    fn percent_of(limit: u64, percent: u8) -> u64 {
        let scaled = u128::from(limit) * u128::from(percent) / 100;
        scaled.min(u128::from(u64::MAX)) as u64
    }

    /// Heap size at which the soft limit is reached.
    pub fn heap_soft_limit(&self) -> u64 {
        Self::percent_of(self.max_heap_bytes, self.soft_limit_percent)
    }

    /// Total memory size at which the soft limit is reached.
    pub fn total_soft_limit(&self) -> u64 {
        Self::percent_of(self.max_total_bytes, self.soft_limit_percent)
    }
}
/// Network limits that form the `network` part of a resource quota.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NetworkQuota {
    /// Maximum bytes sent per second.
    pub max_send_bytes_per_sec: u64,
    /// Maximum bytes received per second.
    pub max_recv_bytes_per_sec: u64,
    /// Maximum bytes sent plus received per second.
    pub max_total_bytes_per_sec: u64,
    /// Maximum number of simultaneously open connections.
    pub max_connections: u32,
    /// Maximum messages per second.
    pub max_messages_per_sec: u32,
}

impl NetworkQuota {
    /// Quota with every limit at its type's maximum value.
    pub fn unlimited() -> Self {
        Self {
            max_send_bytes_per_sec: u64::MAX,
            max_recv_bytes_per_sec: u64::MAX,
            max_total_bytes_per_sec: u64::MAX,
            max_connections: u32::MAX,
            max_messages_per_sec: u32::MAX,
        }
    }

    /// Quota with the given send/receive bandwidth, 100 connections and
    /// 1000 messages per second.
    ///
    /// The combined limit is `send + recv`, saturating at `u64::MAX` so that
    /// very large inputs cannot overflow.
    pub fn with_bandwidth(send: u64, recv: u64) -> Self {
        Self {
            max_send_bytes_per_sec: send,
            max_recv_bytes_per_sec: recv,
            max_total_bytes_per_sec: send.saturating_add(recv),
            max_connections: 100,
            max_messages_per_sec: 1000,
        }
    }

    /// Returns the quota with the connection limit replaced.
    pub fn with_connection_limit(mut self, max_connections: u32) -> Self {
        self.max_connections = max_connections;
        self
    }
}
/// `u64` counter backed by an [`AtomicU64`]; every operation uses
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct AtomicResourceCounter {
    value: AtomicU64,
}

impl AtomicResourceCounter {
    /// Creates a counter starting at `initial`.
    pub fn new(initial: u64) -> Self {
        let value = AtomicU64::new(initial);
        Self { value }
    }

    /// Current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Adds `amount` to the counter.
    pub fn add(&self, amount: u64) {
        let _previous = self.value.fetch_add(amount, Ordering::Relaxed);
    }

    /// Subtracts `amount`, stopping at zero instead of wrapping.
    pub fn sub(&self, amount: u64) {
        // The closure always yields `Some`, so the update retries until it
        // lands and the returned `Result` carries no failure to handle.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(amount))
            });
    }

    /// Sets the counter back to zero.
    pub fn reset(&self) {
        self.value.store(0, Ordering::Relaxed);
    }
}
/// Outcome of a quota check: hard violations and warnings.
#[derive(Debug, Clone)]
pub struct QuotaCheckResult {
    /// `false` once at least one hard violation has been added.
    pub within_limits: bool,
    /// Hard violations, in the order they were added.
    pub violations: Vec<QuotaViolation>,
    /// Warning-level entries, in the order they were added.
    pub warnings: Vec<QuotaViolation>,
}

impl QuotaCheckResult {
    /// Result with no violations and no warnings.
    pub fn ok() -> Self {
        Self {
            violations: Vec::new(),
            warnings: Vec::new(),
            within_limits: true,
        }
    }

    /// Adds `violation`, filing it as a warning when it is one and as a
    /// hard violation (clearing `within_limits`) otherwise.
    pub fn with_violation(mut self, violation: QuotaViolation) -> Self {
        let bucket = if violation.is_warning() {
            &mut self.warnings
        } else {
            self.within_limits = false;
            &mut self.violations
        };
        bucket.push(violation);
        self
    }

    /// Whether any warning has been recorded.
    pub fn has_warnings(&self) -> bool {
        self.warnings.first().is_some()
    }
}
/// Bounded, oldest-first list of [`ResourceSnapshot`]s for one agent.
#[derive(Debug)]
pub struct ResourceHistory {
    agent_id: [u8; 16],
    config: HistoryConfig,
    snapshots: Vec<ResourceSnapshot>,
    last_snapshot_us: u64,
}

impl ResourceHistory {
    /// Creates an empty history for `agent_id`.
    pub fn new(agent_id: [u8; 16], config: HistoryConfig) -> Self {
        Self {
            agent_id,
            config,
            snapshots: Vec::new(),
            last_snapshot_us: 0,
        }
    }

    /// Identifier of the agent this history belongs to.
    pub fn agent_id(&self) -> &[u8; 16] {
        &self.agent_id
    }

    /// Number of stored snapshots.
    pub fn len(&self) -> usize {
        self.snapshots.len()
    }

    /// Whether no snapshot is stored.
    pub fn is_empty(&self) -> bool {
        self.snapshots.is_empty()
    }

    /// Records a snapshot unless the previous one is more recent than
    /// `min_interval_us`; returns whether the snapshot was recorded.
    ///
    /// The first snapshot of an empty history is always accepted. Without
    /// that exception a clock starting near zero would have its early
    /// samples rejected, because the "last snapshot" time starts at 0.
    pub fn maybe_record(&mut self, usage: &ResourceUsage, current_time_us: u64) -> bool {
        let since_last_us = current_time_us.saturating_sub(self.last_snapshot_us);
        let too_soon = since_last_us < self.config.min_interval_us;
        if too_soon && !self.snapshots.is_empty() {
            return false;
        }
        self.record(usage, current_time_us);
        true
    }

    /// Records a snapshot unconditionally and drops the oldest entries
    /// beyond `max_snapshots`.
    pub fn record(&mut self, usage: &ResourceUsage, current_time_us: u64) {
        self.snapshots
            .push(ResourceSnapshot::from_usage(usage, current_time_us));
        self.last_snapshot_us = current_time_us;

        // Remove every surplus entry in one pass instead of shifting the
        // whole vector once per removed element.
        let excess = self
            .snapshots
            .len()
            .saturating_sub(self.config.max_snapshots);
        if excess > 0 {
            self.snapshots.drain(..excess);
        }
    }

    /// Most recently recorded snapshot, if any.
    pub fn latest(&self) -> Option<&ResourceSnapshot> {
        self.snapshots.last()
    }

    /// All stored snapshots, oldest first.
    pub fn snapshots(&self) -> &[ResourceSnapshot] {
        &self.snapshots
    }

    /// Snapshots whose timestamp lies in `start_us..=end_us`.
    pub fn snapshots_in_range(&self, start_us: u64, end_us: u64) -> Vec<&ResourceSnapshot> {
        self.snapshots
            .iter()
            .filter(|s| s.timestamp_us >= start_us && s.timestamp_us <= end_us)
            .collect()
    }

    /// Field-wise integer mean of all stored snapshots, stamped with the
    /// time of the last recorded snapshot. `None` when the history is empty.
    ///
    /// Sums are accumulated in `u128` so that many large counters cannot
    /// overflow; each mean is at most `u64::MAX` and is cast back losslessly.
    pub fn average(&self) -> Option<ResourceSnapshot> {
        if self.snapshots.is_empty() {
            return None;
        }
        let count = self.snapshots.len() as u128;

        // memory, cpu, sent, received, storage
        let mut sums = [0u128; 5];
        for snapshot in &self.snapshots {
            sums[0] += u128::from(snapshot.memory_bytes);
            sums[1] += u128::from(snapshot.cpu_time_us);
            sums[2] += u128::from(snapshot.network_sent_bytes);
            sums[3] += u128::from(snapshot.network_recv_bytes);
            sums[4] += u128::from(snapshot.storage_bytes);
        }
        let mean = |total: u128| (total / count) as u64;

        Some(ResourceSnapshot {
            timestamp_us: self.last_snapshot_us,
            memory_bytes: mean(sums[0]),
            cpu_time_us: mean(sums[1]),
            network_sent_bytes: mean(sums[2]),
            network_recv_bytes: mean(sums[3]),
            storage_bytes: mean(sums[4]),
        })
    }

    /// Largest `memory_bytes` among stored snapshots, or 0 when empty.
    pub fn peak_memory(&self) -> u64 {
        self.snapshots
            .iter()
            .map(|s| s.memory_bytes)
            .max()
            .unwrap_or(0)
    }

    /// Largest `cpu_time_us` among stored snapshots, or 0 when empty.
    pub fn peak_cpu(&self) -> u64 {
        self.snapshots
            .iter()
            .map(|s| s.cpu_time_us)
            .max()
            .unwrap_or(0)
    }

    /// Removes all snapshots and resets the last-snapshot time to 0.
    pub fn clear(&mut self) {
        self.snapshots.clear();
        self.last_snapshot_us = 0;
    }
}
/// One anomaly reported by the detector.
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Category of the anomaly.
    pub anomaly_type: AnomalyType,
    /// Time of detection, in microseconds.
    pub detected_at_us: u64,
    /// Severity score; higher is more severe.
    pub severity: f64,
    /// Observed value that triggered the anomaly.
    pub current_value: f64,
    /// Reference value the observation was compared against.
    pub expected_value: f64,
    /// Human-readable explanation.
    pub description: String,
}
/// Thresholds used by the anomaly detector.
#[derive(Debug, Clone, Copy)]
pub struct AnomalyConfig {
    /// Latest/average memory ratio above which a spike is reported.
    pub memory_spike_threshold: f64,
    /// Spike threshold for CPU.
    pub cpu_spike_threshold: f64,
    /// Latest/average network ratio above which a spike is reported.
    pub network_spike_threshold: f64,
    /// Latest/average storage ratio above which a spike is reported.
    pub storage_spike_threshold: f64,
    /// Number of most recent snapshots inspected for a memory leak.
    pub leak_detection_samples: usize,
    /// Fraction of growing steps at or above which a leak is reported.
    pub leak_growth_threshold: f64,
}

impl AnomalyConfig {
    /// Assembles a configuration from spike thresholds given in the order
    /// memory, CPU, network, storage, plus the leak-detection settings.
    fn preset(spikes: [f64; 4], leak_detection_samples: usize, leak_growth_threshold: f64) -> Self {
        let [memory, cpu, network, storage] = spikes;
        Self {
            memory_spike_threshold: memory,
            cpu_spike_threshold: cpu,
            network_spike_threshold: network,
            storage_spike_threshold: storage,
            leak_detection_samples,
            leak_growth_threshold,
        }
    }

    /// Preset with low thresholds and a short leak-detection window.
    pub fn strict() -> Self {
        Self::preset([1.5, 2.0, 3.0, 1.5], 5, 0.8)
    }

    /// Preset with high thresholds and a long leak-detection window.
    pub fn lenient() -> Self {
        Self::preset([3.0, 5.0, 10.0, 3.0], 20, 0.98)
    }
}
/// Registry of per-agent [`QuotaEnforcer`]s plus a global memory counter.
#[derive(Debug)]
pub struct ResourceManager {
    enforcers: HashMap<[u8; 16], QuotaEnforcer>,
    default_quota: ResourceQuota,
    global_memory_limit: u64,
    total_memory_used: AtomicResourceCounter,
}

impl ResourceManager {
    /// Creates a manager with no agents, the default quota and a global
    /// memory limit of 64 GiB.
    pub fn new() -> Self {
        Self {
            enforcers: HashMap::new(),
            default_quota: ResourceQuota::default(),
            global_memory_limit: 64 * 1024 * 1024 * 1024,
            total_memory_used: AtomicResourceCounter::new(0),
        }
    }

    /// Sets the quota given to agents registered via `register_agent`.
    pub fn set_default_quota(&mut self, quota: ResourceQuota) {
        self.default_quota = quota;
    }

    /// Sets the global memory limit, in bytes.
    pub fn set_global_memory_limit(&mut self, limit: u64) {
        self.global_memory_limit = limit;
    }

    /// Registers `agent_id` with a copy of the default quota.
    pub fn register_agent(&mut self, agent_id: [u8; 16]) {
        let quota = self.default_quota.clone();
        self.register_agent_with_quota(agent_id, quota);
    }

    /// Registers `agent_id` with `quota`. An enforcer already stored under
    /// the same id is replaced by a fresh one.
    pub fn register_agent_with_quota(&mut self, agent_id: [u8; 16], quota: ResourceQuota) {
        self.enforcers.insert(agent_id, QuotaEnforcer::new(quota));
    }

    /// Removes the agent and subtracts its recorded total memory from the
    /// global counter. Unknown ids are ignored.
    pub fn unregister_agent(&mut self, agent_id: &[u8; 16]) {
        if let Some(enforcer) = self.enforcers.remove(agent_id) {
            self.total_memory_used
                .sub(enforcer.usage().total_memory_bytes);
        }
    }

    /// Shared access to an agent's enforcer.
    pub fn get_enforcer(&self, agent_id: &[u8; 16]) -> Option<&QuotaEnforcer> {
        self.enforcers.get(agent_id)
    }

    /// Mutable access to an agent's enforcer.
    pub fn get_enforcer_mut(&mut self, agent_id: &[u8; 16]) -> Option<&mut QuotaEnforcer> {
        self.enforcers.get_mut(agent_id)
    }

    /// Replaces the quota of a registered agent. Unknown ids are ignored.
    pub fn update_quota(&mut self, agent_id: &[u8; 16], quota: ResourceQuota) {
        if let Some(enforcer) = self.enforcers.get_mut(agent_id) {
            enforcer.set_quota(quota);
        }
    }

    /// Whether `bytes` more memory fits under the global limit.
    ///
    /// A request so large that the sum does not fit in `u64` is rejected
    /// rather than overflowing.
    pub fn can_allocate_global(&self, bytes: u64) -> bool {
        match self.total_memory_used.get().checked_add(bytes) {
            Some(total) => total <= self.global_memory_limit,
            None => false,
        }
    }

    /// Adds `bytes` to the global memory counter.
    pub fn record_global_allocation(&self, bytes: u64) {
        self.total_memory_used.add(bytes);
    }

    /// Subtracts `bytes` from the global memory counter.
    pub fn record_global_deallocation(&self, bytes: u64) {
        self.total_memory_used.sub(bytes);
    }

    /// Current value of the global memory counter.
    pub fn total_memory_used(&self) -> u64 {
        self.total_memory_used.get()
    }

    /// Number of registered agents.
    pub fn agent_count(&self) -> usize {
        self.enforcers.len()
    }

    /// Copy of every agent's usage, in the map's iteration order.
    pub fn usage_summary(&self) -> Vec<([u8; 16], ResourceUsage)> {
        self.enforcers
            .iter()
            .map(|(id, enforcer)| (*id, enforcer.usage().clone()))
            .collect()
    }
}
/// Aggregate view of one agent's monitoring state.
#[derive(Debug, Clone)]
pub struct MonitorSummary {
    /// Identifier of the monitored agent.
    pub agent_id: [u8; 16],
    /// Number of snapshots in the history.
    pub snapshot_count: usize,
    /// Highest memory value in the history, in bytes.
    pub peak_memory_bytes: u64,
    /// Highest CPU time value in the history, in microseconds.
    pub peak_cpu_time_us: u64,
    /// Mean memory value over the history, in bytes.
    pub average_memory_bytes: u64,
    /// Mean CPU time value over the history, in microseconds.
    pub average_cpu_time_us: u64,
    /// Copy of the most recent snapshot, if any.
    pub latest_snapshot: Option<ResourceSnapshot>,
    /// Number of anomalies retained by the detector.
    pub anomaly_count: usize,
    /// Number of retained anomalies counted as high severity.
    pub high_severity_anomalies: usize,
}
/// Detects spikes and memory leaks in a [`ResourceHistory`] and keeps a
/// bounded log of what it found.
#[derive(Debug)]
pub struct AnomalyDetector {
    config: AnomalyConfig,
    anomalies: Vec<Anomaly>,
    max_anomalies: usize,
}

impl AnomalyDetector {
    /// Creates a detector that retains at most 1000 anomalies.
    pub fn new(config: AnomalyConfig) -> Self {
        Self {
            config,
            anomalies: Vec::new(),
            max_anomalies: 1000,
        }
    }

    /// Current thresholds.
    pub fn config(&self) -> &AnomalyConfig {
        &self.config
    }

    /// Replaces the thresholds; retained anomalies are kept.
    pub fn set_config(&mut self, config: AnomalyConfig) {
        self.config = config;
    }

    /// Builds a spike anomaly when `latest / average` exceeds `threshold`.
    ///
    /// Returns `None` when `average` is zero (no baseline to compare with)
    /// or the ratio does not exceed the threshold. `subject` is the leading
    /// phrase of the description, e.g. `"Memory usage"`.
    fn spike_anomaly(
        anomaly_type: AnomalyType,
        subject: &str,
        latest: u64,
        average: u64,
        threshold: f64,
        detected_at_us: u64,
    ) -> Option<Anomaly> {
        if average == 0 {
            return None;
        }
        let ratio = latest as f64 / average as f64;
        if ratio > threshold {
            Some(Anomaly {
                anomaly_type,
                detected_at_us,
                severity: ((ratio - 1.0) / threshold).min(1.0),
                current_value: latest as f64,
                expected_value: average as f64,
                description: format!("{} is {:.1}x the average", subject, ratio),
            })
        } else {
            None
        }
    }

    /// Builds a memory-leak anomaly when the share of growing steps among
    /// the most recent `leak_detection_samples` snapshots reaches
    /// `leak_growth_threshold`.
    ///
    /// A window of fewer than two samples has no steps to compare and
    /// never reports a leak; this also avoids the `len - 1` underflow a
    /// zero-sized window would cause.
    fn leak_anomaly(&self, history: &ResourceHistory, detected_at_us: u64) -> Option<Anomaly> {
        let window = self.config.leak_detection_samples;
        let all = history.snapshots();
        if window < 2 || all.len() < window {
            return None;
        }

        // Most recent `window` snapshots, oldest first.
        let recent = &all[all.len() - window..];
        let growing_steps = recent
            .windows(2)
            .filter(|pair| pair[1].memory_bytes > pair[0].memory_bytes)
            .count();
        let growth_ratio = growing_steps as f64 / (recent.len() - 1) as f64;

        if growth_ratio >= self.config.leak_growth_threshold {
            let first_mem = recent.first().map(|s| s.memory_bytes).unwrap_or(0);
            let last_mem = recent.last().map(|s| s.memory_bytes).unwrap_or(0);
            let total_growth = if first_mem > 0 {
                (last_mem as f64 / first_mem as f64) - 1.0
            } else {
                0.0
            };
            Some(Anomaly {
                anomaly_type: AnomalyType::MemoryLeak,
                detected_at_us,
                severity: growth_ratio,
                current_value: last_mem as f64,
                expected_value: first_mem as f64,
                description: format!(
                    "Memory grew {:.1}% over {} samples ({:.1}% of samples showed growth)",
                    total_growth * 100.0,
                    recent.len(),
                    growth_ratio * 100.0
                ),
            })
        } else {
            None
        }
    }

    /// Compares the latest snapshot with the history's average and returns
    /// the anomalies found, in the order memory spike, memory leak, network
    /// spike, storage spike.
    ///
    /// Needs at least two snapshots; otherwise returns an empty list. The
    /// anomalies found are also appended to the retained log, which is
    /// trimmed from the oldest end to its maximum size.
    pub fn analyze(&mut self, history: &ResourceHistory, current_time_us: u64) -> Vec<Anomaly> {
        if history.len() < 2 {
            return Vec::new();
        }
        let (avg, latest) = match (history.average(), history.latest()) {
            (Some(avg), Some(latest)) => (avg, latest),
            _ => return Vec::new(),
        };

        let mut new_anomalies = Vec::new();

        new_anomalies.extend(Self::spike_anomaly(
            AnomalyType::MemorySpike,
            "Memory usage",
            latest.memory_bytes,
            avg.memory_bytes,
            self.config.memory_spike_threshold,
            current_time_us,
        ));

        new_anomalies.extend(self.leak_anomaly(history, current_time_us));

        // Sent + received; saturating so huge counters cannot overflow.
        let avg_network = avg.network_sent_bytes.saturating_add(avg.network_recv_bytes);
        let latest_network = latest
            .network_sent_bytes
            .saturating_add(latest.network_recv_bytes);
        new_anomalies.extend(Self::spike_anomaly(
            AnomalyType::NetworkSpike,
            "Network traffic",
            latest_network,
            avg_network,
            self.config.network_spike_threshold,
            current_time_us,
        ));

        new_anomalies.extend(Self::spike_anomaly(
            AnomalyType::StorageSpike,
            "Storage usage",
            latest.storage_bytes,
            avg.storage_bytes,
            self.config.storage_spike_threshold,
            current_time_us,
        ));

        self.anomalies.extend(new_anomalies.iter().cloned());
        // Drop the oldest surplus entries in a single pass.
        let excess = self.anomalies.len().saturating_sub(self.max_anomalies);
        if excess > 0 {
            self.anomalies.drain(..excess);
        }

        new_anomalies
    }

    /// All retained anomalies, oldest first.
    pub fn anomalies(&self) -> &[Anomaly] {
        &self.anomalies
    }

    /// Retained anomalies of the given type.
    pub fn anomalies_of_type(&self, anomaly_type: AnomalyType) -> Vec<&Anomaly> {
        self.anomalies
            .iter()
            .filter(|a| a.anomaly_type == anomaly_type)
            .collect()
    }

    /// Retained anomalies detected within `start_us..=end_us`.
    pub fn anomalies_in_range(&self, start_us: u64, end_us: u64) -> Vec<&Anomaly> {
        self.anomalies
            .iter()
            .filter(|a| a.detected_at_us >= start_us && a.detected_at_us <= end_us)
            .collect()
    }

    /// Retained anomalies whose severity is at least `threshold`.
    pub fn high_severity_anomalies(&self, threshold: f64) -> Vec<&Anomaly> {
        self.anomalies
            .iter()
            .filter(|a| a.severity >= threshold)
            .collect()
    }

    /// Forgets all retained anomalies.
    pub fn clear(&mut self) {
        self.anomalies.clear();
    }
}
#[derive(Debug)]
pub struct ResourcePredictor {
pub(super) min_samples: usize,
}
impl ResourcePredictor {
pub fn new() -> Self {
Self { min_samples: 5 }
}
pub fn with_min_samples(mut self, min_samples: usize) -> Self {
self.min_samples = min_samples.max(2);
self
}
pub fn predict(
&self,
history: &ResourceHistory,
horizon_us: u64,
) -> Option<ResourcePrediction> {
if history.len() < self.min_samples {
return None;
}
let snapshots = history.snapshots();
let n = snapshots.len();
let memory_pred = self.linear_predict(
snapshots
.iter()
.map(|s| (s.timestamp_us as f64, s.memory_bytes as f64)),
history.latest()?.timestamp_us as f64 + horizon_us as f64,
)?;
let storage_pred = self.linear_predict(
snapshots
.iter()
.map(|s| (s.timestamp_us as f64, s.storage_bytes as f64)),
history.latest()?.timestamp_us as f64 + horizon_us as f64,
)?;
let network_rate = if n >= 2 {
let first = &snapshots[0];
let last = &snapshots[n - 1];
let time_delta = (last.timestamp_us - first.timestamp_us) as f64 / 1_000_000.0;
if time_delta > 0.0 {
let total_net = (last.network_sent_bytes + last.network_recv_bytes) as f64
- (first.network_sent_bytes + first.network_recv_bytes) as f64;
(total_net / time_delta) as u64
} else {
0
}
} else {
0
};
let cpu_avg = if n >= 2 {
let rates: Vec<f64> = snapshots
.windows(2)
.filter_map(|w| {
let time_delta = w[1].timestamp_us.saturating_sub(w[0].timestamp_us);
if time_delta > 0 {
let cpu_delta = w[1].cpu_time_us.saturating_sub(w[0].cpu_time_us);
Some((cpu_delta as f64 / time_delta as f64) * 100.0)
} else {
None
}
})
.collect();
if rates.is_empty() {
0.0
} else {
rates.iter().sum::<f64>() / rates.len() as f64
}
} else {
0.0
};
let confidence = (n as f64 / 100.0).min(0.9);
Some(ResourcePrediction {
memory_bytes: memory_pred.max(0.0) as u64,
cpu_percent: cpu_avg.clamp(0.0, 100.0),
network_bytes_per_sec: network_rate,
storage_bytes: storage_pred.max(0.0) as u64,
confidence,
horizon_us,
})
}
fn linear_predict<I>(&self, points: I, target_x: f64) -> Option<f64>
where
I: Iterator<Item = (f64, f64)>,
{
let points: Vec<(f64, f64)> = points.collect();
let n = points.len() as f64;
if n < 2.0 {
return None;
}
let sum_x: f64 = points.iter().map(|(x, _)| x).sum();
let sum_y: f64 = points.iter().map(|(_, y)| y).sum();
let sum_xy: f64 = points.iter().map(|(x, y)| x * y).sum();
let sum_x2: f64 = points.iter().map(|(x, _)| x * x).sum();
let denominator = n * sum_x2 - sum_x * sum_x;
if denominator.abs() < 1e-10 {
return Some(sum_y / n);
}
let slope = (n * sum_xy - sum_x * sum_y) / denominator;
let intercept = (sum_y - slope * sum_x) / n;
Some(slope * target_x + intercept)
}
pub fn predict_quota_breach(
&self,
history: &ResourceHistory,
quota: &ResourceQuota,
) -> Option<u64> {
if history.len() < self.min_samples {
return None;
}
let latest = history.latest()?;
let snapshots = history.snapshots();
if snapshots.len() >= 2 {
let first = &snapshots[0];
let last = &snapshots[snapshots.len() - 1];
let time_delta = (last.timestamp_us - first.timestamp_us) as f64;
if time_delta > 0.0 && last.memory_bytes > first.memory_bytes {
let memory_rate = (last.memory_bytes - first.memory_bytes) as f64 / time_delta;
let remaining = quota
.memory
.max_total_bytes
.saturating_sub(latest.memory_bytes);
if memory_rate > 0.0 {
let time_to_breach = (remaining as f64 / memory_rate) as u64;
return Some(latest.timestamp_us + time_to_breach);
}
}
}
None
}
}
/// Categories of anomaly the detector can report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AnomalyType {
    MemorySpike,
    MemoryLeak,
    CpuSpike,
    NetworkSpike,
    StorageSpike,
    ActivityDrop,
}

impl AnomalyType {
    /// Fixed, human-readable description of the category.
    pub fn description(&self) -> &'static str {
        let text = match *self {
            Self::ActivityDrop => "Sudden drop in activity",
            Self::StorageSpike => "Sudden increase in storage usage",
            Self::NetworkSpike => "Sudden increase in network traffic",
            Self::CpuSpike => "Sudden increase in CPU usage",
            Self::MemoryLeak => "Continuous memory growth detected",
            Self::MemorySpike => "Sudden increase in memory usage",
        };
        text
    }
}
/// Predicted resource usage at a future point in time.
#[derive(Debug, Clone)]
pub struct ResourcePrediction {
    /// Predicted memory usage, in bytes.
    pub memory_bytes: u64,
    /// Predicted CPU usage, as a percentage.
    pub cpu_percent: f64,
    /// Predicted network throughput, in bytes per second.
    pub network_bytes_per_sec: u64,
    /// Predicted storage usage, in bytes.
    pub storage_bytes: u64,
    /// Confidence attached to the prediction.
    pub confidence: f64,
    /// How far ahead the prediction looks, in microseconds.
    pub horizon_us: u64,
}
/// Storage limits that form the `storage` part of a resource quota.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StorageQuota {
    /// Maximum persistent storage, in bytes.
    pub max_persistent_bytes: u64,
    /// Maximum temporary storage, in bytes.
    pub max_temp_bytes: u64,
    /// Maximum number of files.
    pub max_files: u32,
    /// Maximum size of a single file, in bytes.
    pub max_file_size: u64,
}

impl StorageQuota {
    /// Quota with every limit at its type's maximum value.
    pub fn unlimited() -> Self {
        Self {
            max_files: u32::MAX,
            max_file_size: u64::MAX,
            max_temp_bytes: u64::MAX,
            max_persistent_bytes: u64::MAX,
        }
    }

    /// Quota with the given byte limits, 10 000 files, and a per-file cap
    /// of one tenth of the persistent limit.
    pub fn with_limits(persistent: u64, temp: u64) -> Self {
        let per_file_cap = persistent / 10;
        Self {
            max_persistent_bytes: persistent,
            max_temp_bytes: temp,
            max_files: 10_000,
            max_file_size: per_file_cap,
        }
    }
}
/// Every way a quota can be exceeded or approached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QuotaViolation {
    HeapMemoryExceeded,
    StackMemoryExceeded,
    TotalMemoryExceeded,
    MemorySoftLimitReached,
    CpuExecutionTimeExceeded,
    CpuRateLimitExceeded,
    InstructionLimitExceeded,
    SendBandwidthExceeded,
    RecvBandwidthExceeded,
    TotalBandwidthExceeded,
    ConnectionLimitExceeded,
    MessageRateExceeded,
    PersistentStorageExceeded,
    TempStorageExceeded,
    FileCountExceeded,
    FileSizeExceeded,
}

impl QuotaViolation {
    /// Whether this entry is only a warning; true solely for
    /// `MemorySoftLimitReached`.
    pub fn is_warning(&self) -> bool {
        *self == Self::MemorySoftLimitReached
    }

    /// Fixed, human-readable description of the violation.
    pub fn description(&self) -> &'static str {
        let text = match *self {
            // Memory
            Self::HeapMemoryExceeded => "Heap memory limit exceeded",
            Self::StackMemoryExceeded => "Stack memory limit exceeded",
            Self::TotalMemoryExceeded => "Total memory limit exceeded",
            Self::MemorySoftLimitReached => "Memory soft limit reached (warning)",
            // CPU
            Self::CpuExecutionTimeExceeded => "CPU execution time limit exceeded",
            Self::CpuRateLimitExceeded => "CPU rate limit exceeded",
            Self::InstructionLimitExceeded => "Instruction limit exceeded",
            // Network
            Self::SendBandwidthExceeded => "Send bandwidth limit exceeded",
            Self::RecvBandwidthExceeded => "Receive bandwidth limit exceeded",
            Self::TotalBandwidthExceeded => "Total bandwidth limit exceeded",
            Self::ConnectionLimitExceeded => "Connection limit exceeded",
            Self::MessageRateExceeded => "Message rate limit exceeded",
            // Storage
            Self::PersistentStorageExceeded => "Persistent storage limit exceeded",
            Self::TempStorageExceeded => "Temporary storage limit exceeded",
            Self::FileCountExceeded => "File count limit exceeded",
            Self::FileSizeExceeded => "Maximum file size exceeded",
        };
        text
    }
}
/// Running resource counters for one agent.
#[derive(Debug, Clone, Default)]
pub struct ResourceUsage {
    /// Heap memory in use, in bytes.
    pub heap_bytes: u64,
    /// Stack memory in use, in bytes.
    pub stack_bytes: u64,
    /// Total memory in use, in bytes.
    pub total_memory_bytes: u64,
    /// CPU time used in the current one-second window, in microseconds.
    pub cpu_time_this_second_us: u64,
    /// Total CPU time used, in microseconds.
    pub total_cpu_time_us: u64,
    /// Number of instructions executed.
    pub instructions_executed: u64,
    /// Bytes sent in the current one-second window.
    pub bytes_sent_this_second: u64,
    /// Bytes received in the current one-second window.
    pub bytes_received_this_second: u64,
    /// Total bytes sent.
    pub total_bytes_sent: u64,
    /// Total bytes received.
    pub total_bytes_received: u64,
    /// Number of currently open connections.
    pub active_connections: u32,
    /// Messages counted in the current one-second window.
    pub messages_this_second: u32,
    /// Persistent storage in use, in bytes.
    pub persistent_storage_bytes: u64,
    /// Temporary storage in use, in bytes.
    pub temp_storage_bytes: u64,
    /// Number of files.
    pub file_count: u32,
}

impl ResourceUsage {
    /// Bytes sent plus bytes received in the current second.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn network_bytes_this_second(&self) -> u64 {
        self.bytes_sent_this_second
            .saturating_add(self.bytes_received_this_second)
    }

    /// Persistent plus temporary storage, in bytes.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total_storage_bytes(&self) -> u64 {
        self.persistent_storage_bytes
            .saturating_add(self.temp_storage_bytes)
    }
}
/// Complete quota for one agent: memory, CPU, network and storage limits
/// plus a switch saying whether they are enforced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceQuota {
    /// Memory limits.
    pub memory: MemoryQuota,
    /// CPU limits.
    pub cpu: CpuQuota,
    /// Network limits.
    pub network: NetworkQuota,
    /// Storage limits.
    pub storage: StorageQuota,
    /// Whether the limits are enforced.
    pub enforced: bool,
}

impl ResourceQuota {
    /// Quota built from every part's `unlimited()` preset, not enforced.
    pub fn unlimited() -> Self {
        let memory = MemoryQuota::unlimited();
        let cpu = CpuQuota::unlimited();
        let network = NetworkQuota::unlimited();
        let storage = StorageQuota::unlimited();
        Self {
            memory,
            cpu,
            network,
            storage,
            enforced: false,
        }
    }

    /// Small enforced quota: 16 MiB heap, 1 MiB stack, 32 MiB total memory,
    /// 1 MiB/s in each network direction, 10 MiB persistent and 1 MiB
    /// temporary storage.
    pub fn minimal() -> Self {
        const MIB: u64 = 1024 * 1024;

        let memory = MemoryQuota::with_limits(16 * MIB, MIB, 32 * MIB);
        let cpu = CpuQuota::with_limits(1_000_000, 100_000);
        let network = NetworkQuota::with_bandwidth(MIB, MIB);
        let storage = StorageQuota::with_limits(10 * MIB, MIB);
        Self {
            memory,
            cpu,
            network,
            storage,
            enforced: true,
        }
    }

    /// Large enforced quota: 4 GiB heap, 64 MiB stack, 8 GiB total memory,
    /// CPU priority 80, 100 MiB/s in each network direction with 1000
    /// connections, 100 GiB persistent and 10 GiB temporary storage.
    pub fn high_performance() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        let memory = MemoryQuota::with_limits(4 * GIB, 64 * MIB, 8 * GIB);
        let cpu = CpuQuota::with_limits(60_000_000, 900_000).with_priority(80);
        let network =
            NetworkQuota::with_bandwidth(100 * MIB, 100 * MIB).with_connection_limit(1000);
        let storage = StorageQuota::with_limits(100 * GIB, 10 * GIB);
        Self {
            memory,
            cpu,
            network,
            storage,
            enforced: true,
        }
    }
}
/// Rates of change between two resource snapshots.
#[derive(Debug, Clone, Copy)]
pub struct ResourceRate {
    /// Memory change, in bytes per second (negative when shrinking).
    pub memory_bytes_per_sec: f64,
    /// CPU usage, as a percentage.
    pub cpu_percent: f64,
    /// Bytes sent per second.
    pub network_sent_bytes_per_sec: f64,
    /// Bytes received per second.
    pub network_recv_bytes_per_sec: f64,
    /// Storage change, in bytes per second (negative when shrinking).
    pub storage_bytes_per_sec: f64,
}

impl ResourceRate {
    /// Whether the memory rate is strictly positive.
    pub fn is_memory_growing(&self) -> bool {
        let rate = self.memory_bytes_per_sec;
        0.0 < rate
    }

    /// Whether the storage rate is strictly positive.
    pub fn is_storage_growing(&self) -> bool {
        let rate = self.storage_bytes_per_sec;
        0.0 < rate
    }

    /// Sent plus received bytes per second.
    pub fn total_network_bytes_per_sec(&self) -> f64 {
        let ResourceRate {
            network_sent_bytes_per_sec: sent,
            network_recv_bytes_per_sec: received,
            ..
        } = *self;
        sent + received
    }
}