#![allow(missing_docs)]
use crate::types::RegionId;
use crate::types::pressure::SystemPressure;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ResourceMonitorError {
#[error("unknown resource type: {resource_type}")]
UnknownResourceType { resource_type: String },
#[error("resource monitoring is already active")]
AlreadyActive,
#[error("failed to access system resource: {reason}")]
SystemAccessFailed { reason: String },
#[error("invalid configuration: {details}")]
InvalidConfig { details: String },
#[error("degradation engine not initialized")]
EngineNotReady,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceType {
Memory,
FileDescriptors,
CpuLoad,
NetworkConnections,
Task,
Custom(String),
}
impl std::fmt::Display for ResourceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Memory => write!(f, "memory"),
Self::FileDescriptors => write!(f, "file_descriptors"),
Self::CpuLoad => write!(f, "cpu_load"),
Self::NetworkConnections => write!(f, "network_connections"),
Self::Task => write!(f, "task"),
Self::Custom(name) => write!(f, "custom:{name}"),
}
}
}
#[derive(Debug, Clone)]
pub struct ResourceMeasurement {
pub current: u64,
pub soft_limit: u64,
pub hard_limit: u64,
pub max_limit: u64,
pub timestamp: Instant,
}
impl ResourceMeasurement {
#[must_use]
pub fn new(current: u64, soft_limit: u64, hard_limit: u64, max_limit: u64) -> Self {
Self {
current,
soft_limit,
hard_limit,
max_limit,
timestamp: Instant::now(),
}
}
#[must_use]
pub fn usage_ratio(&self) -> f64 {
if self.max_limit == 0 {
return 0.0;
}
(self.current as f64) / (self.max_limit as f64)
}
#[must_use]
pub fn is_soft_exceeded(&self) -> bool {
self.current >= self.soft_limit
}
#[must_use]
pub fn is_hard_exceeded(&self) -> bool {
self.current >= self.hard_limit
}
#[must_use]
pub fn is_critical(&self) -> bool {
self.current >= self.max_limit.saturating_sub(self.max_limit / 20) }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DegradationLevel {
None = 0,
Light = 1,
Moderate = 2,
Heavy = 3,
Emergency = 4,
}
impl DegradationLevel {
#[must_use]
pub fn to_headroom(self) -> f32 {
match self {
Self::None => 1.0,
Self::Light => 0.75,
Self::Moderate => 0.5,
Self::Heavy => 0.25,
Self::Emergency => 0.0,
}
}
#[must_use]
pub fn from_headroom(headroom: f32) -> Self {
if headroom > 0.875 {
Self::None
} else if headroom > 0.625 {
Self::Light
} else if headroom > 0.375 {
Self::Moderate
} else if headroom > 0.125 {
Self::Heavy
} else {
Self::Emergency
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerConfig {
pub soft_threshold: f64,
pub hard_threshold: f64,
pub hysteresis: f64,
pub cooldown: Duration,
pub enabled: bool,
}
impl TriggerConfig {
#[must_use]
pub fn default_for_resource(resource_type: &ResourceType) -> Self {
match resource_type {
ResourceType::Memory => Self {
soft_threshold: 0.70, hard_threshold: 0.85, hysteresis: 0.05, cooldown: Duration::from_secs(5),
enabled: true,
},
ResourceType::FileDescriptors => Self {
soft_threshold: 0.75, hard_threshold: 0.90, hysteresis: 0.05,
cooldown: Duration::from_secs(2),
enabled: true,
},
ResourceType::CpuLoad => Self {
soft_threshold: 0.80, hard_threshold: 0.95, hysteresis: 0.10, cooldown: Duration::from_secs(3),
enabled: true,
},
ResourceType::NetworkConnections => Self {
soft_threshold: 0.70, hard_threshold: 0.85, hysteresis: 0.05,
cooldown: Duration::from_secs(1),
enabled: true,
},
ResourceType::Custom(_) => Self {
soft_threshold: 0.75, hard_threshold: 0.90,
hysteresis: 0.05,
cooldown: Duration::from_secs(5),
enabled: false, },
ResourceType::Task => Self {
soft_threshold: 0.80, hard_threshold: 0.95, hysteresis: 0.05,
cooldown: Duration::from_secs(1),
enabled: true,
},
}
}
#[must_use]
pub fn calculate_degradation(&self, measurement: &ResourceMeasurement) -> DegradationLevel {
let usage_ratio = measurement.usage_ratio();
if usage_ratio >= self.hard_threshold {
if measurement.is_critical() {
DegradationLevel::Emergency
} else {
DegradationLevel::Heavy
}
} else if usage_ratio >= self.soft_threshold {
if usage_ratio >= (self.hard_threshold - self.hysteresis) {
DegradationLevel::Moderate
} else {
DegradationLevel::Light
}
} else {
DegradationLevel::None
}
}
#[must_use]
pub fn apply_hysteresis(
&self,
new_level: DegradationLevel,
current_level: DegradationLevel,
last_change: Option<Instant>,
) -> DegradationLevel {
if let Some(last) = last_change {
if last.elapsed() < self.cooldown {
return current_level;
}
}
if new_level == DegradationLevel::Emergency {
return new_level;
}
if new_level < current_level {
let new_u8 = new_level as u8;
let current_u8 = current_level as u8;
if new_u8 <= current_u8.saturating_sub(1) {
new_level
} else {
current_level
}
} else {
new_level
}
}
}
#[derive(Debug)]
pub struct ResourcePressure {
measurements: RwLock<HashMap<ResourceType, ResourceMeasurement>>,
degradation_levels: RwLock<HashMap<ResourceType, DegradationLevel>>,
last_changes: RwLock<HashMap<ResourceType, Instant>>,
system_pressure: Arc<SystemPressure>,
monitoring_overhead: AtomicU64,
}
impl ResourcePressure {
#[must_use]
pub fn new() -> Self {
Self {
measurements: RwLock::new(HashMap::new()),
degradation_levels: RwLock::new(HashMap::new()),
last_changes: RwLock::new(HashMap::new()),
system_pressure: Arc::new(SystemPressure::new()),
monitoring_overhead: AtomicU64::new(0),
}
}
pub fn update_measurement(
&self,
resource_type: ResourceType,
measurement: ResourceMeasurement,
) {
let start = Instant::now();
{
let mut measurements = self.measurements.write();
measurements.insert(resource_type, measurement);
}
let elapsed_nanos = start.elapsed().as_nanos() as u64;
self.monitoring_overhead
.fetch_add(elapsed_nanos, Ordering::Relaxed);
}
pub fn get_measurement(&self, resource_type: &ResourceType) -> Option<ResourceMeasurement> {
self.measurements.read().get(resource_type).cloned()
}
pub fn update_degradation_level(&self, resource_type: ResourceType, level: DegradationLevel) {
let mut levels = self.degradation_levels.write();
let mut changes = self.last_changes.write();
levels.insert(resource_type.clone(), level);
changes.insert(resource_type, Instant::now());
let max_level = levels
.values()
.max()
.copied()
.unwrap_or(DegradationLevel::None);
self.system_pressure.set_headroom(max_level.to_headroom());
}
pub fn get_degradation_level(&self, resource_type: &ResourceType) -> DegradationLevel {
self.degradation_levels
.read()
.get(resource_type)
.copied()
.unwrap_or(DegradationLevel::None)
}
pub fn system_pressure(&self) -> Arc<SystemPressure> {
Arc::clone(&self.system_pressure)
}
pub fn monitoring_overhead_nanos(&self) -> u64 {
self.monitoring_overhead.load(Ordering::Relaxed)
}
pub fn composite_degradation_level(&self) -> DegradationLevel {
let levels = self.degradation_levels.read();
levels
.values()
.max()
.copied()
.unwrap_or(DegradationLevel::None)
}
}
impl Default for ResourcePressure {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum RegionPriority {
Critical = 0,
High = 1,
#[default]
Normal = 2,
Low = 3,
BestEffort = 4,
}
#[derive(Debug, Clone)]
pub enum SheddingDecision {
Keep,
Pause,
Cancel,
ForceCancel,
}
#[derive(Debug)]
pub struct DegradationEngine {
pressure: Arc<ResourcePressure>,
trigger_configs: RwLock<HashMap<ResourceType, TriggerConfig>>,
region_priorities: RwLock<HashMap<RegionId, RegionPriority>>,
active_policies: RwLock<HashMap<ResourceType, Vec<DegradationPolicy>>>,
stats: DegradationStats,
}
#[derive(Debug, Clone)]
pub struct DegradationPolicy {
pub resource_type: ResourceType,
pub trigger_level: DegradationLevel,
pub action: PolicyAction,
}
#[derive(Debug, Clone)]
pub enum PolicyAction {
RejectNewWork(RegionPriority),
CancelRegions(RegionPriority),
PauseRegions(RegionPriority),
ReduceLimits { factor: f64 },
Custom { name: String },
}
#[derive(Debug, Default)]
pub struct DegradationStats {
triggers_fired: AtomicU64,
regions_cancelled: AtomicU64,
regions_paused: AtomicU64,
requests_rejected: AtomicU64,
decision_time_nanos: AtomicU64,
}
impl DegradationEngine {
pub fn new(pressure: Arc<ResourcePressure>) -> Self {
let mut trigger_configs = HashMap::new();
for resource_type in [
ResourceType::Memory,
ResourceType::FileDescriptors,
ResourceType::CpuLoad,
ResourceType::NetworkConnections,
ResourceType::Task,
] {
trigger_configs.insert(
resource_type.clone(),
TriggerConfig::default_for_resource(&resource_type),
);
}
Self {
pressure,
trigger_configs: RwLock::new(trigger_configs),
region_priorities: RwLock::new(HashMap::new()),
active_policies: RwLock::new(HashMap::new()),
stats: DegradationStats::default(),
}
}
pub fn register_resource_type(
&self,
resource_type: ResourceType,
config: TriggerConfig,
) -> Result<(), ResourceMonitorError> {
let mut configs = self.trigger_configs.write();
configs.insert(resource_type, config);
Ok(())
}
pub fn set_region_priority(&self, region_id: RegionId, priority: RegionPriority) {
let mut priorities = self.region_priorities.write();
priorities.insert(region_id, priority);
}
pub fn add_policy(&self, policy: DegradationPolicy) {
let mut policies = self.active_policies.write();
policies
.entry(policy.resource_type.clone())
.or_default()
.push(policy);
}
pub fn process_measurements(
&self,
) -> Result<Vec<(ResourceType, DegradationLevel)>, ResourceMonitorError> {
let start = Instant::now();
let mut triggered_changes = Vec::new();
let configs = self.trigger_configs.read();
for (resource_type, config) in configs.iter() {
if !config.enabled {
continue;
}
if let Some(measurement) = self.pressure.get_measurement(resource_type) {
let new_level = config.calculate_degradation(&measurement);
let current_level = self.pressure.get_degradation_level(resource_type);
let last_change = self
.pressure
.last_changes
.read()
.get(resource_type)
.copied();
let final_level = config.apply_hysteresis(new_level, current_level, last_change);
if final_level != current_level {
self.pressure
.update_degradation_level(resource_type.clone(), final_level);
triggered_changes.push((resource_type.clone(), final_level));
self.stats.triggers_fired.fetch_add(1, Ordering::Relaxed);
self.apply_policies(resource_type, final_level)?;
}
}
}
let elapsed_nanos = start.elapsed().as_nanos() as u64;
self.stats
.decision_time_nanos
.fetch_add(elapsed_nanos, Ordering::Relaxed);
Ok(triggered_changes)
}
fn apply_policies(
&self,
resource_type: &ResourceType,
level: DegradationLevel,
) -> Result<(), ResourceMonitorError> {
let policies = self.active_policies.read();
if let Some(resource_policies) = policies.get(resource_type) {
for policy in resource_policies {
if level >= policy.trigger_level {
self.execute_policy_action(&policy.action, level)?;
}
}
}
Ok(())
}
fn execute_policy_action(
&self,
action: &PolicyAction,
_level: DegradationLevel,
) -> Result<(), ResourceMonitorError> {
match action {
PolicyAction::RejectNewWork(_priority_threshold) => {
self.stats.requests_rejected.fetch_add(1, Ordering::Relaxed);
}
PolicyAction::CancelRegions(_priority_threshold) => {
self.stats.regions_cancelled.fetch_add(1, Ordering::Relaxed);
}
PolicyAction::PauseRegions(_priority_threshold) => {
self.stats.regions_paused.fetch_add(1, Ordering::Relaxed);
}
PolicyAction::ReduceLimits { factor: _ } => {
}
PolicyAction::Custom { name: _name } => {
}
}
Ok(())
}
pub fn should_shed_region(&self, region_id: RegionId) -> SheddingDecision {
let composite_level = self.pressure.composite_degradation_level();
let priorities = self.region_priorities.read();
let region_priority = priorities.get(®ion_id).copied().unwrap_or_default();
match (composite_level, region_priority) {
(DegradationLevel::Emergency, RegionPriority::BestEffort) => {
SheddingDecision::ForceCancel
}
(DegradationLevel::Emergency, RegionPriority::Low) => SheddingDecision::Cancel,
(DegradationLevel::Emergency, RegionPriority::Normal) => SheddingDecision::Pause,
(DegradationLevel::Emergency, _) => SheddingDecision::Keep,
(DegradationLevel::Heavy, RegionPriority::BestEffort) => SheddingDecision::Cancel,
(DegradationLevel::Heavy, RegionPriority::Low) => SheddingDecision::Pause,
(DegradationLevel::Heavy, _) => SheddingDecision::Keep,
(DegradationLevel::Moderate, RegionPriority::BestEffort) => SheddingDecision::Pause,
(DegradationLevel::Moderate, _) => SheddingDecision::Keep,
(DegradationLevel::Light, RegionPriority::BestEffort) => SheddingDecision::Pause,
(DegradationLevel::Light, _) => SheddingDecision::Keep,
(DegradationLevel::None, _) => SheddingDecision::Keep,
}
}
pub fn stats(&self) -> DegradationStatsSnapshot {
DegradationStatsSnapshot {
triggers_fired: self.stats.triggers_fired.load(Ordering::Relaxed),
regions_cancelled: self.stats.regions_cancelled.load(Ordering::Relaxed),
regions_paused: self.stats.regions_paused.load(Ordering::Relaxed),
requests_rejected: self.stats.requests_rejected.load(Ordering::Relaxed),
decision_time_nanos: self.stats.decision_time_nanos.load(Ordering::Relaxed),
monitoring_overhead_nanos: self.pressure.monitoring_overhead_nanos(),
}
}
}
#[derive(Debug, Clone)]
pub struct DegradationStatsSnapshot {
pub triggers_fired: u64,
pub regions_cancelled: u64,
pub regions_paused: u64,
pub requests_rejected: u64,
pub decision_time_nanos: u64,
pub monitoring_overhead_nanos: u64,
}
impl DegradationStatsSnapshot {
#[must_use]
pub fn overhead_percentage(&self, total_runtime_nanos: u64) -> f64 {
if total_runtime_nanos == 0 {
return 0.0;
}
let total_overhead = self.decision_time_nanos + self.monitoring_overhead_nanos;
(total_overhead as f64) / (total_runtime_nanos as f64) * 100.0
}
}
fn cycle_overhead_percentage(elapsed: Duration, interval: Duration) -> f64 {
let interval_nanos = interval.as_nanos();
if interval_nanos == 0 {
return 0.0;
}
(elapsed.as_nanos() as f64) / (interval_nanos as f64) * 100.0
}
#[derive(Debug)]
#[allow(dead_code)]
pub struct SystemResourceCollector {
active: AtomicBool,
interval: Duration,
pressure: Arc<ResourcePressure>,
}
impl SystemResourceCollector {
pub fn new(pressure: Arc<ResourcePressure>, interval: Duration) -> Self {
Self {
active: AtomicBool::new(false),
interval,
pressure,
}
}
pub fn start(&self) -> Result<(), ResourceMonitorError> {
if self
.active
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
return Err(ResourceMonitorError::AlreadyActive);
}
Ok(())
}
pub fn stop(&self) {
self.active.store(false, Ordering::SeqCst);
}
pub fn collect_now(&self) -> Result<(), ResourceMonitorError> {
let _start = Instant::now();
if let Ok(memory_usage) = self.collect_memory_usage() {
self.pressure
.update_measurement(ResourceType::Memory, memory_usage);
}
if let Ok(fd_usage) = self.collect_fd_usage() {
self.pressure
.update_measurement(ResourceType::FileDescriptors, fd_usage);
}
if let Ok(cpu_load) = self.collect_cpu_load() {
self.pressure
.update_measurement(ResourceType::CpuLoad, cpu_load);
}
if let Ok(network_usage) = self.collect_network_usage() {
self.pressure
.update_measurement(ResourceType::NetworkConnections, network_usage);
}
Ok(())
}
fn collect_memory_usage(&self) -> Result<ResourceMeasurement, ResourceMonitorError> {
let current_bytes = 512 * 1024 * 1024; let soft_limit = 1024 * 1024 * 1024; let hard_limit = 1536 * 1024 * 1024; let max_limit = 2048 * 1024 * 1024;
Ok(ResourceMeasurement::new(
current_bytes,
soft_limit,
hard_limit,
max_limit,
))
}
fn collect_fd_usage(&self) -> Result<ResourceMeasurement, ResourceMonitorError> {
let current_fds = 128;
let soft_limit = 768; let hard_limit = 922; let max_limit = 1024;
Ok(ResourceMeasurement::new(
current_fds,
soft_limit,
hard_limit,
max_limit,
))
}
fn collect_cpu_load(&self) -> Result<ResourceMeasurement, ResourceMonitorError> {
let load_avg_1min = 80; let soft_limit = 80;
let hard_limit = 95;
let max_limit = 100;
Ok(ResourceMeasurement::new(
load_avg_1min,
soft_limit,
hard_limit,
max_limit,
))
}
fn collect_network_usage(&self) -> Result<ResourceMeasurement, ResourceMonitorError> {
let current_connections = 50;
let soft_limit = 350; let hard_limit = 425; let max_limit = 500;
Ok(ResourceMeasurement::new(
current_connections,
soft_limit,
hard_limit,
max_limit,
))
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
}
#[derive(Debug)]
pub struct ResourceMonitor {
pressure: Arc<ResourcePressure>,
engine: Arc<DegradationEngine>,
collector: SystemResourceCollector,
config: RwLock<MonitorConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorConfig {
pub collection_interval: Duration,
pub enable_auto_degradation: bool,
pub max_overhead_percent: f64,
}
impl Default for MonitorConfig {
fn default() -> Self {
Self {
collection_interval: Duration::from_secs(1),
enable_auto_degradation: true,
max_overhead_percent: 0.5, }
}
}
impl ResourceMonitor {
#[must_use]
pub fn new(config: MonitorConfig) -> Self {
let pressure = Arc::new(ResourcePressure::new());
let engine = Arc::new(DegradationEngine::new(Arc::clone(&pressure)));
let collector =
SystemResourceCollector::new(Arc::clone(&pressure), config.collection_interval);
Self {
pressure,
engine,
collector,
config: RwLock::new(config),
}
}
pub fn start(&self) -> Result<(), ResourceMonitorError> {
self.collector.start()
}
pub fn stop(&self) {
self.collector.stop();
}
pub fn pressure(&self) -> Arc<ResourcePressure> {
Arc::clone(&self.pressure)
}
pub fn engine(&self) -> Arc<DegradationEngine> {
Arc::clone(&self.engine)
}
pub fn update_config(&self, new_config: MonitorConfig) {
let mut config = self.config.write();
*config = new_config;
}
pub fn process_current_state(
&self,
) -> Result<Vec<(ResourceType, DegradationLevel)>, ResourceMonitorError> {
let cycle_start = Instant::now();
self.collector.collect_now()?;
let changes = self.engine.process_measurements()?;
let config = self.config.read();
if config.enable_auto_degradation {
let overhead_percent =
cycle_overhead_percentage(cycle_start.elapsed(), config.collection_interval);
if overhead_percent > config.max_overhead_percent {
crate::tracing_compat::warn!(
overhead_percent,
collection_interval_ms = config.collection_interval.as_millis(),
max_overhead_percent = config.max_overhead_percent,
"resource monitoring overhead exceeds configured limit"
);
}
}
Ok(changes)
}
pub fn status_report(&self) -> ResourceMonitorStatus {
let measurements: HashMap<ResourceType, ResourceMeasurement> =
self.pressure.measurements.read().clone();
let degradation_levels: HashMap<ResourceType, DegradationLevel> =
self.pressure.degradation_levels.read().clone();
ResourceMonitorStatus {
is_active: self.collector.is_active(),
composite_degradation_level: self.pressure.composite_degradation_level(),
measurements,
degradation_levels,
stats: self.engine.stats(),
config: self.config.read().clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct ResourceMonitorStatus {
pub is_active: bool,
pub composite_degradation_level: DegradationLevel,
pub measurements: HashMap<ResourceType, ResourceMeasurement>,
pub degradation_levels: HashMap<ResourceType, DegradationLevel>,
pub stats: DegradationStatsSnapshot,
pub config: MonitorConfig,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_resource_measurement_ratios() {
let measurement = ResourceMeasurement::new(750, 800, 900, 1000);
assert_eq!(measurement.usage_ratio(), 0.75);
assert!(!measurement.is_soft_exceeded());
assert!(!measurement.is_hard_exceeded());
assert!(!measurement.is_critical());
}
#[test]
fn test_degradation_level_conversion() {
assert_eq!(DegradationLevel::None.to_headroom(), 1.0);
assert_eq!(DegradationLevel::Emergency.to_headroom(), 0.0);
assert_eq!(DegradationLevel::from_headroom(0.9), DegradationLevel::None);
assert_eq!(
DegradationLevel::from_headroom(0.1),
DegradationLevel::Emergency
);
}
#[test]
fn test_trigger_config_degradation_calculation() {
let config = TriggerConfig::default_for_resource(&ResourceType::Memory);
let measurement = ResourceMeasurement::new(800, 700, 850, 1000);
let level = config.calculate_degradation(&measurement);
assert_eq!(level, DegradationLevel::Moderate);
}
#[test]
fn test_resource_pressure_updates() {
let pressure = ResourcePressure::new();
let measurement = ResourceMeasurement::new(500, 700, 850, 1000);
pressure.update_measurement(ResourceType::Memory, measurement.clone());
let retrieved = pressure.get_measurement(&ResourceType::Memory).unwrap();
assert_eq!(retrieved.current, measurement.current);
}
#[test]
fn test_resource_pressure_system_pressure_matches_degradation_band() {
let pressure = ResourcePressure::new();
let system_pressure = pressure.system_pressure();
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::None);
assert!((system_pressure.headroom() - 1.0).abs() < f32::EPSILON);
assert_eq!(system_pressure.degradation_level(), 0);
assert_eq!(system_pressure.level_label(), "normal");
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::Light);
assert!((system_pressure.headroom() - 0.75).abs() < f32::EPSILON);
assert_eq!(system_pressure.degradation_level(), 1);
assert_eq!(system_pressure.level_label(), "light");
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::Moderate);
assert!((system_pressure.headroom() - 0.5).abs() < f32::EPSILON);
assert_eq!(system_pressure.degradation_level(), 2);
assert_eq!(system_pressure.level_label(), "moderate");
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::Heavy);
assert!((system_pressure.headroom() - 0.25).abs() < f32::EPSILON);
assert_eq!(system_pressure.degradation_level(), 3);
assert_eq!(system_pressure.level_label(), "heavy");
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::Emergency);
assert!(system_pressure.headroom().abs() < f32::EPSILON);
assert_eq!(system_pressure.degradation_level(), 4);
assert_eq!(system_pressure.level_label(), "emergency");
}
#[test]
fn test_degradation_engine_policies() {
let pressure = Arc::new(ResourcePressure::new());
let engine = DegradationEngine::new(Arc::clone(&pressure));
let policy = DegradationPolicy {
resource_type: ResourceType::Memory,
trigger_level: DegradationLevel::Moderate,
action: PolicyAction::RejectNewWork(RegionPriority::Low),
};
engine.add_policy(policy);
let region_id = RegionId::new_ephemeral();
engine.set_region_priority(region_id, RegionPriority::Low);
pressure.update_degradation_level(ResourceType::Memory, DegradationLevel::Heavy);
let decision = engine.should_shed_region(region_id);
assert!(matches!(decision, SheddingDecision::Pause));
}
#[test]
fn test_degradation_engine_monitors_task_pressure_by_default() {
let pressure = Arc::new(ResourcePressure::new());
let engine = DegradationEngine::new(Arc::clone(&pressure));
pressure.update_measurement(
ResourceType::Task,
ResourceMeasurement::new(960, 800, 950, 1000),
);
let changes = engine
.process_measurements()
.expect("task pressure should process");
assert_eq!(
changes,
vec![(ResourceType::Task, DegradationLevel::Emergency)]
);
assert_eq!(
pressure.get_degradation_level(&ResourceType::Task),
DegradationLevel::Emergency
);
}
#[test]
fn test_cycle_overhead_percentage_uses_configured_interval() {
let overhead =
cycle_overhead_percentage(Duration::from_millis(25), Duration::from_millis(100));
assert!((overhead - 25.0).abs() < f64::EPSILON);
}
#[test]
fn test_cycle_overhead_percentage_handles_zero_interval() {
assert_eq!(
cycle_overhead_percentage(Duration::from_millis(25), Duration::ZERO),
0.0
);
}
}