use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct PerformanceSample {
pub timestamp: Instant,
pub analysis_duration_ms: u64,
pub cache_hit_ratio: f32,
pub memory_usage_mb: f32,
pub cpu_utilization: f32,
pub queue_depth: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
pub target_analysis_time_ms: u64,
pub min_cache_hit_ratio: f32,
pub max_memory_mb: f32,
pub max_cpu_utilization: f32,
pub sample_window_size: usize,
pub adjustment_sensitivity: f32,
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
target_analysis_time_ms: 100, min_cache_hit_ratio: 0.6, max_memory_mb: 512.0, max_cpu_utilization: 0.8, sample_window_size: 50, adjustment_sensitivity: 0.1, }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ThresholdAdjustment {
ScaleUp {
cache_factor: f32,
permit_factor: f32,
},
ScaleDown {
cache_factor: f32,
permit_factor: f32,
},
MoreCompression { compression_level: u8 },
LessCompression { compression_level: u8 },
Maintain,
}
pub struct AdaptiveThresholdManager {
config: AdaptiveConfig,
performance_history: Arc<RwLock<VecDeque<PerformanceSample>>>,
current_thresholds: Arc<RwLock<CurrentThresholds>>,
adjustment_history: Arc<RwLock<VecDeque<ThresholdAdjustment>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CurrentThresholds {
pub hot_cache_size: usize,
pub high_priority_permits: usize,
pub low_priority_permits: usize,
pub compression_level: u8,
pub archive_after_hours: u32,
pub cleanup_interval_minutes: u32,
}
impl Default for CurrentThresholds {
fn default() -> Self {
Self {
hot_cache_size: 1000,
high_priority_permits: 10,
low_priority_permits: 2,
compression_level: 4, archive_after_hours: 24 * 30, cleanup_interval_minutes: 60, }
}
}
impl AdaptiveThresholdManager {
#[must_use]
pub fn new(config: AdaptiveConfig) -> Self {
Self {
config,
performance_history: Arc::new(RwLock::new(VecDeque::new())),
current_thresholds: Arc::new(RwLock::new(CurrentThresholds::default())),
adjustment_history: Arc::new(RwLock::new(VecDeque::new())),
}
}
pub async fn record_sample(&self, sample: PerformanceSample) -> Result<()> {
let mut history = self.performance_history.write().await;
history.push_back(sample);
while history.len() > self.config.sample_window_size {
history.pop_front();
}
if history.len() >= 10 {
if let Some(adjustment) = self.calculate_adjustment(&history).await? {
self.apply_adjustment(adjustment).await?;
}
}
Ok(())
}
async fn calculate_adjustment(
&self,
history: &VecDeque<PerformanceSample>,
) -> Result<Option<ThresholdAdjustment>> {
if history.len() < 5 {
return Ok(None);
}
let avg_duration = history.iter().map(|s| s.analysis_duration_ms).sum::<u64>() as f32
/ history.len() as f32;
let avg_cache_hit =
history.iter().map(|s| s.cache_hit_ratio).sum::<f32>() / history.len() as f32;
let avg_memory =
history.iter().map(|s| s.memory_usage_mb).sum::<f32>() / history.len() as f32;
let avg_cpu = history.iter().map(|s| s.cpu_utilization).sum::<f32>() / history.len() as f32;
let avg_queue_depth =
history.iter().map(|s| s.queue_depth).sum::<usize>() as f32 / history.len() as f32;
if avg_duration > self.config.target_analysis_time_ms as f32 * 1.5 {
if avg_cache_hit < self.config.min_cache_hit_ratio {
return Ok(Some(ThresholdAdjustment::ScaleUp {
cache_factor: 1.0 + self.config.adjustment_sensitivity,
permit_factor: 1.0,
}));
} else if avg_queue_depth > 5.0 {
return Ok(Some(ThresholdAdjustment::ScaleUp {
cache_factor: 1.0,
permit_factor: 1.0 + self.config.adjustment_sensitivity,
}));
}
return Ok(Some(ThresholdAdjustment::LessCompression {
compression_level: 1,
}));
}
if avg_memory > self.config.max_memory_mb || avg_cpu > self.config.max_cpu_utilization {
if avg_cache_hit > 0.9
&& avg_duration < self.config.target_analysis_time_ms as f32 * 0.8
{
return Ok(Some(ThresholdAdjustment::ScaleDown {
cache_factor: 1.0 - self.config.adjustment_sensitivity,
permit_factor: 1.0,
}));
}
return Ok(Some(ThresholdAdjustment::MoreCompression {
compression_level: 6,
}));
}
if avg_duration < self.config.target_analysis_time_ms as f32 * 0.5
&& avg_cache_hit > 0.8
&& avg_memory < self.config.max_memory_mb * 0.5
{
return Ok(Some(ThresholdAdjustment::Maintain));
}
Ok(None)
}
async fn apply_adjustment(&self, adjustment: ThresholdAdjustment) -> Result<()> {
let mut thresholds = self.current_thresholds.write().await;
let mut adjustments = self.adjustment_history.write().await;
match adjustment.clone() {
ThresholdAdjustment::ScaleUp {
cache_factor,
permit_factor,
} => {
thresholds.hot_cache_size =
((thresholds.hot_cache_size as f32 * cache_factor) as usize).min(10000);
thresholds.high_priority_permits =
((thresholds.high_priority_permits as f32 * permit_factor) as usize).min(50);
thresholds.low_priority_permits =
((thresholds.low_priority_permits as f32 * permit_factor) as usize).min(20);
}
ThresholdAdjustment::ScaleDown {
cache_factor,
permit_factor,
} => {
thresholds.hot_cache_size =
((thresholds.hot_cache_size as f32 * cache_factor) as usize).max(100);
thresholds.high_priority_permits =
((thresholds.high_priority_permits as f32 * permit_factor) as usize).max(2);
thresholds.low_priority_permits =
((thresholds.low_priority_permits as f32 * permit_factor) as usize).max(1);
}
ThresholdAdjustment::MoreCompression { compression_level } => {
thresholds.compression_level = compression_level.min(9);
}
ThresholdAdjustment::LessCompression { compression_level } => {
thresholds.compression_level = compression_level.max(1);
}
ThresholdAdjustment::Maintain => {
}
}
adjustments.push_back(adjustment);
while adjustments.len() > 100 {
adjustments.pop_front();
}
Ok(())
}
pub async fn get_current_thresholds(&self) -> CurrentThresholds {
self.current_thresholds.read().await.clone()
}
pub async fn get_performance_stats(&self) -> PerformanceStatistics {
let history = self.performance_history.read().await;
let adjustments = self.adjustment_history.read().await;
if history.is_empty() {
return PerformanceStatistics::default();
}
let recent = history.iter().rev().take(10).collect::<Vec<_>>();
let avg_duration =
recent.iter().map(|s| s.analysis_duration_ms).sum::<u64>() as f32 / recent.len() as f32;
let avg_cache_hit =
recent.iter().map(|s| s.cache_hit_ratio).sum::<f32>() / recent.len() as f32;
let avg_memory =
recent.iter().map(|s| s.memory_usage_mb).sum::<f32>() / recent.len() as f32;
let avg_cpu = recent.iter().map(|s| s.cpu_utilization).sum::<f32>() / recent.len() as f32;
let recent_adjustments = adjustments.len().min(10);
PerformanceStatistics {
avg_analysis_duration_ms: avg_duration,
avg_cache_hit_ratio: avg_cache_hit,
avg_memory_usage_mb: avg_memory,
avg_cpu_utilization: avg_cpu,
total_samples: history.len(),
recent_adjustments_count: recent_adjustments,
performance_trend: self.calculate_trend(&history),
}
}
fn calculate_trend(&self, history: &VecDeque<PerformanceSample>) -> PerformanceTrend {
if history.len() < 10 {
return PerformanceTrend::Stable;
}
let history_vec: Vec<_> = history.iter().collect();
let mid_point = history_vec.len() / 2;
let recent_half = &history_vec[mid_point..];
let older_half = &history_vec[..mid_point];
let recent_avg = recent_half
.iter()
.map(|s| s.analysis_duration_ms)
.sum::<u64>() as f32
/ recent_half.len() as f32;
let older_avg = older_half
.iter()
.map(|s| s.analysis_duration_ms)
.sum::<u64>() as f32
/ older_half.len() as f32;
let change_ratio = (recent_avg - older_avg) / older_avg;
if change_ratio > 0.2 {
PerformanceTrend::Degrading
} else if change_ratio < -0.2 {
PerformanceTrend::Improving
} else {
PerformanceTrend::Stable
}
}
pub async fn create_sample(
&self,
duration: Duration,
cache_hit: bool,
queue_depth: usize,
) -> PerformanceSample {
PerformanceSample {
timestamp: Instant::now(),
analysis_duration_ms: duration.as_millis() as u64,
cache_hit_ratio: if cache_hit { 1.0 } else { 0.0 },
memory_usage_mb: self.get_memory_usage().await,
cpu_utilization: self.get_cpu_usage().await,
queue_depth,
}
}
async fn get_memory_usage(&self) -> f32 {
let thresholds = self.current_thresholds.read().await;
let estimated_cache_mb = (thresholds.hot_cache_size * 1024) as f32 / (1024.0 * 1024.0);
estimated_cache_mb + 50.0 }
async fn get_cpu_usage(&self) -> f32 {
let history = self.performance_history.read().await;
let recent_activity = history
.iter()
.rev()
.take(5)
.filter(|s| s.timestamp.elapsed() < Duration::from_secs(10))
.count();
(recent_activity as f32 * 0.1).min(1.0) }
pub async fn reset_to_defaults(&self) -> Result<()> {
let mut thresholds = self.current_thresholds.write().await;
*thresholds = CurrentThresholds::default();
let mut adjustments = self.adjustment_history.write().await;
adjustments.push_back(ThresholdAdjustment::Maintain);
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceTrend {
Improving,
Stable,
Degrading,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStatistics {
pub avg_analysis_duration_ms: f32,
pub avg_cache_hit_ratio: f32,
pub avg_memory_usage_mb: f32,
pub avg_cpu_utilization: f32,
pub total_samples: usize,
pub recent_adjustments_count: usize,
pub performance_trend: PerformanceTrend,
}
impl Default for PerformanceStatistics {
fn default() -> Self {
Self {
avg_analysis_duration_ms: 0.0,
avg_cache_hit_ratio: 0.0,
avg_memory_usage_mb: 0.0,
avg_cpu_utilization: 0.0,
total_samples: 0,
recent_adjustments_count: 0,
performance_trend: PerformanceTrend::Stable,
}
}
}
impl PerformanceStatistics {
#[must_use]
pub fn format_diagnostic(&self) -> String {
let trend_indicator = match self.performance_trend {
PerformanceTrend::Improving => "📈 IMPROVING",
PerformanceTrend::Stable => "➡️ STABLE",
PerformanceTrend::Degrading => "📉 DEGRADING",
};
format!(
"Adaptive Thresholds:\n\
- Performance: {}\n\
- Avg duration: {:.1}ms\n\
- Cache hit ratio: {:.1}%\n\
- Memory usage: {:.1}MB\n\
- CPU utilization: {:.1}%\n\
- Total samples: {}\n\
- Recent adjustments: {}",
trend_indicator,
self.avg_analysis_duration_ms,
self.avg_cache_hit_ratio * 100.0,
self.avg_memory_usage_mb,
self.avg_cpu_utilization * 100.0,
self.total_samples,
self.recent_adjustments_count
)
}
}
pub struct AdaptiveThresholdFactory;
impl AdaptiveThresholdFactory {
#[must_use]
pub fn create_default() -> AdaptiveThresholdManager {
AdaptiveThresholdManager::new(AdaptiveConfig::default())
}
#[must_use]
pub fn create_dev_optimized() -> AdaptiveThresholdManager {
let config = AdaptiveConfig {
target_analysis_time_ms: 50, sample_window_size: 20, adjustment_sensitivity: 0.2, ..Default::default()
};
AdaptiveThresholdManager::new(config)
}
#[must_use]
pub fn create_prod_optimized() -> AdaptiveThresholdManager {
let config = AdaptiveConfig {
target_analysis_time_ms: 200, sample_window_size: 100, adjustment_sensitivity: 0.05, ..Default::default()
};
AdaptiveThresholdManager::new(config)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_sample(duration_ms: u64, cache_hit: bool, memory_mb: f32) -> PerformanceSample {
PerformanceSample {
timestamp: Instant::now(),
analysis_duration_ms: duration_ms,
cache_hit_ratio: if cache_hit { 1.0 } else { 0.0 },
memory_usage_mb: memory_mb,
cpu_utilization: 0.5,
queue_depth: 2,
}
}
#[tokio::test]
async fn test_threshold_manager_creation() {
let manager = AdaptiveThresholdManager::new(AdaptiveConfig::default());
let stats = manager.get_performance_stats().await;
assert_eq!(stats.total_samples, 0);
assert!(matches!(stats.performance_trend, PerformanceTrend::Stable));
}
#[tokio::test]
async fn test_performance_sample_recording() {
let manager = AdaptiveThresholdManager::new(AdaptiveConfig::default());
let sample = create_sample(80, true, 100.0);
manager.record_sample(sample).await.unwrap();
let stats = manager.get_performance_stats().await;
assert_eq!(stats.total_samples, 1);
assert_eq!(stats.avg_analysis_duration_ms, 80.0);
}
#[tokio::test]
async fn test_sample_window_management() {
let config = AdaptiveConfig {
sample_window_size: 3,
..Default::default()
};
let manager = AdaptiveThresholdManager::new(config);
for i in 0..5 {
let sample = create_sample(100 + i * 10, true, 100.0);
manager.record_sample(sample).await.unwrap();
}
let stats = manager.get_performance_stats().await;
assert_eq!(stats.total_samples, 3); }
#[tokio::test]
async fn test_scale_up_adjustment() {
let config = AdaptiveConfig {
target_analysis_time_ms: 100,
min_cache_hit_ratio: 0.8,
sample_window_size: 10,
..Default::default()
};
let manager = AdaptiveThresholdManager::new(config);
for _ in 0..12 {
let sample = create_sample(200, false, 100.0); manager.record_sample(sample).await.unwrap();
}
let thresholds = manager.get_current_thresholds().await;
let stats = manager.get_performance_stats().await;
assert!(thresholds.hot_cache_size > 1000); assert!(stats.recent_adjustments_count > 0);
}
#[tokio::test]
async fn test_compression_adjustment() {
let config = AdaptiveConfig {
max_memory_mb: 200.0,
sample_window_size: 10,
..Default::default()
};
let manager = AdaptiveThresholdManager::new(config);
for _ in 0..12 {
let sample = create_sample(80, true, 300.0); manager.record_sample(sample).await.unwrap();
}
let thresholds = manager.get_current_thresholds().await;
assert!(thresholds.compression_level > 4);
}
#[tokio::test]
async fn test_factory_patterns() {
let default_mgr = AdaptiveThresholdFactory::create_default();
let dev_mgr = AdaptiveThresholdFactory::create_dev_optimized();
let prod_mgr = AdaptiveThresholdFactory::create_prod_optimized();
let sample = create_sample(100, true, 100.0);
default_mgr.record_sample(sample.clone()).await.unwrap();
dev_mgr.record_sample(sample.clone()).await.unwrap();
prod_mgr.record_sample(sample).await.unwrap();
let dev_stats = dev_mgr.get_performance_stats().await;
let prod_stats = prod_mgr.get_performance_stats().await;
assert_eq!(dev_stats.total_samples, 1);
assert_eq!(prod_stats.total_samples, 1);
}
#[tokio::test]
async fn test_trend_calculation() {
let manager = AdaptiveThresholdManager::new(AdaptiveConfig::default());
for i in 0..20 {
let duration = 200 - (i * 5); let sample = create_sample(duration, true, 100.0);
manager.record_sample(sample).await.unwrap();
}
let stats = manager.get_performance_stats().await;
assert!(matches!(
stats.performance_trend,
PerformanceTrend::Improving
));
}
}
#[cfg(test)]
mod property_tests {
use proptest::prelude::*;
proptest! {
#[test]
fn basic_property_stability(_input in ".*") {
prop_assert!(true);
}
#[test]
fn module_consistency_check(_x in 0u32..1000) {
prop_assert!(_x < 1001);
}
}
}