use super::{ DynamicConfig, versioning::ConfigChangeEvent };
use core::time::Duration;
use std::time::{ SystemTime, Instant };
use core::sync::atomic::{ AtomicU64, AtomicUsize, Ordering };
use std::sync::{ Arc, Mutex, RwLock };
use std::collections::{ HashMap, BTreeMap };
use tokio::sync::{ RwLock as AsyncRwLock, broadcast };
/// Lock-free metrics for the dynamic-configuration subsystem.
///
/// Every field is an atomic so it can be updated from concurrent update and
/// validation paths without locking. All accesses in this file use
/// `Ordering::Relaxed`, so cross-field reads (e.g. a report snapshot) are
/// approximate under contention, which is acceptable for metrics.
#[ derive( Debug ) ]
pub struct ConfigMetrics
{
// Counter: successful configuration updates recorded via `record_update`.
pub total_updates : AtomicU64,
// Counter: validation-cache hits.
pub validation_cache_hits : AtomicU64,
// Counter: validation-cache misses.
pub validation_cache_misses : AtomicU64,
// Counter: change events broadcast to subscribers.
pub change_events_sent : AtomicU64,
// Counter: rollback operations performed.
pub rollback_operations : AtomicU64,
// Gauge: current number of history entries (set via `update_history_stats`).
pub history_entries : AtomicUsize,
// Gauge: current memory used by history, in bytes.
pub history_memory_bytes : AtomicUsize,
// Gauge: exponential moving average of update duration, microseconds.
pub avg_update_time_us : AtomicU64,
// Counter: updates that failed.
pub failed_updates : AtomicU64,
}
impl Default for ConfigMetrics
{
  /// Start every counter and gauge at zero.
  fn default() -> Self
  {
    Self
    {
      total_updates : AtomicU64::default(),
      validation_cache_hits : AtomicU64::default(),
      validation_cache_misses : AtomicU64::default(),
      change_events_sent : AtomicU64::default(),
      rollback_operations : AtomicU64::default(),
      history_entries : AtomicUsize::default(),
      history_memory_bytes : AtomicUsize::default(),
      avg_update_time_us : AtomicU64::default(),
      failed_updates : AtomicU64::default(),
    }
  }
}
impl ConfigMetrics
{
  /// Record a completed update and fold its duration into the moving average.
  pub fn record_update( &self, duration_us : u64 )
  {
    self.total_updates.fetch_add( 1, Ordering::Relaxed );
    self.update_avg_time( duration_us );
  }

  /// Record an update that failed.
  pub fn record_failed_update( &self )
  {
    self.failed_updates.fetch_add( 1, Ordering::Relaxed );
  }

  /// Record a validation-cache hit.
  pub fn record_cache_hit( &self )
  {
    self.validation_cache_hits.fetch_add( 1, Ordering::Relaxed );
  }

  /// Record a validation-cache miss.
  pub fn record_cache_miss( &self )
  {
    self.validation_cache_misses.fetch_add( 1, Ordering::Relaxed );
  }

  /// Record a change event broadcast to subscribers.
  pub fn record_change_event( &self )
  {
    self.change_events_sent.fetch_add( 1, Ordering::Relaxed );
  }

  /// Record a rollback operation.
  pub fn record_rollback( &self )
  {
    self.rollback_operations.fetch_add( 1, Ordering::Relaxed );
  }

  /// Overwrite the history gauges with the current entry count and byte size.
  pub fn update_history_stats( &self, entry_count : usize, total_bytes : usize )
  {
    self.history_entries.store( entry_count, Ordering::Relaxed );
    self.history_memory_bytes.store( total_bytes, Ordering::Relaxed );
  }

  /// Fold `new_time_us` into the exponential moving average ( alpha = 0.1 ).
  ///
  /// Fix : the first sample now seeds the average directly. Previously the
  /// average started at 0, so the first observation contributed only 10% and
  /// early readings underestimated update time by up to an order of magnitude.
  ///
  /// NOTE(review) : the load/store pair is not an atomic read-modify-write, so
  /// a concurrent update can drop a sample. That is acceptable for an
  /// approximate metric ; switch to `fetch_update` if exactness ever matters.
  fn update_avg_time( &self, new_time_us : u64 )
  {
    let current_avg = self.avg_update_time_us.load( Ordering::Relaxed );
    let new_avg = if current_avg == 0
    {
      new_time_us
    }
    else
    {
      ( ( current_avg as f64 * 0.9 ) + ( new_time_us as f64 * 0.1 ) ) as u64
    };
    self.avg_update_time_us.store( new_avg, Ordering::Relaxed );
  }

