use crate::error::{Result, ThingsError};
use crate::mcp_config::McpServerConfig;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio::time::interval;
use tracing::{debug, error, info};
/// Watches a configuration file by polling its modification time and keeps an
/// in-memory [`McpServerConfig`] in sync with it, broadcasting every change to
/// subscribers.
#[derive(Debug)]
pub struct ConfigHotReloader {
/// Current configuration; the `Arc` is cloned into the background reload task.
config: Arc<RwLock<McpServerConfig>>,
/// Path of the configuration file that is polled for changes.
config_path: PathBuf,
/// Period between two modification-time checks performed by the reload task.
reload_interval: Duration,
/// When `false`, `start` returns without spawning the reload task.
enabled: bool,
/// Broadcast channel on which every applied configuration is published.
change_tx: broadcast::Sender<McpServerConfig>,
/// Modification time of the file captured when the reloader was constructed.
last_modified: Option<std::time::SystemTime>,
}
impl ConfigHotReloader {
    /// Creates a reloader for `config_path`, starting from `config`.
    ///
    /// The file's current modification time is recorded so that only later
    /// changes trigger a reload. The change channel has a capacity of 16.
    ///
    /// # Errors
    /// Returns a configuration error if `config_path` does not exist, or an
    /// I/O error if its modification time cannot be read.
    pub fn new(
        config: McpServerConfig,
        config_path: PathBuf,
        reload_interval: Duration,
    ) -> Result<Self> {
        if !config_path.exists() {
            return Err(ThingsError::configuration(format!(
                "Configuration file does not exist: {}",
                config_path.display()
            )));
        }
        let (change_tx, _) = broadcast::channel(16);
        let last_modified = Self::get_file_modified_time(&config_path)?;
        Ok(Self {
            config: Arc::new(RwLock::new(config)),
            config_path,
            reload_interval,
            enabled: true,
            change_tx,
            last_modified: Some(last_modified),
        })
    }

    /// Creates a reloader with the default configuration and a 5 second
    /// reload interval.
    ///
    /// # Errors
    /// Same as [`ConfigHotReloader::new`].
    pub fn with_default_settings(config_path: PathBuf) -> Result<Self> {
        let config = McpServerConfig::default();
        Self::new(config, config_path, Duration::from_secs(5))
    }

    /// Returns a clone of the current configuration.
    #[must_use]
    pub async fn get_config(&self) -> McpServerConfig {
        self.config.read().await.clone()
    }

    /// Validates `new_config`, installs it as the current configuration and
    /// broadcasts it to subscribers.
    ///
    /// # Errors
    /// Returns the validation error if `new_config` is invalid; in that case
    /// the current configuration is left untouched.
    pub async fn update_config(&self, new_config: McpServerConfig) -> Result<()> {
        new_config.validate()?;
        let mut config = self.config.write().await;
        *config = new_config.clone();
        // The send result is deliberately ignored: a failed send only means
        // the notification could not be delivered, the update itself succeeded.
        let _ = self.change_tx.send(new_config);
        info!("Configuration updated successfully");
        Ok(())
    }

    /// Returns a new receiver that observes every configuration published
    /// after this call.
    #[must_use]
    pub fn subscribe_to_changes(&self) -> broadcast::Receiver<McpServerConfig> {
        self.change_tx.subscribe()
    }

    /// Enables or disables hot reloading. Only affects later calls to
    /// [`ConfigHotReloader::start`]; an already spawned task is not stopped.
    pub fn set_enabled(&mut self, enabled: bool) {
        self.enabled = enabled;
        if enabled {
            info!("Configuration hot reloading enabled");
        } else {
            info!("Configuration hot reloading disabled");
        }
    }

    /// Returns whether hot reloading is enabled.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Spawns the background task that polls the configuration file every
    /// `reload_interval` and reloads it when its modification time changes.
    ///
    /// Does nothing (and returns `Ok`) when hot reloading is disabled. Reload
    /// failures inside the task are logged and the task keeps running.
    ///
    /// # Errors
    /// Returns a configuration error if the reload interval is zero.
    pub fn start(&self) -> Result<()> {
        if !self.enabled {
            debug!("Hot reloading is disabled, not starting reloader task");
            return Ok(());
        }
        // `tokio::time::interval` panics on a zero period; report it as an
        // error instead of crashing the caller.
        if self.reload_interval.is_zero() {
            return Err(ThingsError::configuration(String::from(
                "Configuration reload interval must be greater than zero",
            )));
        }
        let config = Arc::clone(&self.config);
        let config_path = self.config_path.clone();
        let change_tx = self.change_tx.clone();
        let mut interval = interval(self.reload_interval);
        // The task tracks the modification time on its own copy.
        let mut last_modified = self.last_modified;
        info!(
            "Starting configuration hot reloader for: {}",
            config_path.display()
        );
        tokio::spawn(async move {
            loop {
                interval.tick().await;
                match Self::check_and_reload_config(
                    &config_path,
                    &config,
                    &change_tx,
                    &mut last_modified,
                )
                .await
                {
                    Ok(reloaded) => {
                        if reloaded {
                            debug!(
                                "Configuration reloaded from file: {}",
                                config_path.display()
                            );
                        }
                    }
                    Err(e) => {
                        error!("Failed to check/reload configuration: {}", e);
                    }
                }
            }
        });
        Ok(())
    }

