use super::DynamicConfig;
use super::sources::{ ConfigSource, ConfigSourceEvent };
use core::time::Duration;
use core::sync::atomic::{ AtomicBool, AtomicU64, Ordering };
use std::sync::{ Arc, Mutex };
#[ cfg( feature = "dynamic_configuration" ) ]
#[ allow( missing_debug_implementations ) ] pub struct HotReloadManager
{
sources : Arc< Mutex< Vec< Box< dyn ConfigSource > > > >,
debounce_interval : Duration,
last_config : Arc< Mutex< Option< DynamicConfig > > >,
metrics : Arc< HotReloadMetrics >,
is_active : Arc< AtomicBool >,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl HotReloadManager
{
pub fn new( debounce_interval : Duration ) -> Self
{
Self {
sources : Arc::new( Mutex::new( Vec::new() ) ),
debounce_interval,
last_config : Arc::new( Mutex::new( None ) ),
metrics : Arc::new( HotReloadMetrics::default() ),
is_active : Arc::new( AtomicBool::new( false ) ),
}
}
pub fn add_source( &mut self, source : Box< dyn ConfigSource > )
{
let mut sources = self.sources.lock().unwrap();
sources.push( source );
}
pub fn clear_sources( &mut self )
{
let mut sources = self.sources.lock().unwrap();
sources.clear();
}
pub fn source_count( &self ) -> usize
{
self.sources.lock().unwrap().len()
}
pub fn is_active( &self ) -> bool
{
self.is_active.load( Ordering::Relaxed )
}
pub fn metrics( &self ) -> Arc< HotReloadMetrics >
{
self.metrics.clone()
}
pub async fn start_hot_reloading< F >(
&self,
on_config_update : F
) -> Result< HotReloadHandle, crate::error::Error >
where
F: Fn( DynamicConfig ) + Send + Sync + 'static,
{
if self.is_active.load( Ordering::Relaxed )
{
return Err( crate::error::Error::ConfigurationError(
"Hot-reloading is already active".to_string()
) );
}
self.is_active.store( true, Ordering::Relaxed );
self.metrics.record_start();
let ( event_sender, mut event_receiver ) = tokio::sync::mpsc::channel::< ConfigSourceEvent >( 1000 );
{
let sources = self.sources.lock().unwrap();
for source in sources.iter()
{
if source.supports_watching()
{
source.start_watching( event_sender.clone() ).await?;
}
}
}
let sources = self.sources.clone();
let debounce_interval = self.debounce_interval;
let last_config = self.last_config.clone();
let metrics = self.metrics.clone();
let is_active = self.is_active.clone();
let on_config_update = Arc::new( on_config_update );
let task_handle = tokio::spawn( async move {
let mut last_update = std::time::Instant::now();
let mut pending_update = false;
while is_active.load( Ordering::Relaxed )
{
let should_reload = if let Ok( event ) = tokio::time::timeout(
debounce_interval,
event_receiver.recv()
).await {
if event.is_some()
{
pending_update = true;
last_update = std::time::Instant::now();
false } else {
break; }
} else {
pending_update && last_update.elapsed() >= debounce_interval
};
if should_reload
{
pending_update = false;
metrics.record_reload_attempt();
match Self::load_and_merge_configs( &sources )
{
Ok( new_config ) => {
let mut last_config_guard = last_config.lock().unwrap();
let config_changed = last_config_guard.as_ref()
.map_or( true, | last | !last.has_changes( &new_config ) );
if config_changed
{
metrics.record_successful_reload();
*last_config_guard = Some( new_config.clone() );
drop( last_config_guard );
on_config_update( new_config );
} else {
metrics.record_no_change_reload();
}
},
Err( e ) => {
metrics.record_failed_reload();
tracing ::warn!( "Failed to reload configuration : {}", e );
}
}
}
}
} );
Ok( HotReloadHandle {
_task_handle : task_handle,
is_active : self.is_active.clone(),
metrics : self.metrics.clone(),
} )
}
pub async fn load_initial_config( &self ) -> Result< DynamicConfig, crate::error::Error >
{
let merged_config = Self::load_and_merge_configs( &self.sources )?;
{
let mut last_config = self.last_config.lock().unwrap();
*last_config = Some( merged_config.clone() );
}
Ok( merged_config )
}
fn load_and_merge_configs(
_sources : &Arc< Mutex< Vec< Box< dyn ConfigSource > > > >
) -> Result< DynamicConfig, crate::error::Error >
{
let mut configs = Vec::new();
let default_config = DynamicConfig::builder()
.timeout( Duration::from_secs( 30 ) )
.retry_attempts( 3 )
.base_url( "https://generativelanguage.googleapis.com".to_string() )
.build()
.map_err( | e | crate::error::Error::ConfigurationError( format!( "Failed to create default config : {}", e ) ) )?;
configs.push( ( default_config, 0 ) );
tracing ::info!( "Hot-reload configuration manager initialized with default configuration" );
if configs.is_empty()
{
return Err( crate::error::Error::ConfigurationError(
"No configuration sources could be loaded".to_string()
) );
}
configs.sort_by( | a, b | b.1.cmp( &a.1 ) );
let mut merged_config = configs.last().unwrap().0.clone();
for ( config, _ ) in configs.iter().rev().skip( 1 )
{
merged_config = merged_config.merge_with( config );
}
Ok( merged_config )
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug ) ]
pub struct HotReloadHandle
{
_task_handle : tokio::task::JoinHandle< () >,
is_active : Arc< AtomicBool >,
metrics : Arc< HotReloadMetrics >,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl Drop for HotReloadHandle
{
fn drop( &mut self )
{
self.is_active.store( false, Ordering::Relaxed );
self.metrics.record_stop();
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug ) ]
pub struct HotReloadMetrics
{
pub starts : AtomicU64,
pub stops : AtomicU64,
pub reload_attempts : AtomicU64,
pub successful_reloads : AtomicU64,
pub failed_reloads : AtomicU64,
pub no_change_reloads : AtomicU64,
pub last_reload_time : Mutex< Option< std::time::SystemTime > >,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl Default for HotReloadMetrics
{
fn default() -> Self
{
Self {
starts : AtomicU64::new( 0 ),
stops : AtomicU64::new( 0 ),
reload_attempts : AtomicU64::new( 0 ),
successful_reloads : AtomicU64::new( 0 ),
failed_reloads : AtomicU64::new( 0 ),
no_change_reloads : AtomicU64::new( 0 ),
last_reload_time : Mutex::new( None ),
}
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl HotReloadMetrics
{
pub fn record_start( &self )
{
self.starts.fetch_add( 1, Ordering::Relaxed );
}
pub fn record_stop( &self )
{
self.stops.fetch_add( 1, Ordering::Relaxed );
}
pub fn record_reload_attempt( &self )
{
self.reload_attempts.fetch_add( 1, Ordering::Relaxed );
*self.last_reload_time.lock().unwrap() = Some( std::time::SystemTime::now() );
}
pub fn record_successful_reload( &self )
{
self.successful_reloads.fetch_add( 1, Ordering::Relaxed );
}
pub fn record_failed_reload( &self )
{
self.failed_reloads.fetch_add( 1, Ordering::Relaxed );
}
pub fn record_no_change_reload( &self )
{
self.no_change_reloads.fetch_add( 1, Ordering::Relaxed );
}
pub fn success_rate( &self ) -> f64
{
let attempts = self.reload_attempts.load( Ordering::Relaxed );
let successful = self.successful_reloads.load( Ordering::Relaxed );
if attempts == 0
{
0.0
} else {
( successful as f64 / attempts as f64 ) * 100.0
}
}
pub fn is_active( &self ) -> bool
{
let starts = self.starts.load( Ordering::Relaxed );
let stops = self.stops.load( Ordering::Relaxed );
starts > stops
}
pub fn generate_report( &self ) -> HotReloadMetricsReport
{
HotReloadMetricsReport {
starts : self.starts.load( Ordering::Relaxed ),
stops : self.stops.load( Ordering::Relaxed ),
reload_attempts : self.reload_attempts.load( Ordering::Relaxed ),
successful_reloads : self.successful_reloads.load( Ordering::Relaxed ),
failed_reloads : self.failed_reloads.load( Ordering::Relaxed ),
no_change_reloads : self.no_change_reloads.load( Ordering::Relaxed ),
success_rate : self.success_rate(),
is_active : self.is_active(),
last_reload_time : self.last_reload_time.lock().unwrap().clone(),
}
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug, Clone ) ]
pub struct HotReloadMetricsReport
{
pub starts : u64,
pub stops : u64,
pub reload_attempts : u64,
pub successful_reloads : u64,
pub failed_reloads : u64,
pub no_change_reloads : u64,
pub success_rate : f64,
pub is_active : bool,
pub last_reload_time : Option< std::time::SystemTime >,
}