  /// Validation-cache hit ratio as a percentage ; 0.0 when no lookups yet.
  pub fn cache_hit_ratio( &self ) -> f64
  {
    let hits = self.validation_cache_hits.load( Ordering::Relaxed );
    let misses = self.validation_cache_misses.load( Ordering::Relaxed );
    let total = hits + misses;
    if total == 0
    {
      0.0
    }
    else
    {
      ( hits as f64 / total as f64 ) * 100.0
    }
  }

  /// Failed updates as a percentage of successful updates ; 0.0 when none yet.
  ///
  /// NOTE(review) : `failed_updates` is not included in `total_updates`
  /// ( `record_update` and `record_failed_update` are separate ), so this is
  /// failures relative to successes, not to all attempts — confirm intent.
  pub fn error_rate( &self ) -> f64
  {
    let total = self.total_updates.load( Ordering::Relaxed );
    let failed = self.failed_updates.load( Ordering::Relaxed );
    if total == 0
    {
      0.0
    }
    else
    {
      ( failed as f64 / total as f64 ) * 100.0
    }
  }

  /// Estimated history compression effectiveness as a percentage.
  ///
  /// Returns 100% when nothing is stored or only a single entry exists.
  ///
  /// NOTE(review) : the uncompressed size is estimated as `total_memory * 1.7`,
  /// so for any non-trivial history this always evaluates to ~58.8%. Wire in a
  /// real uncompressed-size figure if this gauge should be meaningful.
  pub fn memory_efficiency( &self ) -> f64
  {
    let total_memory = self.history_memory_bytes.load( Ordering::Relaxed );
    if total_memory == 0
    {
      return 100.0;
    }
    let entries = self.history_entries.load( Ordering::Relaxed );
    if entries <= 1
    {
      return 100.0;
    }
    let estimated_uncompressed = ( total_memory as f64 * 1.7 ).round() as usize;
    ( total_memory as f64 / estimated_uncompressed as f64 ) * 100.0
  }

  /// Snapshot all metrics into an owned report stamped with the current time.
  ///
  /// Each field is loaded independently with `Relaxed` ordering, so the
  /// snapshot is not atomic across fields under concurrent updates.
  pub fn generate_report( &self ) -> ConfigMetricsReport
  {
    ConfigMetricsReport
    {
      total_updates : self.total_updates.load( Ordering::Relaxed ),
      failed_updates : self.failed_updates.load( Ordering::Relaxed ),
      error_rate : self.error_rate(),
      avg_update_time_us : self.avg_update_time_us.load( Ordering::Relaxed ),
      cache_hit_ratio : self.cache_hit_ratio(),
      validation_cache_hits : self.validation_cache_hits.load( Ordering::Relaxed ),
      validation_cache_misses : self.validation_cache_misses.load( Ordering::Relaxed ),
      change_events_sent : self.change_events_sent.load( Ordering::Relaxed ),
      rollback_operations : self.rollback_operations.load( Ordering::Relaxed ),
      history_entries : self.history_entries.load( Ordering::Relaxed ),
      history_memory_bytes : self.history_memory_bytes.load( Ordering::Relaxed ),
      memory_efficiency : self.memory_efficiency(),
      timestamp : SystemTime::now(),
    }
  }

  /// Zero all counters and the update-time average.
  ///
  /// The history gauges ( `history_entries`, `history_memory_bytes` ) are left
  /// untouched : they mirror the current history contents, which a metrics
  /// reset does not discard. NOTE(review) : confirm this is intentional ; if
  /// not, clear them here as well.
  pub fn reset( &self )
  {
    self.total_updates.store( 0, Ordering::Relaxed );
    self.validation_cache_hits.store( 0, Ordering::Relaxed );
    self.validation_cache_misses.store( 0, Ordering::Relaxed );
    self.change_events_sent.store( 0, Ordering::Relaxed );
    self.rollback_operations.store( 0, Ordering::Relaxed );
    self.avg_update_time_us.store( 0, Ordering::Relaxed );
    self.failed_updates.store( 0, Ordering::Relaxed );
  }

