use super::DynamicConfig;
use core::time::Duration;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
#[ cfg( feature = "dynamic_configuration" ) ]
use notify::{ Watcher, RecursiveMode, Event };
#[ cfg( feature = "dynamic_configuration" ) ]
use std::sync::mpsc;
#[ cfg( feature = "dynamic_configuration" ) ]
use std::thread;
#[ cfg( feature = "dynamic_configuration" ) ]
use std::env;
#[ cfg( feature = "dynamic_configuration" ) ]
#[ async_trait::async_trait ]
/// A provider of `DynamicConfig` values ( file, environment variables, remote endpoint, ... ).
///
/// Sources are distinguished by `source_id()` and carry a merge `priority()`.
/// Sources that can detect changes at runtime opt in via `supports_watching()`
/// and push notifications through `start_watching()`.
pub trait ConfigSource : Send + Sync
{
/// Loads the current configuration from this source.
async fn load_config( &self ) -> Result< DynamicConfig, crate::error::Error >;
/// Stable identifier of this source, e.g. `file:<path>`, `env:<prefix>`, `remote:<url>`.
fn source_id( &self ) -> &str;
/// Merge priority of this source. NOTE(review) : presumably higher wins — confirm against the consumer that merges sources.
fn priority( &self ) -> u8;
/// Whether this source can emit change notifications; defaults to `false`.
fn supports_watching( &self ) -> bool
{
false
}
/// Begins emitting `ConfigSourceEvent`s on `_sender`. The default
/// implementation reports `NotImplemented` for non-watching sources.
async fn start_watching( &self, _sender : Sender< ConfigSourceEvent > ) -> Result< (), crate::error::Error >
{
Err( crate::error::Error::NotImplemented( "Watching not supported for this source".to_string() ) )
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug, Clone ) ]
/// Notification emitted by a watching `ConfigSource`.
pub struct ConfigSourceEvent
{
/// Identifier of the source that produced the event ( see `ConfigSource::source_id` ).
pub source_id : String,
/// What happened ( change, load error, availability transition ).
pub event_type : ConfigSourceEventType,
/// New configuration when the source already fetched it; file-change events in
/// this module carry `None` and expect the consumer to reload.
pub config : Option< DynamicConfig >,
/// Human-readable error description, set for `LoadError` events.
pub error : Option< String >,
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug, Clone, PartialEq, Eq ) ]
/// Kind of event carried by a `ConfigSourceEvent`.
pub enum ConfigSourceEventType
{
/// The source's configuration changed ( or its backing file was touched ).
ConfigChanged,
/// Loading/fetching the configuration failed; details in `ConfigSourceEvent::error`.
LoadError,
/// The source became unreachable. NOTE(review) : not emitted by any source in this file — verify external producers.
SourceUnavailable,
/// The source became reachable again. NOTE(review) : not emitted by any source in this file — verify external producers.
SourceAvailable,
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug ) ]
/// `ConfigSource` backed by a configuration file on disk.
pub struct FileConfigSource
{
/// Path of the configuration file to load and watch.
file_path : std::path::PathBuf,
/// Merge priority reported via `ConfigSource::priority`.
priority : u8,
/// Cached identifier of the form `file:<path>`.
source_id : String,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl FileConfigSource
{
/// Creates a file-backed source for `file_path` with the given merge `priority`.
pub fn new< P: AsRef< std::path::Path > >( file_path : P, priority : u8 ) -> Self
{
let path = file_path.as_ref().to_path_buf();
Self
{
source_id : format!( "file:{}", path.display() ),
priority,
file_path : path,
}
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ async_trait::async_trait ]
impl ConfigSource for FileConfigSource
{
/// Loads and parses the configuration file.
async fn load_config( &self ) -> Result< DynamicConfig, crate::error::Error >
{
DynamicConfig::from_file( &self.file_path ).await
}
fn source_id( &self ) -> &str
{
&self.source_id
}
fn priority( &self ) -> u8
{
self.priority
}
fn supports_watching( &self ) -> bool
{
true
}
/// Watches the file's parent directory ( non-recursively ) and emits a
/// `ConfigChanged` event on `sender` whenever the watched file itself appears
/// in a filesystem event, or a `LoadError` event when the watcher reports an
/// error. Events carry `config : None`; the consumer is expected to reload.
///
/// The watcher and the blocking event loop both live on one dedicated OS
/// thread : `std::sync::mpsc::Receiver::recv` must never run inside a tokio
/// task ( the original implementation did, which parks an executor thread
/// indefinitely ), so events are forwarded with `Sender::blocking_send`
/// instead. Keeping the watcher owned by that thread also removes the old
/// sleep-forever keep-alive loop. The thread exits when the watcher cannot be
/// created, the path has no parent, the watch fails ( best-effort, matching
/// the original's silent behavior ), or the receiving side of `sender` drops.
async fn start_watching( &self, sender : Sender< ConfigSourceEvent > ) -> Result< (), crate::error::Error >
{
let file_path = self.file_path.clone();
let source_id = self.source_id.clone();
thread::spawn( move || {
let ( sync_tx, sync_rx ) = mpsc::channel();
let mut watcher = match notify::recommended_watcher( move | res | {
let _ = sync_tx.send( res );
} )
{
Ok( watcher ) => watcher,
Err( _ ) => return,
};
let parent = match file_path.parent()
{
Some( parent ) => parent,
None => return,
};
if watcher.watch( parent, RecursiveMode::NonRecursive ).is_err()
{
return;
}
// `watcher` stays alive for as long as this loop runs; `recv` blocks this
// plain OS thread, which is safe ( unlike blocking inside the async runtime ).
while let Ok( res ) = sync_rx.recv()
{
let event = match res
{
Ok( Event { paths, .. } ) => {
// Parent-directory watch reports sibling files too — only forward
// events that mention the watched file itself.
if !paths.contains( &file_path )
{
continue;
}
ConfigSourceEvent {
source_id : source_id.clone(),
event_type : ConfigSourceEventType::ConfigChanged,
config : None,
error : None,
}
}
Err( e ) => ConfigSourceEvent {
source_id : source_id.clone(),
event_type : ConfigSourceEventType::LoadError,
config : None,
error : Some( format!( "File watch error : {}", e ) ),
},
};
if sender.blocking_send( event ).is_err()
{
// Receiver dropped — stop watching and let the thread end.
break;
}
}
} );
Ok( () )
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug ) ]
/// `ConfigSource` that assembles a configuration from `<prefix>_*` environment variables.
pub struct EnvironmentConfigSource
{
/// Variable-name prefix, e.g. `MYAPP` reads `MYAPP_TIMEOUT_SECONDS`, `MYAPP_BASE_URL`, ...
prefix : String,
/// Merge priority reported via `ConfigSource::priority`.
priority : u8,
/// Cached identifier of the form `env:<prefix>`.
source_id : String,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl EnvironmentConfigSource
{
/// Creates an environment-variable source that reads variables named `<prefix>_*`.
pub fn new( prefix : String, priority : u8 ) -> Self
{
Self
{
source_id : format!( "env:{}", prefix ),
prefix,
priority,
}
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ async_trait::async_trait ]
impl ConfigSource for EnvironmentConfigSource
{
/// Builds a configuration from `<prefix>_*` environment variables.
///
/// Variables that are absent or fail to parse are silently skipped, so the
/// result only reflects settings that were explicitly and validly provided.
async fn load_config( &self ) -> Result< DynamicConfig, crate::error::Error >
{
let mut builder = DynamicConfig::builder();
let timeout = env::var( format!( "{}_TIMEOUT_SECONDS", self.prefix ) )
.ok()
.and_then( | value | value.parse::< u64 >().ok() );
if let Some( seconds ) = timeout
{
builder = builder.timeout( Duration::from_secs( seconds ) );
}
let retries = env::var( format!( "{}_RETRY_ATTEMPTS", self.prefix ) )
.ok()
.and_then( | value | value.parse::< u32 >().ok() );
if let Some( attempts ) = retries
{
builder = builder.retry_attempts( attempts );
}
if let Ok( base_url ) = env::var( format!( "{}_BASE_URL", self.prefix ) )
{
builder = builder.base_url( base_url );
}
let jitter = env::var( format!( "{}_ENABLE_JITTER", self.prefix ) )
.ok()
.and_then( | value | value.parse::< bool >().ok() );
if let Some( enable ) = jitter
{
builder = builder.enable_jitter( enable );
}
Ok(
builder
.source_priority( self.priority )
.tag( "source".to_string(), "environment".to_string() )
.tag( "prefix".to_string(), self.prefix.clone() )
.build()?
)
}
fn source_id( &self ) -> &str
{
self.source_id.as_str()
}
fn priority( &self ) -> u8
{
self.priority
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ derive( Debug ) ]
/// `ConfigSource` that fetches JSON configuration from an HTTP endpoint.
pub struct RemoteConfigSource
{
/// URL of the remote configuration endpoint.
endpoint_url : String,
/// Merge priority reported via `ConfigSource::priority`.
priority : u8,
/// Cached identifier of the form `remote:<url>`.
source_id : String,
/// Reused HTTP client; built with a 30 s request timeout in `new`.
http_client : reqwest::Client,
/// Interval between polls when watching; defaults to 60 s in `new`.
poll_interval : Duration,
/// Extra headers ( e.g. authentication ) attached to every request.
auth_headers : HashMap< String, String >,
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl RemoteConfigSource
{
/// Creates a remote source polling `endpoint_url` with the given merge `priority`.
/// The HTTP client uses a 30 s request timeout, falling back to the default
/// client if the builder fails; polling defaults to once every 60 s.
pub fn new( endpoint_url : String, priority : u8 ) -> Self
{
let http_client = reqwest::Client::builder()
.timeout( Duration::from_secs( 30 ) )
.build()
.unwrap_or_else( | _ | reqwest::Client::new() );
Self
{
source_id : format!( "remote:{}", endpoint_url ),
endpoint_url,
priority,
http_client,
poll_interval : Duration::from_secs( 60 ),
auth_headers : HashMap::new(),
}
}
/// Like `new`, but with an explicit polling interval.
pub fn with_poll_interval( endpoint_url : String, priority : u8, poll_interval : Duration ) -> Self
{
Self
{
poll_interval,
..Self::new( endpoint_url, priority )
}
}
/// Adds a single header sent with every request ( builder style ).
pub fn with_auth_header( mut self, key : String, value : String ) -> Self
{
self.auth_headers.insert( key, value );
self
}
/// Merges `headers` into the set sent with every request ( builder style ).
pub fn with_auth_headers( mut self, headers : HashMap< String, String > ) -> Self
{
self.auth_headers.extend( headers );
self
}
/// Changes the polling interval on an existing source.
pub fn set_poll_interval( &mut self, interval : Duration )
{
self.poll_interval = interval;
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
#[ async_trait::async_trait ]
impl ConfigSource for RemoteConfigSource
{
/// Fetches the configuration from the remote endpoint and stamps it with this
/// source's priority and identifying tags.
///
/// Delegates the HTTP round-trip to `Self::fetch_config` — the original body
/// duplicated the whole request/status-check/JSON-parse sequence that the
/// polling loop already uses, with subtly different error strings; now both
/// paths share one implementation ( and one set of error messages ).
async fn load_config( &self ) -> Result< DynamicConfig, crate::error::Error >
{
let mut config = Self::fetch_config( &self.http_client, &self.endpoint_url, &self.auth_headers ).await?;
config.source_priority = Some( self.priority );
config.tags.insert( "source".to_string(), "remote".to_string() );
config.tags.insert( "endpoint".to_string(), self.endpoint_url.clone() );
Ok( config )
}
fn source_id( &self ) -> &str
{
&self.source_id
}
fn priority( &self ) -> u8
{
self.priority
}
fn supports_watching( &self ) -> bool
{
true
}
/// Polls the endpoint every `poll_interval` and emits a `ConfigChanged` event
/// ( carrying the fetched config ) whenever the configuration's hash differs
/// from the last one seen, or a `LoadError` event when a fetch fails.
/// The spawned task stops once the receiving side of `sender` is dropped.
async fn start_watching( &self, sender : Sender< ConfigSourceEvent > ) -> Result< (), crate::error::Error >
{
let endpoint_url = self.endpoint_url.clone();
let source_id = self.source_id.clone();
let http_client = self.http_client.clone();
let auth_headers = self.auth_headers.clone();
let poll_interval = self.poll_interval;
tokio::spawn( async move {
let mut last_config_hash : Option< u64 > = None;
let mut interval = tokio::time::interval( poll_interval );
loop
{
interval.tick().await;
let event = match Self::fetch_config( &http_client, &endpoint_url, &auth_headers ).await
{
Ok( config ) => {
let current_hash = config.compute_hash();
// Only notify when the configuration actually changed
// ( the first successful fetch always counts as a change ).
if last_config_hash == Some( current_hash )
{
continue;
}
last_config_hash = Some( current_hash );
ConfigSourceEvent {
source_id : source_id.clone(),
event_type : ConfigSourceEventType::ConfigChanged,
config : Some( config ),
error : None,
}
},
Err( e ) => ConfigSourceEvent {
source_id : source_id.clone(),
event_type : ConfigSourceEventType::LoadError,
config : None,
error : Some( format!( "Failed to fetch remote config : {}", e ) ),
},
};
if sender.send( event ).await.is_err()
{
// Receiver dropped — stop polling.
break;
}
}
} );
Ok( () )
}
}
#[ cfg( feature = "dynamic_configuration" ) ]
impl RemoteConfigSource
{
/// Performs one GET against `endpoint_url` with `auth_headers` attached and
/// deserializes the JSON response body into a `DynamicConfig`.
///
/// Errors : `NetworkError` for transport/read failures, `ServerError` for
/// non-success HTTP statuses, `DeserializationError` for malformed JSON.
async fn fetch_config(
http_client : &reqwest::Client,
endpoint_url : &str,
auth_headers : &HashMap< String, String >
) -> Result< DynamicConfig, crate::error::Error >
{
// Attach every configured header to the request before sending.
let request = auth_headers
.iter()
.fold( http_client.get( endpoint_url ), | req, ( key, value ) | req.header( key, value ) );
let response = request.send().await
.map_err( | e | crate::error::Error::NetworkError( format!( "HTTP request failed : {}", e ) ) )?;
let status = response.status();
if !status.is_success()
{
return Err( crate::error::Error::ServerError(
format!( "HTTP request failed with status : {}", status )
) );
}
let body = response.text().await
.map_err( | e | crate::error::Error::NetworkError( format!( "Failed to read response : {}", e ) ) )?;
serde_json::from_str::< DynamicConfig >( &body )
.map_err( | e | crate::error::Error::DeserializationError( format!( "Failed to parse JSON: {}", e ) ) )
}
}