    /// Reloads the configuration from `config_path` if the file's
    /// modification time differs from `last_modified`.
    ///
    /// Returns `Ok(true)` when a new configuration was loaded, validated,
    /// installed and broadcast; `Ok(false)` when the file is unchanged.
    /// `last_modified` is only advanced after a successful reload.
    ///
    /// # Errors
    /// Returns an error if the file metadata cannot be read, the file cannot
    /// be loaded, or the loaded configuration fails validation.
    async fn check_and_reload_config(
        config_path: &PathBuf,
        config: &Arc<RwLock<McpServerConfig>>,
        change_tx: &broadcast::Sender<McpServerConfig>,
        last_modified: &mut Option<std::time::SystemTime>,
    ) -> Result<bool> {
        let current_modified = Self::get_file_modified_time(config_path)?;
        // Compare for inequality rather than "newer than": a file replaced by
        // one carrying an older timestamp (restore, `mv`, `cp -p`) is still a
        // change that must be picked up.
        if *last_modified == Some(current_modified) {
            return Ok(false);
        }
        debug!("Configuration file modified, attempting to reload");
        match McpServerConfig::from_file(config_path) {
            Ok(new_config) => {
                new_config.validate()?;
                {
                    let mut current_config = config.write().await;
                    *current_config = new_config.clone();
                    // Publish while still holding the write lock, exactly as
                    // `update_config` does, so subscribers see changes in the
                    // order in which they were applied.
                    let _ = change_tx.send(new_config);
                }
                *last_modified = Some(current_modified);
                info!(
                    "Configuration successfully reloaded from: {}",
                    config_path.display()
                );
                Ok(true)
            }
            Err(e) => {
                error!(
                    "Failed to reload configuration from {}: {}",
                    config_path.display(),
                    e
                );
                Err(e)
            }
        }
    }

    /// Returns the last modification time of `path`.
    ///
    /// # Errors
    /// Returns an I/O error (with the path in the message) if the metadata or
    /// the modification time cannot be obtained.
    fn get_file_modified_time(path: &PathBuf) -> Result<std::time::SystemTime> {
        let metadata = std::fs::metadata(path).map_err(|e| {
            ThingsError::Io(std::io::Error::other(format!(
                "Failed to get file metadata for {}: {}",
                path.display(),
                e
            )))
        })?;
        metadata.modified().map_err(|e| {
            ThingsError::Io(std::io::Error::other(format!(
                "Failed to get modification time for {}: {}",
                path.display(),
                e
            )))
        })
    }

    /// Checks the configuration file once, immediately, and reloads it if its
    /// modification time differs from the one recorded at construction.
    ///
    /// The comparison runs against a copy of the stored timestamp, so the
    /// stored value is not advanced by this call.
    ///
    /// # Errors
    /// Returns an error if the file cannot be inspected, loaded or validated.
    pub async fn reload_now(&self) -> Result<bool> {
        let mut last_modified = self.last_modified;
        Self::check_and_reload_config(
            &self.config_path,
            &self.config,
            &self.change_tx,
            &mut last_modified,
        )
        .await
    }

    /// Returns the path of the watched configuration file.
    #[must_use]
    pub fn config_path(&self) -> &PathBuf {
        &self.config_path
    }

    /// Returns the polling interval used by [`ConfigHotReloader::start`].
    #[must_use]
    pub fn reload_interval(&self) -> Duration {
        self.reload_interval
    }