  /// Render a snapshot in the Prometheus text exposition format, labelling
  /// every series with `instance_name`.
  ///
  /// NOTE(review) : `change_events_sent` and the raw cache hit/miss counters
  /// are present in the report but not exported here — confirm whether they
  /// should be added as series.
  pub fn to_prometheus_format( &self, instance_name : &str ) -> String
  {
    let report = self.generate_report();
    // The trailing `\` continuations strip the newline and the next line's
    // leading whitespace, so indentation here does not leak into the output.
    format!(
      "# HELP config_total_updates Total number of configuration updates\n\
      # TYPE config_total_updates counter\n\
      config_total_updates{{instance=\"{}\"}} {}\n\
      \n\
      # HELP config_failed_updates Total number of failed configuration updates\n\
      # TYPE config_failed_updates counter\n\
      config_failed_updates{{instance=\"{}\"}} {}\n\
      \n\
      # HELP config_error_rate Error rate as percentage\n\
      # TYPE config_error_rate gauge\n\
      config_error_rate{{instance=\"{}\"}} {:.2}\n\
      \n\
      # HELP config_avg_update_time_us Average update time in microseconds\n\
      # TYPE config_avg_update_time_us gauge\n\
      config_avg_update_time_us{{instance=\"{}\"}} {}\n\
      \n\
      # HELP config_cache_hit_ratio Cache hit ratio as percentage\n\
      # TYPE config_cache_hit_ratio gauge\n\
      config_cache_hit_ratio{{instance=\"{}\"}} {:.2}\n\
      \n\
      # HELP config_history_entries Number of history entries\n\
      # TYPE config_history_entries gauge\n\
      config_history_entries{{instance=\"{}\"}} {}\n\
      \n\
      # HELP config_history_memory_bytes Memory used by history in bytes\n\
      # TYPE config_history_memory_bytes gauge\n\
      config_history_memory_bytes{{instance=\"{}\"}} {}\n\
      \n\
      # HELP config_memory_efficiency Memory efficiency as percentage\n\
      # TYPE config_memory_efficiency gauge\n\
      config_memory_efficiency{{instance=\"{}\"}} {:.2}\n\
      \n\
      # HELP config_rollback_operations Total rollback operations\n\
      # TYPE config_rollback_operations counter\n\
      config_rollback_operations{{instance=\"{}\"}} {}\n",
      instance_name, report.total_updates,
      instance_name, report.failed_updates,
      instance_name, report.error_rate,
      instance_name, report.avg_update_time_us,
      instance_name, report.cache_hit_ratio,
      instance_name, report.history_entries,
      instance_name, report.history_memory_bytes,
      instance_name, report.memory_efficiency,
      instance_name, report.rollback_operations
    )
  }

