use super::alerts::{Alert, AlertManager};
use super::config::PerformanceMonitorConfig;
use super::history::PerformanceHistory;
use super::metrics::{PerformanceSample, ProcessorType, SystemMetrics};
use crate::adaptive_memory_compression::MemoryStats;
use crate::error::SparseResult;
use crate::neural_adaptive_sparse::NeuralProcessorStats;
use crate::quantum_inspired_sparse::QuantumProcessorStats;
use crate::quantum_neural_hybrid::QuantumNeuralHybridStats;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub struct RealTimePerformanceMonitor {
config: PerformanceMonitorConfig,
monitoring_active: Arc<AtomicBool>,
sample_counter: AtomicUsize,
performance_history: Arc<Mutex<PerformanceHistory>>,
system_metrics: Arc<Mutex<SystemMetrics>>,
alert_manager: Arc<Mutex<AlertManager>>,
processor_registry: Arc<Mutex<ProcessorRegistry>>,
}
pub struct ProcessorRegistry {
quantum_processors: HashMap<String, Box<dyn QuantumProcessorMonitor>>,
neural_processors: HashMap<String, Box<dyn NeuralProcessorMonitor>>,
hybrid_processors: HashMap<String, Box<dyn HybridProcessorMonitor>>,
memory_compressors: HashMap<String, Box<dyn MemoryCompressorMonitor>>,
}
pub trait QuantumProcessorMonitor: Send + Sync {
fn get_stats(&self) -> QuantumProcessorStats;
fn get_id(&self) -> &str;
}
pub trait NeuralProcessorMonitor: Send + Sync {
fn get_stats(&self) -> NeuralProcessorStats;
fn get_id(&self) -> &str;
}
pub trait HybridProcessorMonitor: Send + Sync {
fn get_stats(&self) -> QuantumNeuralHybridStats;
fn get_id(&self) -> &str;
}
pub trait MemoryCompressorMonitor: Send + Sync {
fn get_stats(&self) -> MemoryStats;
fn get_id(&self) -> &str;
}
#[derive(Debug, Clone)]
pub struct MonitoringSummary {
pub monitoring_active: bool,
pub total_samples: usize,
pub active_alerts: usize,
pub registered_processors: usize,
pub uptime_seconds: u64,
pub average_sampling_rate: f64,
pub system_health_score: f64,
}
impl RealTimePerformanceMonitor {
pub fn new(config: PerformanceMonitorConfig) -> Self {
let performance_history = PerformanceHistory::new(config.max_samples);
let system_metrics = SystemMetrics::new();
let alert_manager = AlertManager::new(config.max_alert_history);
let processor_registry = ProcessorRegistry::new();
Self {
config,
monitoring_active: Arc::new(AtomicBool::new(false)),
sample_counter: AtomicUsize::new(0),
performance_history: Arc::new(Mutex::new(performance_history)),
system_metrics: Arc::new(Mutex::new(system_metrics)),
alert_manager: Arc::new(Mutex::new(alert_manager)),
processor_registry: Arc::new(Mutex::new(processor_registry)),
}
}
pub fn start_monitoring(&self) -> SparseResult<()> {
if self.monitoring_active.swap(true, Ordering::Relaxed) {
return Ok(()); }
let monitoring_active = Arc::clone(&self.monitoring_active);
let config = self.config.clone();
let performance_history = Arc::clone(&self.performance_history);
let system_metrics = Arc::clone(&self.system_metrics);
let alert_manager = Arc::clone(&self.alert_manager);
let processor_registry = Arc::clone(&self.processor_registry);
let sample_counter = AtomicUsize::new(0);
std::thread::spawn(move || {
let interval = Duration::from_millis(config.monitoring_interval_ms);
while monitoring_active.load(Ordering::Relaxed) {
let start_time = std::time::Instant::now();
if config.enable_system_metrics {
if let Ok(mut metrics) = system_metrics.lock() {
Self::update_system_metrics(&mut metrics);
}
}
if let Ok(registry) = processor_registry.lock() {
let samples = Self::collect_processor_samples(®istry);
for sample in samples {
sample_counter.fetch_add(1, Ordering::Relaxed);
if let Ok(mut history) = performance_history.lock() {
history.add_sample(sample.clone());
}
if config.enable_alerts {
if let Ok(mut alerts) = alert_manager.lock() {
alerts.process_sample(&sample, None);
}
}
}
}
if sample_counter.load(Ordering::Relaxed).is_multiple_of(1000) {
if let Ok(mut history) = performance_history.lock() {
let retention_time = config.optimization_interval_s * 1000 * 10; history.cleanup_old_samples(retention_time);
}
}
let elapsed = start_time.elapsed();
if elapsed < interval {
std::thread::sleep(interval - elapsed);
}
}
});
Ok(())
}
pub fn stop_monitoring(&self) {
self.monitoring_active.store(false, Ordering::Relaxed);
}
pub fn is_monitoring_active(&self) -> bool {
self.monitoring_active.load(Ordering::Relaxed)
}
pub fn register_quantum_processor<T>(&self, processor: T) -> SparseResult<()>
where
T: QuantumProcessorMonitor + 'static,
{
if let Ok(mut registry) = self.processor_registry.lock() {
let id = processor.get_id().to_string();
registry.quantum_processors.insert(id, Box::new(processor));
}
Ok(())
}
pub fn register_neural_processor<T>(&self, processor: T) -> SparseResult<()>
where
T: NeuralProcessorMonitor + 'static,
{
if let Ok(mut registry) = self.processor_registry.lock() {
let id = processor.get_id().to_string();
registry.neural_processors.insert(id, Box::new(processor));
}
Ok(())
}
pub fn register_hybrid_processor<T>(&self, processor: T) -> SparseResult<()>
where
T: HybridProcessorMonitor + 'static,
{
if let Ok(mut registry) = self.processor_registry.lock() {
let id = processor.get_id().to_string();
registry.hybrid_processors.insert(id, Box::new(processor));
}
Ok(())
}
pub fn register_memory_compressor<T>(&self, compressor: T) -> SparseResult<()>
where
T: MemoryCompressorMonitor + 'static,
{
if let Ok(mut registry) = self.processor_registry.lock() {
let id = compressor.get_id().to_string();
registry.memory_compressors.insert(id, Box::new(compressor));
}
Ok(())
}
pub fn get_monitoring_summary(&self) -> MonitoringSummary {
let monitoring_active = self.is_monitoring_active();
let total_samples = self.sample_counter.load(Ordering::Relaxed);
let active_alerts = if let Ok(alerts) = self.alert_manager.lock() {
alerts.get_active_alerts().len()
} else {
0
};
let registered_processors = if let Ok(registry) = self.processor_registry.lock() {
registry.total_processor_count()
} else {
0
};
let system_health_score = if let Ok(metrics) = self.system_metrics.lock() {
metrics.health_score()
} else {
0.0
};
MonitoringSummary {
monitoring_active,
total_samples,
active_alerts,
registered_processors,
uptime_seconds: 0, average_sampling_rate: 0.0, system_health_score,
}
}
pub fn get_recent_samples(&self, count: usize) -> Vec<PerformanceSample> {
if let Ok(history) = self.performance_history.lock() {
history
.get_recent_samples(count)
.into_iter()
.cloned()
.collect()
} else {
Vec::new()
}
}
pub fn get_active_alerts(&self) -> Vec<Alert> {
if let Ok(alerts) = self.alert_manager.lock() {
alerts.get_active_alerts().into_iter().cloned().collect()
} else {
Vec::new()
}
}
pub fn get_system_metrics(&self) -> Option<SystemMetrics> {
self.system_metrics
.lock()
.ok()
.map(|metrics| metrics.clone())
}
pub fn record_sample(&self, sample: PerformanceSample) -> SparseResult<()> {
self.sample_counter.fetch_add(1, Ordering::Relaxed);
if let Ok(mut history) = self.performance_history.lock() {
history.add_sample(sample.clone());
}
if self.config.enable_alerts {
if let Ok(mut alerts) = self.alert_manager.lock() {
alerts.process_sample(&sample, None);
}
}
Ok(())
}
pub fn get_processor_summary(&self) -> Vec<super::history::ProcessorSummary> {
if let Ok(history) = self.performance_history.lock() {
history.get_processor_summary()
} else {
Vec::new()
}
}
pub fn clear_data(&self) -> SparseResult<()> {
if let Ok(mut history) = self.performance_history.lock() {
history.clear();
}
if let Ok(mut alerts) = self.alert_manager.lock() {
alerts.clear_all_alerts();
}
self.sample_counter.store(0, Ordering::Relaxed);
Ok(())
}
fn collect_processor_samples(registry: &ProcessorRegistry) -> Vec<PerformanceSample> {
let mut samples = Vec::new();
for (id, processor) in ®istry.quantum_processors {
let stats = processor.get_stats();
let sample = Self::quantum_stats_to_sample(id, &stats);
samples.push(sample);
}
for (id, processor) in ®istry.neural_processors {
let stats = processor.get_stats();
let sample = Self::neural_stats_to_sample(id, &stats);
samples.push(sample);
}
for (id, processor) in ®istry.hybrid_processors {
let stats = processor.get_stats();
let sample = Self::hybrid_stats_to_sample(id, &stats);
samples.push(sample);
}
for (id, compressor) in ®istry.memory_compressors {
let stats = compressor.get_stats();
let sample = Self::memory_stats_to_sample(id, &stats);
samples.push(sample);
}
samples
}
fn quantum_stats_to_sample(id: &str, stats: &QuantumProcessorStats) -> PerformanceSample {
PerformanceSample::new(ProcessorType::QuantumInspired, id.to_string())
.with_execution_time(stats.evolution_time * 1000.0) .with_throughput(stats.operations_count as f64)
.with_cache_hit_ratio(stats.cache_efficiency)
.with_error_rate(stats.decoherence_rate)
.with_quantum_coherence(stats.average_logical_fidelity)
}
fn neural_stats_to_sample(id: &str, stats: &NeuralProcessorStats) -> PerformanceSample {
PerformanceSample::new(ProcessorType::NeuralAdaptive, id.to_string())
.with_throughput(stats.total_operations as f64)
.with_cache_hit_ratio(stats.pattern_memory_hit_rate)
.with_neural_confidence(stats.neural_network_accuracy)
.with_custom_metric(
"performance_improvement".to_string(),
stats.average_performance_improvement,
)
.with_custom_metric("rl_reward".to_string(), stats.rl_agent_reward)
.with_custom_metric(
"attention_score".to_string(),
stats.transformer_attention_score,
)
}
fn hybrid_stats_to_sample(id: &str, stats: &QuantumNeuralHybridStats) -> PerformanceSample {
PerformanceSample::new(ProcessorType::QuantumNeuralHybrid, id.to_string())
.with_throughput(stats.total_operations as f64)
.with_memory_usage(stats.memory_utilization * 100.0) .with_quantum_coherence(stats.quantum_coherence)
.with_neural_confidence(stats.neural_confidence)
.with_custom_metric("hybrid_synchronization".to_string(), stats.hybrid_synchronization)
.with_custom_metric("entanglement_strength".to_string(), stats.entanglement_strength)
.with_custom_metric("average_performance".to_string(), stats.average_performance)
}
fn memory_stats_to_sample(id: &str, stats: &MemoryStats) -> PerformanceSample {
let compression_ratio = if stats.compression_stats.total_uncompressed_size > 0 {
stats.compression_stats.total_compressed_size as f64
/ stats.compression_stats.total_uncompressed_size as f64
} else {
1.0
};
PerformanceSample::new(ProcessorType::MemoryCompression, id.to_string())
.with_memory_usage(stats.current_memory_usage as f64 / (1024.0 * 1024.0))
.with_cache_hit_ratio(stats.cache_hit_ratio)
.with_compression_ratio(compression_ratio)
}
fn update_system_metrics(metrics: &mut SystemMetrics) {
metrics.update_timestamp();
metrics.cpu_usage = Self::get_cpu_usage();
metrics.memory_usage = Self::get_memory_usage();
metrics.gpu_usage = Self::get_gpu_usage();
metrics.system_load = Self::get_system_load();
metrics.temperature = Self::get_system_temperature();
}
fn get_cpu_usage() -> f64 {
0.0
}
fn get_memory_usage() -> f64 {
0.0
}
fn get_gpu_usage() -> f64 {
0.0
}
fn get_system_load() -> f64 {
0.0
}
fn get_system_temperature() -> f64 {
0.0
}
}
impl ProcessorRegistry {
fn new() -> Self {
Self {
quantum_processors: HashMap::new(),
neural_processors: HashMap::new(),
hybrid_processors: HashMap::new(),
memory_compressors: HashMap::new(),
}
}
fn total_processor_count(&self) -> usize {
self.quantum_processors.len()
+ self.neural_processors.len()
+ self.hybrid_processors.len()
+ self.memory_compressors.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
struct MockQuantumProcessor {
id: String,
}
impl QuantumProcessorMonitor for MockQuantumProcessor {
fn get_stats(&self) -> QuantumProcessorStats {
QuantumProcessorStats {
operations_count: 100,
coherence_time: 95.0,
decoherence_rate: 0.1,
entanglement_strength: 0.8,
cache_efficiency: 0.8,
error_correction_enabled: true,
active_error_syndromes: 2,
average_logical_fidelity: 0.9,
evolution_time: 1000.0,
}
}
fn get_id(&self) -> &str {
&self.id
}
}
#[test]
fn test_monitor_creation() {
let config = PerformanceMonitorConfig::default();
let monitor = RealTimePerformanceMonitor::new(config);
assert!(!monitor.is_monitoring_active());
}
#[test]
fn test_processor_registration() {
let config = PerformanceMonitorConfig::default();
let monitor = RealTimePerformanceMonitor::new(config);
let processor = MockQuantumProcessor {
id: "test-quantum".to_string(),
};
let result = monitor.register_quantum_processor(processor);
assert!(result.is_ok());
let summary = monitor.get_monitoring_summary();
assert_eq!(summary.registered_processors, 1);
}
#[test]
fn test_monitoring_summary() {
let config = PerformanceMonitorConfig::default();
let monitor = RealTimePerformanceMonitor::new(config);
let summary = monitor.get_monitoring_summary();
assert!(!summary.monitoring_active);
assert_eq!(summary.total_samples, 0);
assert_eq!(summary.active_alerts, 0);
assert_eq!(summary.registered_processors, 0);
}
#[test]
fn test_custom_sample_recording() {
let config = PerformanceMonitorConfig::default();
let monitor = RealTimePerformanceMonitor::new(config);
let sample = PerformanceSample::new(ProcessorType::QuantumInspired, "test".to_string())
.with_execution_time(100.0);
let result = monitor.record_sample(sample);
assert!(result.is_ok());
let recent_samples = monitor.get_recent_samples(10);
assert_eq!(recent_samples.len(), 1);
}
#[test]
fn test_data_clearing() {
let config = PerformanceMonitorConfig::default();
let monitor = RealTimePerformanceMonitor::new(config);
let sample = PerformanceSample::new(ProcessorType::QuantumInspired, "test".to_string());
let _ = monitor.record_sample(sample);
let result = monitor.clear_data();
assert!(result.is_ok());
let recent_samples = monitor.get_recent_samples(10);
assert_eq!(recent_samples.len(), 0);
}
}