    /// Sets the polling interval used by later calls to
    /// [`ConfigHotReloader::start`].
    pub fn set_reload_interval(&mut self, interval: Duration) {
        self.reload_interval = interval;
        debug!("Configuration reload interval set to: {:?}", interval);
    }
}
/// Callback interface for reacting to configuration changes.
///
/// Implementations must be `Send + Sync` so they can be shared with the
/// spawned task that invokes them.
#[async_trait::async_trait]
pub trait ConfigChangeHandler: Send + Sync {
/// Invoked with the configuration as it was before the change
/// (`old_config`) and the configuration that replaced it (`new_config`).
async fn handle_config_change(
&self,
old_config: &McpServerConfig,
new_config: &McpServerConfig,
);
}
/// A [`ConfigChangeHandler`] that only logs which well-known settings differ
/// between the old and the new configuration.
pub struct DefaultConfigChangeHandler;
#[async_trait::async_trait]
impl ConfigChangeHandler for DefaultConfigChangeHandler {
    /// Logs a header line followed by one line per changed setting among:
    /// server name, log level, cache enabled, performance monitoring enabled
    /// and authentication enabled.
    async fn handle_config_change(
        &self,
        old_config: &McpServerConfig,
        new_config: &McpServerConfig,
    ) {
        info!("Configuration changed:");

        let (name_before, name_after) = (&old_config.server.name, &new_config.server.name);
        if name_before != name_after {
            info!(" Server name: {} -> {}", name_before, name_after);
        }

        let (level_before, level_after) = (&old_config.logging.level, &new_config.logging.level);
        if level_before != level_after {
            info!(" Log level: {} -> {}", level_before, level_after);
        }

        let (cache_before, cache_after) = (&old_config.cache.enabled, &new_config.cache.enabled);
        if cache_before != cache_after {
            info!(" Cache enabled: {} -> {}", cache_before, cache_after);
        }

        let (perf_before, perf_after) = (
            &old_config.performance.enabled,
            &new_config.performance.enabled,
        );
        if perf_before != perf_after {
            info!(" Performance monitoring: {} -> {}", perf_before, perf_after);
        }

        let (auth_before, auth_after) = (
            &old_config.security.authentication.enabled,
            &new_config.security.authentication.enabled,
        );
        if auth_before != auth_after {
            info!(" Authentication: {} -> {}", auth_before, auth_after);
        }
    }
}
/// Couples a [`ConfigHotReloader`] with a [`ConfigChangeHandler`] that is
/// called for every configuration change the reloader publishes.
pub struct ConfigHotReloaderWithHandler {
/// The underlying reloader that owns the configuration and change channel.
reloader: ConfigHotReloader,
/// Handler invoked with the previous and the new configuration.
handler: Arc<dyn ConfigChangeHandler>,
}
impl ConfigHotReloaderWithHandler {
pub fn new(
config: McpServerConfig,
config_path: PathBuf,
reload_interval: Duration,
handler: Arc<dyn ConfigChangeHandler>,
) -> Result<Self> {
let reloader = ConfigHotReloader::new(config, config_path, reload_interval)?;
Ok(Self { reloader, handler })
}
pub fn start_with_handler(&self) -> Result<()> {
self.reloader.start()?;
let mut change_rx = self.reloader.subscribe_to_changes();
let handler = Arc::clone(&self.handler);
let config = Arc::clone(&self.reloader.config);
tokio::spawn(async move {
let mut old_config = config.read().await.clone();
while let Ok(new_config) = change_rx.recv().await {
handler.handle_config_change(&old_config, &new_config).await;
old_config = new_config;
}
});
Ok(())
}
#[must_use]
pub fn reloader(&self) -> &ConfigHotReloader {
&self.reloader
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Writes `config` as JSON to `config.json` inside a fresh temporary
    /// directory and returns the directory guard together with the file path.
    ///
    /// The guard must stay alive for the duration of the test; dropping it
    /// removes the directory and the file, so no test leaves files behind.
    fn write_temp_config(config: &McpServerConfig) -> (TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.json");
        config.to_file(&config_path, "json").unwrap();
        (dir, config_path)
    }

    #[tokio::test]
    async fn test_config_hot_reloader_creation() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        assert!(reloader.is_enabled());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_with_default_settings() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::with_default_settings(config_path).unwrap();
        assert!(reloader.is_enabled());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_enable_disable() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let mut reloader =
            ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        assert!(reloader.is_enabled());
        reloader.set_enabled(false);
        assert!(!reloader.is_enabled());
        reloader.set_enabled(true);
        assert!(reloader.is_enabled());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_get_config() {
        let mut config = McpServerConfig::default();
        config.server.name = "test-server".to_string();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        let loaded_config = reloader.get_config().await;
        assert_eq!(loaded_config.server.name, "test-server");
    }

    #[tokio::test]
    async fn test_config_hot_reloader_update_config() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        let mut new_config = McpServerConfig::default();
        new_config.server.name = "updated-server".to_string();
        reloader.update_config(new_config).await.unwrap();
        let loaded_config = reloader.get_config().await;
        assert_eq!(loaded_config.server.name, "updated-server");
    }