  /// Classify current metrics into a health status.
  ///
  /// Thresholds : error rate > 10% / > 5% ; cache hit ratio < 50% / < 80%
  /// ( only once more than 10 lookups have occurred ) ; average update time
  /// > 5 ms / > 2 ms ; history memory > 10 MiB ; rollbacks exceeding a
  /// quarter of updates. The first figure of each pair is an issue, the
  /// second a warning.
  pub fn health_check( &self ) -> ConfigHealthStatus
  {
    let report = self.generate_report();
    let mut issues = Vec::new();
    let mut warnings = Vec::new();
    if report.error_rate > 10.0
    {
      issues.push( format!( "High error rate : {:.1}%", report.error_rate ) );
    }
    else if report.error_rate > 5.0
    {
      warnings.push( format!( "Elevated error rate : {:.1}%", report.error_rate ) );
    }
    if report.cache_hit_ratio < 50.0 && report.validation_cache_hits + report.validation_cache_misses > 10
    {
      issues.push( format!( "Low cache hit ratio : {:.1}%", report.cache_hit_ratio ) );
    }
    else if report.cache_hit_ratio < 80.0 && report.validation_cache_hits + report.validation_cache_misses > 10
    {
      warnings.push( format!( "Suboptimal cache hit ratio : {:.1}%", report.cache_hit_ratio ) );
    }
    if report.avg_update_time_us > 5000
    {
      issues.push( format!( "Slow updates : {}μs average", report.avg_update_time_us ) );
    }
    else if report.avg_update_time_us > 2000
    {
      warnings.push( format!( "Slow updates : {}μs average", report.avg_update_time_us ) );
    }
    if report.history_memory_bytes > 10 * 1024 * 1024
    {
      warnings.push( format!( "High memory usage : {} bytes", report.history_memory_bytes ) );
    }
    if report.rollback_operations > report.total_updates / 4
    {
      warnings.push( format!( "High rollback rate : {} rollbacks vs {} updates", report.rollback_operations, report.total_updates ) );
    }
    if !issues.is_empty()
    {
      ConfigHealthStatus::Unhealthy { issues, warnings }
    }
    else if !warnings.is_empty()
    {
      ConfigHealthStatus::Degraded { warnings }
    }
    else
    {
      ConfigHealthStatus::Healthy
    }
  }
}
/// Owned, point-in-time snapshot of `ConfigMetrics`, produced by
/// `ConfigMetrics::generate_report`.
///
/// Fields are loaded independently with relaxed ordering, so values may be
/// mutually inconsistent under heavy concurrent updates.
#[ derive( Debug, Clone ) ]
pub struct ConfigMetricsReport
{
// Successful updates at snapshot time.
pub total_updates : u64,
// Failed updates at snapshot time.
pub failed_updates : u64,
// Percentage, as computed by `ConfigMetrics::error_rate`.
pub error_rate : f64,
// Exponential moving average of update duration, microseconds.
pub avg_update_time_us : u64,
// Percentage, as computed by `ConfigMetrics::cache_hit_ratio`.
pub cache_hit_ratio : f64,
pub validation_cache_hits : u64,
pub validation_cache_misses : u64,
pub change_events_sent : u64,
pub rollback_operations : u64,
// History gauges mirrored from `update_history_stats`.
pub history_entries : usize,
pub history_memory_bytes : usize,
// Percentage, as computed by `ConfigMetrics::memory_efficiency`.
pub memory_efficiency : f64,
// Wall-clock time at which the snapshot was taken.
pub timestamp : SystemTime,
}
/// Health classification derived from a metrics snapshot
/// ( see `ConfigMetrics::health_check` ).
#[ derive( Debug, Clone ) ]
pub enum ConfigHealthStatus
{
  /// No issues or warnings detected.
  Healthy,
  /// Only non-critical warnings detected.
  Degraded
  {
    warnings : Vec< String >
  },
  /// At least one critical issue detected ; warnings may accompany it.
  Unhealthy
  {
    issues : Vec< String >,
    warnings : Vec< String >
  },
}

impl ConfigHealthStatus
{
  /// True only for the `Healthy` variant.
  pub fn is_healthy( &self ) -> bool
  {
    matches!( self, ConfigHealthStatus::Healthy )
  }

  /// Collect every message, critical issues first, then warnings.
  ///
  /// Returns an empty vector for `Healthy`.
  pub fn get_all_messages( &self ) -> Vec< String >
  {
    match self
    {
      ConfigHealthStatus::Healthy => Vec::new(),
      ConfigHealthStatus::Degraded { warnings } => warnings.clone(),
      ConfigHealthStatus::Unhealthy { issues, warnings } =>
        issues.iter().chain( warnings.iter() ).cloned().collect(),
    }
  }
}
/// Tuning knobs for the configuration manager.
#[ derive( Debug, Clone ) ]
pub struct ConfigManagerOptions
{
  /// Maximum number of entries retained in the change history.
  pub max_history_entries : usize,
  /// Memory budget for the change history, in bytes.
  pub max_history_memory_bytes : usize,
  /// Whether change events are broadcast to subscribers.
  pub enable_change_notifications : bool,
  /// Whether validation results are cached.
  pub enable_validation_caching : bool,
  /// Interval between background cleanup passes ; `None` disables cleanup.
  pub cleanup_interval : Option< Duration >,
}

impl Default for ConfigManagerOptions
{
  /// Defaults : 1000 history entries, a 10 MiB history budget, notifications
  /// and validation caching enabled, and hourly cleanup.
  fn default() -> Self
  {
    Self
    {
      max_history_entries : 1000,
      max_history_memory_bytes : 10 * 1024 * 1024,
      enable_change_notifications : true,
      enable_validation_caching : true,
      cleanup_interval : Some( Duration::from_secs( 3600 ) ),
    }
  }
}
/// Synchronization state tracked by `ConfigSyncContext`.
#[ derive( Debug, Clone, PartialEq, Eq ) ]
pub enum SyncStatus
{
// Local state agrees with the source of truth.
Synchronized,
// A synchronization has been requested but has not completed.
Pending,
// The last synchronization failed ; the payload is presumably a human-readable
// error description supplied by the caller — confirm against call sites.
Failed( String ),
// A synchronization is currently running.
InProgress,
}
/// A single cached configuration with bookkeeping for TTL expiry and
/// LRU eviction.
#[ derive( Debug ) ]
pub struct ConfigCacheEntry
{
  /// The cached configuration snapshot.
  pub config : DynamicConfig,
  /// Insertion time ; drives `is_expired`.
  pub cached_at : Instant,
  /// Hash of `config` at insertion time ( via `DynamicConfig::compute_hash` ).
  pub config_hash : u64,
  /// Number of times `mark_accessed` has been called.
  pub access_count : AtomicUsize,
  /// Most recent access time ; used for LRU eviction in `ConfigSyncContext`.
  pub last_accessed : Mutex< Instant >,
}

