use std::path::PathBuf;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::broadcast;
use super::file_watcher::FileWatcherService;
use super::loader::ConfigLoader;
use super::validation::Validatable;
use super::watcher::{ConfigEvent, ConfigSource, ConfigWatcher};
use super::EngineConfig;
use crate::error::Error;
use crate::Result;
/// Policy that decides what happens to a configuration after it has been
/// loaded from disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReloadStrategy {
/// Run validation on the loaded configuration and apply it only when
/// validation succeeds. This is the default strategy.
#[default]
ValidateFirst,
/// Apply the loaded configuration without running the validation step.
Immediate,
/// Work out what would change but leave the active configuration untouched.
/// The validation step is not run in this mode either.
DryRun,
}
/// Notifications broadcast to subscribers of a `HotReloadManager`.
#[derive(Debug, Clone)]
pub enum ReloadEvent {
/// A reload of `path` has been requested; `reload_from_path` sends this
/// before it attempts to load the file.
FileChanged { path: PathBuf },
/// A configuration loaded from `path` was processed successfully.
Reloaded {
/// File the configuration was loaded from.
path: PathBuf,
/// Field-level differences reported for this reload; may be empty.
changes: Vec<ConfigChange>,
},
/// Loading the configuration from `path` failed; `error` carries the
/// error's display text.
Failed { path: PathBuf, error: String },
/// The configuration loaded from `path` was rejected by validation;
/// `errors` carries the validation messages.
ValidationFailed { path: PathBuf, errors: Vec<String> },
/// NOTE(review): this variant is not emitted anywhere in the code visible
/// in this file — presumably reserved for rollback support; confirm.
RolledBack { path: PathBuf, reason: String },
}
/// One field-level difference between two configurations.
///
/// Both values are stored in their string form so that fields of different
/// types can be reported through a single record type.
#[derive(Debug, Clone)]
pub struct ConfigChange {
    /// Name of the configuration field that differs.
    pub field: String,
    /// String form of the value before the change.
    pub old_value: String,
    /// String form of the value after the change.
    pub new_value: String,
}