    #[tokio::test]
    async fn test_config_hot_reloader_subscribe_to_changes() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        let mut change_rx = reloader.subscribe_to_changes();
        let mut new_config = McpServerConfig::default();
        new_config.server.name = "changed-server".to_string();
        reloader.update_config(new_config).await.unwrap();
        let received_config = change_rx.recv().await.unwrap();
        assert_eq!(received_config.server.name, "changed-server");
    }

    #[tokio::test]
    async fn test_config_hot_reloader_with_handler() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let handler = Arc::new(DefaultConfigChangeHandler);
        let reloader =
            ConfigHotReloaderWithHandler::new(config, config_path, Duration::from_secs(1), handler)
                .unwrap();
        assert!(reloader.reloader().is_enabled());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_nonexistent_file() {
        let config_path = PathBuf::from("/nonexistent/config.json");
        let config = McpServerConfig::default();
        let result = ConfigHotReloader::new(config, config_path, Duration::from_secs(1));
        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(matches!(error, ThingsError::Configuration { .. }));
    }

    #[tokio::test]
    async fn test_config_hot_reloader_invalid_config_file() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.json");
        std::fs::write(&config_path, "{ invalid json }").unwrap();
        let result = McpServerConfig::from_file(&config_path);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_file_permission_error() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader =
            ConfigHotReloader::new(config, config_path.clone(), Duration::from_secs(1)).unwrap();
        // Make the file unreadable by removing it after the reloader exists.
        std::fs::remove_file(&config_path).unwrap();
        let result = reloader.reload_now().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_concurrent_updates() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        let mut change_rx = reloader.subscribe_to_changes();
        let mut config1 = McpServerConfig::default();
        config1.server.name = "config1".to_string();
        let mut config2 = McpServerConfig::default();
        config2.server.name = "config2".to_string();
        let reloader_clone = Arc::new(reloader);
        let reloader1 = Arc::clone(&reloader_clone);
        let reloader2 = Arc::clone(&reloader_clone);
        let handle1 = tokio::spawn(async move { reloader1.update_config(config1).await });
        let handle2 = tokio::spawn(async move { reloader2.update_config(config2).await });
        let _ = handle1.await.unwrap();
        let _ = handle2.await.unwrap();
        // Only checks that at least one notification was delivered; the order
        // of the two concurrent updates is not asserted.
        let _received_config = change_rx.recv().await.unwrap();
    }

    #[tokio::test]
    async fn test_config_hot_reloader_validation_error() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader = ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        let mut invalid_config = McpServerConfig::default();
        invalid_config.server.name = String::new();
        let result = reloader.update_config(invalid_config).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_disabled_start() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let mut reloader =
            ConfigHotReloader::new(config, config_path, Duration::from_secs(1)).unwrap();
        reloader.set_enabled(false);
        let result = reloader.start();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_reload_interval() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let mut reloader =
            ConfigHotReloader::new(config, config_path, Duration::from_secs(5)).unwrap();
        assert_eq!(reloader.reload_interval(), Duration::from_secs(5));
        reloader.set_reload_interval(Duration::from_secs(10));
        assert_eq!(reloader.reload_interval(), Duration::from_secs(10));
    }

    #[tokio::test]
    async fn test_config_hot_reloader_metadata_error() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader =
            ConfigHotReloader::new(config, config_path.clone(), Duration::from_secs(1)).unwrap();
        // With the file gone, reading its metadata during the reload fails.
        std::fs::remove_file(&config_path).unwrap();
        let result = reloader.reload_now().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_with_handler_start() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let handler = Arc::new(DefaultConfigChangeHandler);
        let reloader =
            ConfigHotReloaderWithHandler::new(config, config_path, Duration::from_secs(1), handler)
                .unwrap();
        let result = reloader.start_with_handler();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_file_modified_time() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let modified_time = ConfigHotReloader::get_file_modified_time(&config_path);
        assert!(modified_time.is_ok());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_file_modified_time_nonexistent() {
        let config_path = PathBuf::from("/nonexistent/file.json");
        let result = ConfigHotReloader::get_file_modified_time(&config_path);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_config_hot_reloader_config_path() {
        let config = McpServerConfig::default();
        let (_dir, config_path) = write_temp_config(&config);
        let reloader =
            ConfigHotReloader::new(config, config_path.clone(), Duration::from_secs(1)).unwrap();
        assert_eq!(reloader.config_path(), &config_path);
    }
}