impl ConfigCacheEntry
{
  /// Wrap `config` in a fresh entry stamped with the current time.
  pub fn new( config : DynamicConfig ) -> Self
  {
    let config_hash = config.compute_hash();
    Self
    {
      config,
      cached_at : Instant::now(),
      config_hash,
      access_count : AtomicUsize::new( 0 ),
      last_accessed : Mutex::new( Instant::now() ),
    }
  }

  /// Bump the access counter and refresh the last-accessed timestamp.
  ///
  /// Fix : a poisoned `last_accessed` mutex is now recovered with
  /// `PoisonError::into_inner` instead of the previous `unwrap`, so a single
  /// panicked lock holder no longer turns every subsequent cache access into
  /// a panic. The guarded value is a plain `Instant`, so recovery cannot
  /// observe a torn or inconsistent state.
  pub fn mark_accessed( &self )
  {
    self.access_count.fetch_add( 1, Ordering::Relaxed );
    let mut last = self.last_accessed.lock().unwrap_or_else( std::sync::PoisonError::into_inner );
    *last = Instant::now();
  }

  /// True once the entry has been cached for longer than `ttl`.
  pub fn is_expired( &self, ttl : Duration ) -> bool
  {
    self.cached_at.elapsed() > ttl
  }
}
/// Shared state for keeping configurations synchronized across consumers :
/// a TTL-bounded cache with LRU eviction, a broadcast channel for change
/// events, and per-source custom merge strategies.
///
/// `Debug` cannot be derived because of the boxed closures in
/// `merge_strategies`, hence the allow.
#[ allow( missing_debug_implementations ) ]
pub struct ConfigSyncContext
{
// Cached configs keyed by name ; each entry carries its own access bookkeeping.
cache : AsyncRwLock< BTreeMap< String, ConfigCacheEntry > >,
// Fan-out channel notifying subscribers of configuration changes.
change_broadcaster : broadcast::Sender< ConfigChangeEvent >,
// Last known synchronization state.
sync_status : AsyncRwLock< SyncStatus >,
// Custom merge functions keyed by configuration source name.
merge_strategies : RwLock< HashMap< String, Box< dyn Fn( &DynamicConfig, &DynamicConfig ) -> DynamicConfig + Send + Sync > > >,
// Cache entries older than this are treated as expired.
cache_ttl : Duration,
// Upper bound on cached entries ; reaching it triggers LRU eviction.
max_cache_size : usize,
}
impl ConfigSyncContext
{
  /// Create a sync context with the given cache TTL and capacity.
  ///
  /// The broadcast channel buffers up to 1000 pending change events ;
  /// subscribers that fall further behind observe lagged receives.
  pub fn new( cache_ttl : Duration, max_cache_size : usize ) -> Self
  {
    let ( change_broadcaster, _ ) = broadcast::channel( 1000 );
    Self
    {
      cache : AsyncRwLock::new( BTreeMap::new() ),
      change_broadcaster,
      sync_status : AsyncRwLock::new( SyncStatus::Synchronized ),
      merge_strategies : RwLock::new( HashMap::new() ),
      cache_ttl,
      max_cache_size,
    }
  }

  /// Return a clone of the cached config for `key`, if present and fresh.
  ///
  /// Expired entries are treated as misses but are not evicted here ;
  /// eviction happens in `cache_config` and `cleanup_cache`.
  pub async fn get_cached_config( &self, key : &str ) -> Option< DynamicConfig >
  {
    let cache = self.cache.read().await;
    if let Some( entry ) = cache.get( key )
    {
      if !entry.is_expired( self.cache_ttl )
      {
        entry.mark_accessed();
        return Some( entry.config.clone() );
      }
    }
    None
  }