impl ConfigChange {
    /// Builds a change record for `field`, converting `old` and `new` to text
    /// with [`ToString`].
    pub fn new(field: impl Into<String>, old: impl ToString, new: impl ToString) -> Self {
        let field = field.into();
        let old_value = old.to_string();
        let new_value = new.to_string();
        Self {
            field,
            old_value,
            new_value,
        }
    }
}
/// Owns the active `EngineConfig` and replaces it when the configuration is
/// reloaded, either on request (`reload`) or in response to file events
/// while the manager is running.
pub struct HotReloadManager {
// Active configuration; shared with the background event processor task.
config: Arc<RwLock<EngineConfig>>,
// File watcher service; `Some` only between a successful `start` and `stop`.
file_watcher: Option<FileWatcherService>,
// Watcher handed to the file watcher service; the event processor
// subscribes to it for `ConfigEvent`s.
config_watcher: Arc<ConfigWatcher>,
// How a freshly loaded configuration is handled.
strategy: ReloadStrategy,
// Main configuration file: watched on `start` and loaded by `reload`.
config_path: Option<PathBuf>,
// Sender side of the `ReloadEvent` broadcast channel (see `subscribe`).
event_tx: broadcast::Sender<ReloadEvent>,
// Running flag; shared with the background event processor task.
running: Arc<RwLock<bool>>,
}
impl HotReloadManager {
/// Creates a stopped manager that owns `config`.
///
/// The manager starts with the default [`ReloadStrategy`], no configuration
/// path, and a reload-event channel with a capacity of 64 messages.
pub fn new(config: EngineConfig) -> Self {
    // The receiver created alongside the sender is dropped right away;
    // consumers obtain their own through `subscribe`.
    let (event_tx, _) = broadcast::channel(64);
    let shared_config = Arc::new(RwLock::new(config));
    let running_flag = Arc::new(RwLock::new(false));
    Self {
        strategy: ReloadStrategy::default(),
        config_path: None,
        file_watcher: None,
        config_watcher: Arc::new(ConfigWatcher::new()),
        config: shared_config,
        running: running_flag,
        event_tx,
    }
}
/// Builder-style setter: returns the manager with its reload strategy
/// replaced by `strategy`.
pub fn strategy(self, strategy: ReloadStrategy) -> Self {
    let mut manager = self;
    manager.strategy = strategy;
    manager
}
/// Builder-style setter: returns the manager with `path` recorded as the
/// main configuration file.
pub fn config_path(self, path: impl Into<PathBuf>) -> Self {
    let mut manager = self;
    let main_file: PathBuf = path.into();
    manager.config_path = Some(main_file);
    manager
}
/// Returns a clone of the active configuration, taken under a read lock.
pub fn config(&self) -> EngineConfig {
    let active = self.config.read();
    (*active).clone()
}
pub fn config_lock(&self) -> &Arc<RwLock<EngineConfig>> {
&self.config
}
/// Reports the current value of the running flag.
pub fn is_running(&self) -> bool {
    let flag = self.running.read();
    *flag
}
/// Creates a new receiver for the manager's [`ReloadEvent`] broadcast
/// channel.
pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
    broadcast::Sender::subscribe(&self.event_tx)
}
pub fn config_watcher(&self) -> &Arc<ConfigWatcher> {
&self.config_watcher
}
/// Starts file watching and the background event processor.
///
/// If a main configuration path has been set it is registered with the file
/// watcher before the watcher is started.
///
/// # Errors
///
/// Returns `Error::Engine` when the manager is already running, and
/// propagates any error from registering the main path or starting the file
/// watcher. In both failure cases the manager stays stopped.
///
/// # Panics
///
/// The event processor is launched with `tokio::spawn`, which panics when
/// called outside a Tokio runtime.
pub fn start(&mut self) -> Result<()> {
    if *self.running.read() {
        return Err(Error::Engine("Hot reload manager already running".into()));
    }
    let file_watcher = FileWatcherService::new(self.config_watcher.clone());
    if let Some(path) = &self.config_path {
        file_watcher.watch(ConfigSource::Main(path.clone()))?;
    }
    file_watcher.start()?;
    // The flag must be raised BEFORE the processor task is spawned: on a
    // multi-threaded runtime the task may be polled immediately, and a task
    // that observes `running == false` shuts itself down.
    *self.running.write() = true;
    self.file_watcher = Some(file_watcher);
    self.start_event_processor();
    tracing::info!("Hot reload manager started");
    Ok(())
}
/// Stops file watching and clears the running flag.
///
/// The background event processor reads the running flag as part of its
/// loop, so it winds down the next time it wakes up rather than immediately.
///
/// # Errors
///
/// Returns `Error::Engine` when the manager is not running, and propagates
/// an error from stopping the file watcher (in which case the manager is
/// left in the running state).
pub fn stop(&mut self) -> Result<()> {
    let currently_running = *self.running.read();
    if !currently_running {
        return Err(Error::Engine("Hot reload manager not running".into()));
    }
    if let Some(watcher) = self.file_watcher.as_ref() {
        watcher.stop()?;
    }
    *self.running.write() = false;
    // Dropping the service releases the watcher held since `start`.
    self.file_watcher = None;
    tracing::info!("Hot reload manager stopped");
    Ok(())
}
pub fn reload(&self) -> Result<Vec<ConfigChange>> {
let path = self
.config_path
.as_ref()
.ok_or_else(|| Error::Config("No configuration path set".into()))?;
self.reload_from_path(path)
}
/// Loads the configuration stored at `path` and handles it according to the
/// manager's [`ReloadStrategy`].
///
/// Events sent to subscribers, in order:
/// * `FileChanged` — always, before the file is loaded;
/// * `Failed` — when loading fails;
/// * `ValidationFailed` — when the strategy is `ValidateFirst` and
///   validation rejects the configuration;
/// * `Reloaded` — after the new configuration has been applied.
///
/// With `DryRun` the detected changes are logged and returned, the active
/// configuration is left untouched and no `Reloaded` event is sent.
///
/// # Errors
///
/// Propagates the loader error or the validation error.
///
/// NOTE(review): `&Path` would be the more idiomatic parameter type; it is
/// kept as `&PathBuf` to leave the public signature unchanged.
pub fn reload_from_path(&self, path: &PathBuf) -> Result<Vec<ConfigChange>> {
    tracing::debug!(path = %path.display(), "Reloading configuration");
    // Send results are ignored throughout: having no subscribers is not an error.
    let _ = self
        .event_tx
        .send(ReloadEvent::FileChanged { path: path.clone() });

    // Report load failures to subscribers as well as to the caller, matching
    // what the background event processor does for the same situation.
    let new_config: EngineConfig = ConfigLoader::load(path).map_err(|e| {
        let _ = self.event_tx.send(ReloadEvent::Failed {
            path: path.clone(),
            error: e.to_string(),
        });
        e
    })?;

    if self.strategy == ReloadStrategy::ValidateFirst {
        if let Err(e) = new_config.validate() {
            let _ = self.event_tx.send(ReloadEvent::ValidationFailed {
                path: path.clone(),
                errors: vec![e.to_string()],
            });
            return Err(e);
        }
    }

    let changes = self.detect_changes(&new_config);

    if self.strategy == ReloadStrategy::DryRun {
        tracing::info!(
            path = %path.display(),
            changes = changes.len(),
            "Dry-run: would apply {} changes",
            changes.len()
        );
        return Ok(changes);
    }

    *self.config.write() = new_config;
    let _ = self.event_tx.send(ReloadEvent::Reloaded {
        path: path.clone(),
        changes: changes.clone(),
    });
    tracing::info!(
        path = %path.display(),
        changes = changes.len(),
        "Configuration reloaded successfully"
    );
    Ok(changes)
}
fn detect_changes(&self, new_config: &EngineConfig) -> Vec<ConfigChange> {
let current = self.config.read();
let mut changes = Vec::new();
if current.name != new_config.name {
changes.push(ConfigChange::new("name", ¤t.name, &new_config.name));
}
if current.max_devices != new_config.max_devices {
changes.push(ConfigChange::new(
"max_devices",
current.max_devices,
new_config.max_devices,
));
}
if current.max_points != new_config.max_points {
changes.push(ConfigChange::new(
"max_points",
current.max_points,
new_config.max_points,
));
}
if current.tick_interval_ms != new_config.tick_interval_ms {
changes.push(ConfigChange::new(
"tick_interval_ms",
current.tick_interval_ms,
new_config.tick_interval_ms,
));
}
if current.workers != new_config.workers {
changes.push(ConfigChange::new(
"workers",
current.workers,
new_config.workers,
));
}
if current.enable_metrics != new_config.enable_metrics {
changes.push(ConfigChange::new(
"enable_metrics",
current.enable_metrics,
new_config.enable_metrics,
));
}
if current.metrics_interval_secs != new_config.metrics_interval_secs {
changes.push(ConfigChange::new(
"metrics_interval_secs",
current.metrics_interval_secs,
new_config.metrics_interval_secs,
));
}
if current.log_level != new_config.log_level {
changes.push(ConfigChange::new(
"log_level",
¤t.log_level,
&new_config.log_level,
));
}
changes
}
fn start_event_processor(&self) {
let mut rx = self.config_watcher.subscribe();
let config = self.config.clone();
let strategy = self.strategy;
let event_tx = self.event_tx.clone();
let running = self.running.clone();
tokio::spawn(async move {
while *running.read() {
match rx.recv().await {
Ok(ConfigEvent::Modified { source, .. })
| Ok(ConfigEvent::Created { source, .. }) => {
let path = source.path().clone();
match ConfigLoader::load::<EngineConfig>(&path) {
Ok(new_config) => {
if strategy == ReloadStrategy::ValidateFirst {
if let Err(e) = new_config.validate() {
let _ = event_tx.send(ReloadEvent::ValidationFailed {
path: path.clone(),
errors: vec![e.to_string()],
});
continue;
}
}
if strategy != ReloadStrategy::DryRun {
*config.write() = new_config;
}
let _ = event_tx.send(ReloadEvent::Reloaded {
path,
changes: Vec::new(), });
}
Err(e) => {
let _ = event_tx.send(ReloadEvent::Failed {
path,
error: e.to_string(),
});
}
}
}
Ok(ConfigEvent::Deleted { source, .. }) => {
tracing::warn!(
path = %source.path().display(),
"Configuration file deleted"
);
}
Ok(ConfigEvent::Error { message, .. }) => {
tracing::error!(error = %message, "Config watcher error");
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "Event processor lagged behind");
}
_ => {}
}
}
});
}
/// Registers an additional configuration source with the file watcher.
///
/// When the manager has not been started there is no file watcher; the call
/// then returns `Ok(())` without registering anything.
///
/// # Errors
///
/// Propagates the file watcher's error when registration fails.
pub fn watch(&self, source: ConfigSource) -> Result<()> {
    let watcher = match self.file_watcher.as_ref() {
        Some(active) => active,
        None => return Ok(()),
    };
    watcher.watch(source)?;
    Ok(())
}
pub fn unwatch(&self, path: &PathBuf) -> Result<()> {
if let Some(ref file_watcher) = self.file_watcher {
file_watcher.unwatch(path)?;
}
Ok(())
}
}
impl Drop for HotReloadManager {
    /// Stops the manager when it goes out of scope.
    ///
    /// The result is deliberately discarded: `stop` reports an error when
    /// the manager was never started or is already stopped, and there is
    /// nothing useful to do with a failure during drop.
    fn drop(&mut self) {
        let _outcome = self.stop();
    }
}
/// Step-by-step builder for a `HotReloadManager`.
pub struct HotReloadManagerBuilder {
// Initial configuration handed to the manager.
config: EngineConfig,
// Reload strategy handed to the manager.
strategy: ReloadStrategy,
// Main configuration file, if one was set.
config_path: Option<PathBuf>,
// Extra sources collected by `watch`.
// NOTE(review): a manager has no file watcher before `start()`, so `build`
// cannot register these sources — confirm how they are meant to be handled.
additional_paths: Vec<ConfigSource>,
}
impl HotReloadManagerBuilder {
pub fn new() -> Self {
Self {
config: EngineConfig::default(),
strategy: ReloadStrategy::default(),
config_path: None,
additional_paths: Vec::new(),
}
}
pub fn with_config(config: EngineConfig) -> Self {
Self {
config,
strategy: ReloadStrategy::default(),
config_path: None,
additional_paths: Vec::new(),
}
}
pub fn strategy(mut self, strategy: ReloadStrategy) -> Self {
self.strategy = strategy;
self
}
pub fn config_path(mut self, path: impl Into<PathBuf>) -> Self {
self.config_path = Some(path.into());
self
}
pub fn watch(mut self, source: ConfigSource) -> Self {
self.additional_paths.push(source);
self
}
pub fn build(self) -> HotReloadManager {
let mut manager = HotReloadManager::new(self.config).strategy(self.strategy);
if let Some(path) = self.config_path {
manager = manager.config_path(path);
}
manager
}
}
impl Default for HotReloadManagerBuilder {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the reload strategy, the change detection, the builder and
// the manual reload path. None of them starts the file watcher.
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::tempdir;
// `ValidateFirst` must stay the default strategy.
#[test]
fn test_reload_strategy_default() {
assert_eq!(ReloadStrategy::default(), ReloadStrategy::ValidateFirst);
}
// A freshly created manager is stopped and exposes the configuration it was given.
#[test]
fn test_hot_reload_manager_creation() {
let config = EngineConfig::default();
let manager = HotReloadManager::new(config.clone());
assert!(!manager.is_running());
assert_eq!(manager.config().max_devices, config.max_devices);
}
// `config()` returns the values of the configuration passed to `new`.
#[test]
fn test_hot_reload_manager_config_access() {
let config = EngineConfig::default().with_max_devices(50_000);
let manager = HotReloadManager::new(config);
let current = manager.config();
assert_eq!(current.max_devices, 50_000);
}
// Changed fields are reported by name.
#[test]
fn test_detect_changes() {
let old_config = EngineConfig::default();
let manager = HotReloadManager::new(old_config);
let new_config = EngineConfig::default()
.with_max_devices(50_000)
.with_log_level("debug");
let changes = manager.detect_changes(&new_config);
assert!(changes.iter().any(|c| c.field == "max_devices"));
assert!(changes.iter().any(|c| c.field == "log_level"));
}
// `ConfigChange::new` stores the string form of both values.
#[test]
fn test_config_change() {
let change = ConfigChange::new("max_devices", 10_000, 50_000);
assert_eq!(change.field, "max_devices");
assert_eq!(change.old_value, "10000");
assert_eq!(change.new_value, "50000");
}
// The builder passes the chosen strategy on to the manager.
#[test]
fn test_builder() {
let manager = HotReloadManagerBuilder::new()
.strategy(ReloadStrategy::DryRun)
.config_path("config.yaml")
.build();
assert_eq!(manager.strategy, ReloadStrategy::DryRun);
}
// A manual reload replaces the active configuration with the file's contents.
// `reload()` is synchronous; no `.await` is needed in the test body.
#[tokio::test]
async fn test_reload_from_file() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("config.yaml");
let yaml = "name: test\nmax_devices: 10000\nmax_points: 100000\n";
fs::write(&config_path, yaml).unwrap();
let config = EngineConfig::default();
let manager = HotReloadManager::new(config).config_path(config_path.clone());
let result = manager.reload();
assert!(result.is_ok());
assert_eq!(manager.config().name, "test");
}
// With `ValidateFirst`, a configuration that fails validation makes `reload` return an error.
#[tokio::test]
async fn test_reload_validation_failure() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("config.yaml");
// Presumably `max_devices: 0` is the value validation rejects — confirm against the validator.
let yaml = "name: test\nmax_devices: 0\n"; fs::write(&config_path, yaml).unwrap();
let config = EngineConfig::default();
let manager = HotReloadManager::new(config)
.strategy(ReloadStrategy::ValidateFirst)
.config_path(config_path);
let result = manager.reload();
assert!(result.is_err());
}
// `DryRun` reports the changes but leaves the active configuration untouched.
#[tokio::test]
async fn test_dry_run_mode() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("config.yaml");
let yaml = "name: dry-run-test\nmax_devices: 99999\nmax_points: 1000000\n";
fs::write(&config_path, yaml).unwrap();
let original_config = EngineConfig::default();
let manager = HotReloadManager::new(original_config.clone())
.strategy(ReloadStrategy::DryRun)
.config_path(config_path);
let changes = manager.reload().unwrap();
assert!(changes.iter().any(|c| c.field == "name"));
assert_eq!(manager.config().name, original_config.name);
}
}