  /// Insert ( or replace ) a cache entry, first dropping expired entries and
  /// then evicting the least-recently-used entry if the cache is still full.
  pub async fn cache_config( &self, key : String, config : DynamicConfig )
  {
    let mut cache = self.cache.write().await;
    cache.retain( | _, entry | !entry.is_expired( self.cache_ttl ) );
    if cache.len() >= self.max_cache_size
    {
      // Fix : `Instant` is `Copy`, so dereference instead of the previous
      // redundant `.clone()` ( clippy `clone_on_copy` ) ; a poisoned per-entry
      // mutex is recovered rather than propagated as a panic.
      if let Some( lru_key ) = cache.iter()
      .min_by_key( | ( _, entry ) | *entry.last_accessed.lock().unwrap_or_else( std::sync::PoisonError::into_inner ) )
      .map( | ( k, _ ) | k.clone() )
      {
        cache.remove( &lru_key );
      }
    }
    cache.insert( key, ConfigCacheEntry::new( config ) );
  }

  /// Subscribe to configuration change events.
  pub fn subscribe_to_changes( &self ) -> broadcast::Receiver< ConfigChangeEvent >
  {
    self.change_broadcaster.subscribe()
  }

  /// Broadcast a change event ; the send error ( no active subscribers ) is
  /// deliberately ignored — notifications are best-effort.
  pub fn broadcast_change( &self, event : ConfigChangeEvent )
  {
    let _ = self.change_broadcaster.send( event );
  }

  /// Current synchronization status.
  pub async fn sync_status( &self ) -> SyncStatus
  {
    self.sync_status.read().await.clone()
  }

  /// Replace the synchronization status.
  pub async fn update_sync_status( &self, status : SyncStatus )
  {
    *self.sync_status.write().await = status;
  }

  /// Register a custom merge strategy for configs originating from `source`.
  ///
  /// Fix : a poisoned strategy lock is recovered with `into_inner` instead of
  /// panicking, so one panicked holder no longer cascades into every later
  /// registration or merge. The map's key/value pairs are always consistent
  /// ( `insert` is a single operation ), so recovery is safe.
  pub fn register_merge_strategy< F >( &self, source : String, strategy : F )
  where
  F : Fn( &DynamicConfig, &DynamicConfig ) -> DynamicConfig + Send + Sync + 'static,
  {
    let mut strategies = self.merge_strategies.write().unwrap_or_else( std::sync::PoisonError::into_inner );
    strategies.insert( source, Box::new( strategy ) );
  }

  /// Merge `overlay` into `base`, using the strategy registered for `source`
  /// or falling back to `DynamicConfig::merge_with`.
  pub fn merge_configs( &self, base : &DynamicConfig, overlay : &DynamicConfig, source : &str ) -> DynamicConfig
  {
    let strategies = self.merge_strategies.read().unwrap_or_else( std::sync::PoisonError::into_inner );
    if let Some( strategy ) = strategies.get( source )
    {
      strategy( base, overlay )
    }
    else
    {
      base.merge_with( overlay )
    }
  }

  /// Return `( total entries, expired-but-not-yet-evicted entries )`.
  pub async fn cache_stats( &self ) -> ( usize, usize )
  {
    let cache = self.cache.read().await;
    let total_entries = cache.len();
    let expired_entries = cache.values().filter( | e | e.is_expired( self.cache_ttl ) ).count();
    ( total_entries, expired_entries )
  }

  /// Evict expired entries, returning how many were removed.
  pub async fn cleanup_cache( &self ) -> usize
  {
    let mut cache = self.cache.write().await;
    let initial_size = cache.len();
    cache.retain( | _, entry | !entry.is_expired( self.cache_ttl ) );
    initial_size - cache.len()
  }
}
/// Opaque handle representing an active configuration-change listener.
///
/// NOTE(review) : the handle currently carries no state ( `Arc< () >` ) ;
/// presumably lifetime/registration tracking is planned — confirm intent.
#[ derive( Debug ) ]
pub struct ConfigChangeListener
{
  /// Keep-alive token for the listener registration.
  pub _handle : Arc< () >,
}

impl ConfigChangeListener
{
  /// Create a new listener handle.
  pub fn new() -> Self
  {
    Self
    {
      _handle : Arc::new( () ),
    }
  }
}

impl Default for ConfigChangeListener
{
  /// Equivalent to [`ConfigChangeListener::new`].
  ///
  /// Added so the type satisfies clippy's `new_without_default` lint and can
  /// be used with `Default`-based construction ( e.g. `..Default::default()` ).
  fn default() -> Self
  {
    Self::new()
  }
}