mod coordinator;
mod signals;
mod validators;
pub use coordinator::GracefulReloadCoordinator;
pub use signals::{SignalManager, SignalType};
pub use validators::{RouteValidator, UpstreamValidator};
use arc_swap::ArcSwap;
use notify::{Event, EventKind, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::{debug, error, info, trace, warn};
use zentinel_common::errors::{ZentinelError, ZentinelResult};
use zentinel_config::Config;
use crate::logging::{AuditLogEntry, SharedLogManager};
use crate::tls::CertificateReloader;
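/// Lifecycle events broadcast to subscribers during a reload attempt:
/// `Started` -> `Validated` -> `Applied` on success, or `Failed` /
/// `RolledBack` on the error paths.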
#[derive(Debug, Clone)]
pub enum ReloadEvent {
Started {
timestamp: Instant,
trigger: ReloadTrigger,
},
Validated { timestamp: Instant },
Applied { timestamp: Instant, version: String },
Failed { timestamp: Instant, error: String },
RolledBack { timestamp: Instant, reason: String },
}
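/// The source that initiated a reload attempt.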
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
Manual,
FileChange,
Signal,
Scheduled,
GatewayApi,
}
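/// A pluggable validation step run against every candidate configuration
/// before it is applied; returning an error rejects the reload and keeps
/// the current configuration active.
///
/// A minimal validator sketch (illustrative; `MaxRoutesValidator` is a
/// hypothetical example, not part of this crate):
///
/// ```ignore
/// struct MaxRoutesValidator {
///     limit: usize,
/// }
///
/// #[async_trait::async_trait]
/// impl ConfigValidator for MaxRoutesValidator {
///     async fn validate(&self, config: &Config) -> ZentinelResult<()> {
///         if config.routes.len() > self.limit {
///             return Err(ZentinelError::Config {
///                 message: format!(
///                     "route count {} exceeds limit {}",
///                     config.routes.len(),
///                     self.limit
///                 ),
///                 source: None,
///             });
///         }
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         "max_routes"
///     }
/// }
/// ```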
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
async fn validate(&self, config: &Config) -> ZentinelResult<()>;
fn name(&self) -> &str;
}
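/// Lifecycle callbacks invoked around a configuration swap.
///
/// `pre_reload` runs before the swap (a failure here is logged as a
/// warning but does not abort the reload), `post_reload` runs once the
/// new configuration is live, and `on_failure` runs when a candidate
/// configuration is rejected by validation.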
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()>;
async fn post_reload(&self, old_config: &Config, new_config: &Config);
async fn on_failure(&self, config: &Config, error: &ZentinelError);
fn name(&self) -> &str;
}
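/// Counters and timings for reload activity. The atomic counters allow
/// lock-free updates from concurrent reload paths; `avg_duration_ms` is
/// a running average over successful reloads.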
#[derive(Default)]
pub struct ReloadStats {
pub total_reloads: std::sync::atomic::AtomicU64,
pub successful_reloads: std::sync::atomic::AtomicU64,
pub failed_reloads: std::sync::atomic::AtomicU64,
pub rollbacks: std::sync::atomic::AtomicU64,
pub config_version: std::sync::atomic::AtomicU64,
pub last_success: RwLock<Option<Instant>>,
pub last_failure: RwLock<Option<Instant>>,
pub avg_duration_ms: RwLock<f64>,
}
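/// Owns the live configuration and coordinates hot reloads.
///
/// Readers take lock-free snapshots of the active [`Config`] through an
/// [`ArcSwap`], while reloads and rollbacks are serialized by an internal
/// mutex, so a failed reload never disturbs the running configuration.
///
/// A minimal usage sketch (illustrative; the file path and
/// `initial_config` are assumptions, inside an async context):
///
/// ```ignore
/// let manager = ConfigManager::new("zentinel.kdl", initial_config).await?;
/// manager.start_watching().await?;              // auto-reload on file change
/// let active = manager.current();               // cheap, lock-free snapshot
/// manager.reload(ReloadTrigger::Manual).await?; // explicit reload
/// ```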
pub struct ConfigManager {
current_config: Arc<ArcSwap<Config>>,
previous_config: Arc<RwLock<Option<Arc<Config>>>>,
config_path: PathBuf,
watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
reload_tx: broadcast::Sender<ReloadEvent>,
stats: Arc<ReloadStats>,
validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
cert_reloader: Arc<CertificateReloader>,
reload_mutex: Arc<Mutex<()>>,
}
impl ConfigManager {
pub async fn new(
config_path: impl AsRef<Path>,
initial_config: Config,
) -> ZentinelResult<Self> {
let config_path = config_path.as_ref().to_path_buf();
let (reload_tx, _) = broadcast::channel(100);
info!(
config_path = %config_path.display(),
route_count = initial_config.routes.len(),
upstream_count = initial_config.upstreams.len(),
listener_count = initial_config.listeners.len(),
"Initializing configuration manager"
);
trace!(
config_path = %config_path.display(),
"Creating ArcSwap for configuration"
);
Ok(Self {
current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
previous_config: Arc::new(RwLock::new(None)),
config_path,
watcher: Arc::new(RwLock::new(None)),
reload_tx,
stats: Arc::new(ReloadStats::default()),
validators: Arc::new(RwLock::new(Vec::new())),
reload_hooks: Arc::new(RwLock::new(Vec::new())),
cert_reloader: Arc::new(CertificateReloader::new()),
reload_mutex: Arc::new(Mutex::new(())),
})
}
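    /// Returns the shared TLS certificate reloader, which is triggered
    /// after each successful configuration reload.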
pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
Arc::clone(&self.cert_reloader)
}
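    /// Returns the active configuration as a lock-free snapshot. The
    /// `Arc` remains valid even if a reload swaps in a newer
    /// configuration while the caller still holds it.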
pub fn current(&self) -> Arc<Config> {
self.current_config.load_full()
}
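    /// Starts watching the configuration file and auto-reloading on changes.
    ///
    /// Two mechanisms run in parallel: a `notify` filesystem watcher on
    /// the file (and, best-effort, its parent directory so included
    /// `.kdl` files also trigger reloads) with 200ms debouncing, and a
    /// one-second polling fallback that compares a cheap content
    /// signature for environments where filesystem events are unreliable.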
pub async fn start_watching(&self) -> ZentinelResult<()> {
if self.watcher.read().await.is_some() {
warn!("File watcher already active, skipping");
return Ok(());
}
let config_path = self.config_path.clone();
let notify = Arc::new(tokio::sync::Notify::new());
        let notify_sender = Arc::clone(&notify);
let watched_path = config_path.clone();
let mut watcher =
notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
match event {
Ok(event) => {
if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
                        // Only react to the watched file itself or other KDL
                        // files (which may be included by the main config).
                        let is_relevant = event.paths.iter().any(|p| {
                            p == &watched_path || p.extension().is_some_and(|ext| ext == "kdl")
                        });
                        if is_relevant {
notify_sender.notify_one();
}
}
}
Err(e) => {
warn!(error = %e, "File watcher error");
}
}
})
.map_err(|e| ZentinelError::Config {
message: format!("Failed to create file watcher: {}", e),
source: None,
})?;
watcher
.watch(&config_path, RecursiveMode::NonRecursive)
.map_err(|e| ZentinelError::Config {
message: format!("Failed to watch config file: {}", e),
source: None,
})?;
if let Some(parent) = config_path.parent() {
if let Err(e) = watcher.watch(parent, RecursiveMode::Recursive) {
warn!(
path = %parent.display(),
error = %e,
"Could not watch config directory for included files, \
only the main config file will trigger auto-reload"
);
} else {
debug!(
path = %parent.display(),
"Watching config directory recursively for included file changes"
);
}
}
*self.watcher.write().await = Some(watcher);
let manager = Arc::new(self.clone_for_task());
let config_path_log = self.config_path.clone();
tokio::spawn(async move {
loop {
notify.notified().await;
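                // Debounce: absorb further change notifications until the file
                // has been quiet for 200ms, then reload once.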
while let Ok(()) =
tokio::time::timeout(Duration::from_millis(200), notify.notified()).await
{
trace!("Debounce: additional file change, resetting timer");
}
info!("Configuration file changed, triggering reload");
if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
error!(error = %e, "Auto-reload failed, continuing with current configuration");
}
}
});
let poll_manager = Arc::new(self.clone_for_task());
let poll_path = self.config_path.clone();
tokio::spawn(async move {
use std::io::Read;
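            // Cheap change signature: file length plus the first 256 bytes.
            // An edit past byte 256 that preserves both goes undetected here,
            // but the notify watcher covers that case.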
let content_sig = |p: &std::path::Path| -> Option<(u64, Vec<u8>)> {
let mut f = std::fs::File::open(p).ok()?;
let meta = f.metadata().ok()?;
let len = meta.len();
let mut buf = vec![0u8; 256.min(len as usize)];
f.read_exact(&mut buf).ok()?;
Some((len, buf))
};
let mut last_sig = content_sig(&poll_path);
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let current_sig = content_sig(&poll_path);
if current_sig != last_sig {
last_sig = current_sig;
debug!("Config file content changed (poll fallback), triggering reload");
if let Err(e) = poll_manager.reload(ReloadTrigger::FileChange).await {
error!(error = %e, "Poll-triggered reload failed");
}
}
}
});
info!(
config_file = %self.config_path.display(),
"Auto-reload enabled: watching for configuration changes (with poll fallback)"
);
Ok(())
}
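    /// Reloads the configuration from disk.
    ///
    /// Pipeline: parse the file, run built-in and custom validation, run
    /// `pre_reload` hooks, atomically swap in the new configuration
    /// (saving the old one for [`rollback`](Self::rollback)), run
    /// `post_reload` hooks, then reload TLS certificates. A parse or
    /// validation failure leaves the current configuration untouched.
    /// Concurrent calls are serialized by `reload_mutex`.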
pub async fn reload(&self, trigger: ReloadTrigger) -> ZentinelResult<()> {
let _reload_guard = self.reload_mutex.lock().await;
let start = Instant::now();
let reload_num = self
.stats
.total_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
info!(
trigger = ?trigger,
reload_num = reload_num,
config_path = %self.config_path.display(),
"Starting configuration reload"
);
let _ = self.reload_tx.send(ReloadEvent::Started {
timestamp: Instant::now(),
trigger: trigger.clone(),
});
trace!(
config_path = %self.config_path.display(),
"Reading configuration file"
);
let new_config = match Config::from_file(&self.config_path) {
Ok(config) => {
debug!(
route_count = config.routes.len(),
upstream_count = config.upstreams.len(),
listener_count = config.listeners.len(),
"Configuration file parsed successfully"
);
config
}
Err(e) => {
let error_msg = format!("Failed to load configuration: {}", e);
error!(
config_path = %self.config_path.display(),
error = %e,
"Failed to load configuration file"
);
self.stats
.failed_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
*self.stats.last_failure.write().await = Some(Instant::now());
let _ = self.reload_tx.send(ReloadEvent::Failed {
timestamp: Instant::now(),
error: error_msg.clone(),
});
return Err(ZentinelError::Config {
message: error_msg,
source: None,
});
}
};
trace!("Starting configuration validation");
if let Err(e) = self.validate_config(&new_config).await {
error!(
error = %e,
"Configuration validation failed - new configuration REJECTED"
);
self.stats
.failed_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
*self.stats.last_failure.write().await = Some(Instant::now());
let _ = self.reload_tx.send(ReloadEvent::Failed {
timestamp: Instant::now(),
error: e.to_string(),
});
return Err(e);
}
info!(
route_count = new_config.routes.len(),
upstream_count = new_config.upstreams.len(),
"Configuration validation passed, applying new configuration"
);
let _ = self.reload_tx.send(ReloadEvent::Validated {
timestamp: Instant::now(),
});
let old_config = self.current_config.load_full();
trace!(
old_routes = old_config.routes.len(),
new_routes = new_config.routes.len(),
"Preparing configuration swap"
);
let hooks = self.reload_hooks.read().await;
for hook in hooks.iter() {
trace!(hook_name = %hook.name(), "Running pre-reload hook");
if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
warn!(
hook_name = %hook.name(),
error = %e,
"Pre-reload hook failed"
);
}
}
drop(hooks);
trace!("Saving previous configuration for potential rollback");
*self.previous_config.write().await = Some(old_config.clone());
trace!("Applying new configuration atomically");
self.current_config.store(Arc::new(new_config.clone()));
let hooks = self.reload_hooks.read().await;
for hook in hooks.iter() {
trace!(hook_name = %hook.name(), "Running post-reload hook");
hook.post_reload(&old_config, &new_config).await;
}
drop(hooks);
let duration = start.elapsed();
let successful_count = self
.stats
.successful_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
*self.stats.last_success.write().await = Some(Instant::now());
{
let mut avg = self.stats.avg_duration_ms.write().await;
let total = successful_count as f64;
*avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
}
let new_version = self
.stats
.config_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1;
let _ = self.reload_tx.send(ReloadEvent::Applied {
timestamp: Instant::now(),
version: format!("v{}", new_version),
});
let (cert_success, cert_errors) = self.cert_reloader.reload_all();
if !cert_errors.is_empty() {
for (listener_id, error) in &cert_errors {
error!(
listener_id = %listener_id,
error = %error,
"TLS certificate reload failed for listener"
);
}
}
info!(
duration_ms = duration.as_millis(),
successful_reloads = successful_count,
route_count = new_config.routes.len(),
upstream_count = new_config.upstreams.len(),
cert_reload_success = cert_success,
cert_reload_errors = cert_errors.len(),
"Configuration reload completed successfully"
);
Ok(())
}
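    /// Applies an already-constructed configuration (e.g. submitted
    /// through the Gateway API) using the same validate/hook/swap
    /// pipeline as [`reload`](Self::reload), minus the file parse step.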
pub async fn apply_config(
&self,
new_config: Config,
trigger: ReloadTrigger,
) -> ZentinelResult<()> {
let _reload_guard = self.reload_mutex.lock().await;
let start = Instant::now();
let reload_num = self
.stats
.total_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
info!(
trigger = ?trigger,
reload_num = reload_num,
routes = new_config.routes.len(),
upstreams = new_config.upstreams.len(),
listeners = new_config.listeners.len(),
"Applying programmatic configuration"
);
let _ = self.reload_tx.send(ReloadEvent::Started {
timestamp: Instant::now(),
trigger,
});
        if let Err(e) = self.validate_config(&new_config).await {
            error!(error = %e, "Programmatic configuration validation failed");
            self.stats
                .failed_reloads
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            *self.stats.last_failure.write().await = Some(Instant::now());
            // Let hooks observe the rejection (e.g. for audit logging).
            let current = self.current_config.load_full();
            let hooks = self.reload_hooks.read().await;
            for hook in hooks.iter() {
                hook.on_failure(&current, &e).await;
            }
            drop(hooks);
            let _ = self.reload_tx.send(ReloadEvent::Failed {
                timestamp: Instant::now(),
                error: e.to_string(),
            });
            return Err(e);
        }
let _ = self.reload_tx.send(ReloadEvent::Validated {
timestamp: Instant::now(),
});
let old_config = self.current_config.load_full();
let hooks = self.reload_hooks.read().await;
for hook in hooks.iter() {
if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
warn!(hook_name = %hook.name(), error = %e, "Pre-reload hook failed");
}
}
drop(hooks);
*self.previous_config.write().await = Some(old_config.clone());
self.current_config.store(Arc::new(new_config.clone()));
let hooks = self.reload_hooks.read().await;
for hook in hooks.iter() {
hook.post_reload(&old_config, &new_config).await;
}
drop(hooks);
let duration = start.elapsed();
let successful_count = self
.stats
.successful_reloads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
*self.stats.last_success.write().await = Some(Instant::now());
{
let mut avg = self.stats.avg_duration_ms.write().await;
let total = successful_count as f64;
*avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
}
let new_version = self
.stats
.config_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1;
let _ = self.reload_tx.send(ReloadEvent::Applied {
timestamp: Instant::now(),
version: format!("v{}", new_version),
});
let (cert_success, cert_errors) = self.cert_reloader.reload_all();
if !cert_errors.is_empty() {
for (listener_id, error) in &cert_errors {
error!(
listener_id = %listener_id,
error = %error,
"TLS certificate reload failed for listener"
);
}
}
info!(
duration_ms = duration.as_millis(),
successful_reloads = successful_count,
route_count = new_config.routes.len(),
upstream_count = new_config.upstreams.len(),
cert_reload_success = cert_success,
cert_reload_errors = cert_errors.len(),
"Programmatic configuration applied successfully"
);
Ok(())
}
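    /// Returns a handle to the underlying `ArcSwap` so long-lived
    /// components can observe configuration swaps without holding a
    /// `ConfigManager`.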
pub fn config_store(&self) -> Arc<ArcSwap<Config>> {
Arc::clone(&self.current_config)
}
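    /// Restores the previously active configuration, if one was saved by
    /// an earlier reload. The previous configuration is re-validated
    /// before the swap, since validators or the environment may have
    /// changed since it was last live.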
    pub async fn rollback(&self, reason: String) -> ZentinelResult<()> {
        // Serialize with in-flight reloads so the two swaps cannot interleave.
        let _reload_guard = self.reload_mutex.lock().await;
        info!(
            reason = %reason,
            "Starting configuration rollback"
        );
let previous = self.previous_config.read().await.clone();
if let Some(prev_config) = previous {
trace!(
route_count = prev_config.routes.len(),
"Found previous configuration for rollback"
);
trace!("Validating previous configuration");
if let Err(e) = self.validate_config(&prev_config).await {
error!(
error = %e,
"Previous configuration validation failed during rollback"
);
return Err(e);
}
trace!("Applying previous configuration");
self.current_config.store(prev_config.clone());
let rollback_count = self
.stats
.rollbacks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
let _ = self.reload_tx.send(ReloadEvent::RolledBack {
timestamp: Instant::now(),
reason: reason.clone(),
});
info!(
reason = %reason,
rollback_count = rollback_count,
route_count = prev_config.routes.len(),
"Configuration rolled back successfully"
);
Ok(())
} else {
warn!("No previous configuration available for rollback");
Err(ZentinelError::Config {
message: "No previous configuration available".to_string(),
source: None,
})
}
}
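    /// Runs built-in validation followed by every registered custom
    /// validator; the first failure rejects the candidate configuration.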
async fn validate_config(&self, config: &Config) -> ZentinelResult<()> {
trace!(
route_count = config.routes.len(),
upstream_count = config.upstreams.len(),
"Starting configuration validation"
);
trace!("Running built-in config validation");
config.validate()?;
let validators = self.validators.read().await;
trace!(
validator_count = validators.len(),
"Running custom validators"
);
for validator in validators.iter() {
trace!(validator_name = %validator.name(), "Running validator");
validator.validate(config).await.map_err(|e| {
error!(
validator_name = %validator.name(),
error = %e,
"Validator failed"
);
e
})?;
}
debug!(
route_count = config.routes.len(),
upstream_count = config.upstreams.len(),
"Configuration validation passed"
);
Ok(())
}
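    /// Registers a custom validator to run on every future reload.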
pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
info!("Adding configuration validator: {}", validator.name());
self.validators.write().await.push(validator);
}
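    /// Registers a lifecycle hook to run around every future reload.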
pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
info!("Adding reload hook: {}", hook.name());
self.reload_hooks.write().await.push(hook);
}
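    /// Subscribes to reload lifecycle events ([`ReloadEvent`]).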
pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
self.reload_tx.subscribe()
}
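    /// Returns the reload statistics counters.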
pub fn stats(&self) -> &ReloadStats {
&self.stats
}
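    /// Creates a shallow clone for background tasks: every field is
    /// shared via `Arc`, so the clone observes and mutates the same
    /// manager state.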
fn clone_for_task(&self) -> ConfigManager {
ConfigManager {
current_config: Arc::clone(&self.current_config),
previous_config: Arc::clone(&self.previous_config),
config_path: self.config_path.clone(),
watcher: self.watcher.clone(),
reload_tx: self.reload_tx.clone(),
stats: Arc::clone(&self.stats),
validators: Arc::clone(&self.validators),
reload_hooks: Arc::clone(&self.reload_hooks),
cert_reloader: Arc::clone(&self.cert_reloader),
reload_mutex: Arc::clone(&self.reload_mutex),
}
}
}
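/// A [`ReloadHook`] that writes an audit log entry for each reload
/// lifecycle event (start, success, failure).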
pub struct AuditReloadHook {
log_manager: SharedLogManager,
}
impl AuditReloadHook {
pub fn new(log_manager: SharedLogManager) -> Self {
Self { log_manager }
}
}
#[async_trait::async_trait]
impl ReloadHook for AuditReloadHook {
async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()> {
let trace_id = uuid::Uuid::new_v4().to_string();
let audit_entry = AuditLogEntry::config_change(
&trace_id,
"reload_started",
format!(
"Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
old_config.routes.len(),
new_config.routes.len(),
old_config.upstreams.len(),
new_config.upstreams.len()
),
);
self.log_manager.log_audit(&audit_entry);
Ok(())
}
async fn post_reload(&self, old_config: &Config, new_config: &Config) {
let trace_id = uuid::Uuid::new_v4().to_string();
let audit_entry = AuditLogEntry::config_change(
&trace_id,
"reload_success",
format!(
"Configuration reload successful: {} routes, {} upstreams, {} listeners",
new_config.routes.len(),
new_config.upstreams.len(),
new_config.listeners.len()
),
)
.with_metadata("old_routes", old_config.routes.len().to_string())
.with_metadata("new_routes", new_config.routes.len().to_string())
.with_metadata("old_upstreams", old_config.upstreams.len().to_string())
.with_metadata("new_upstreams", new_config.upstreams.len().to_string());
self.log_manager.log_audit(&audit_entry);
}
async fn on_failure(&self, config: &Config, error: &ZentinelError) {
let trace_id = uuid::Uuid::new_v4().to_string();
let audit_entry = AuditLogEntry::config_change(
&trace_id,
"reload_failed",
format!("Configuration reload failed: {}", error),
)
.with_metadata("current_routes", config.routes.len().to_string())
.with_metadata("current_upstreams", config.upstreams.len().to_string());
self.log_manager.log_audit(&audit_entry);
}
fn name(&self) -> &str {
"audit_reload_hook"
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_config_reload_rejects_invalid_config() {
let initial_config = Config::default_for_testing();
let initial_routes = initial_config.routes.len();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
assert_eq!(manager.current().routes.len(), initial_routes);
let result = manager.reload(ReloadTrigger::Manual).await;
assert!(result.is_err(), "Reload should fail for invalid config");
assert_eq!(
manager.current().routes.len(),
initial_routes,
"Original config should be preserved after failed reload"
);
assert_eq!(
manager
.stats()
.failed_reloads
.load(std::sync::atomic::Ordering::Relaxed),
1,
"Failed reload should be recorded"
);
}
#[tokio::test]
async fn test_config_reload_accepts_valid_config() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
let static_dir = temp_dir.path().join("static");
std::fs::create_dir_all(&static_dir).unwrap();
let valid_config = r#"
server {
worker-threads 4
}
listeners {
listener "http" {
address "0.0.0.0:8080"
protocol "http"
}
}
upstreams {
upstream "backend" {
target "127.0.0.1:3000"
}
}
routes {
route "api" {
priority "high"
matches {
path-prefix "/api/"
}
upstream "backend"
}
}
"#;
std::fs::write(&config_path, valid_config).unwrap();
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
let result = manager.reload(ReloadTrigger::Manual).await;
assert!(
result.is_ok(),
"Reload should succeed for valid config: {:?}",
result.err()
);
assert_eq!(
manager
.stats()
.successful_reloads
.load(std::sync::atomic::Ordering::Relaxed),
1,
"Successful reload should be recorded"
);
}
fn write_config_with_routes(path: &Path, route_count: usize) {
let mut routes = String::new();
for i in 0..route_count {
routes.push_str(&format!(
r#"
route "route{i}" {{
priority "medium"
matches {{
path-prefix "/route{i}/"
}}
upstream "backend"
}}
"#
));
}
let config = format!(
r#"
server {{
worker-threads 4
}}
listeners {{
listener "http" {{
address "0.0.0.0:8080"
protocol "http"
}}
}}
upstreams {{
upstream "backend" {{
target "127.0.0.1:3000"
}}
}}
routes {{
{routes}
}}
"#
);
std::fs::write(path, config).unwrap();
}
#[tokio::test]
async fn test_concurrent_config_reads_during_reload() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 5);
let manager = Arc::new(
ConfigManager::new(&config_path, initial_config)
.await
.unwrap(),
);
let mut readers = Vec::new();
for _ in 0..10 {
let manager_clone = Arc::clone(&manager);
readers.push(tokio::spawn(async move {
let mut read_count = 0;
for _ in 0..100 {
let config = manager_clone.current();
let _ = config.routes.len();
read_count += 1;
tokio::task::yield_now().await;
}
read_count
}));
}
let manager_reload = Arc::clone(&manager);
let reload_handle =
tokio::spawn(async move { manager_reload.reload(ReloadTrigger::Manual).await });
let mut total_reads = 0;
for reader in readers {
total_reads += reader.await.unwrap();
}
let reload_result = reload_handle.await.unwrap();
assert!(reload_result.is_ok(), "Reload should succeed");
assert_eq!(total_reads, 1000, "All reads should complete");
}
#[tokio::test]
async fn test_multiple_concurrent_reloads() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 3);
let manager = Arc::new(
ConfigManager::new(&config_path, initial_config)
.await
.unwrap(),
);
let mut reload_handles = Vec::new();
for i in 0..5 {
let manager_clone = Arc::clone(&manager);
let trigger = if i % 2 == 0 {
ReloadTrigger::Manual
} else {
ReloadTrigger::Signal
};
reload_handles.push(tokio::spawn(
async move { manager_clone.reload(trigger).await },
));
}
let mut success_count = 0;
for handle in reload_handles {
if handle.await.unwrap().is_ok() {
success_count += 1;
}
}
assert!(success_count >= 1, "At least one reload should succeed");
let total = manager
.stats()
.total_reloads
.load(std::sync::atomic::Ordering::Relaxed);
assert_eq!(total, 5, "All reload attempts should be counted");
}
#[tokio::test]
async fn test_config_visibility_after_reload() {
let initial_config = Config::default_for_testing();
let initial_route_count = initial_config.routes.len();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 2);
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
assert_eq!(manager.current().routes.len(), initial_route_count);
manager.reload(ReloadTrigger::Manual).await.unwrap();
assert_eq!(manager.current().routes.len(), 2);
write_config_with_routes(&config_path, 5);
manager.reload(ReloadTrigger::Manual).await.unwrap();
assert_eq!(
manager.current().routes.len(),
5,
"New config should be visible immediately after reload"
);
write_config_with_routes(&config_path, 1);
manager.reload(ReloadTrigger::Manual).await.unwrap();
assert_eq!(
manager.current().routes.len(),
1,
"Config changes should be visible after each reload"
);
}
#[tokio::test]
async fn test_rapid_successive_reloads() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 3);
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
for i in 0..20 {
write_config_with_routes(&config_path, (i % 5) + 1);
let result = manager.reload(ReloadTrigger::Manual).await;
assert!(result.is_ok(), "Reload {} should succeed", i);
}
let stats = manager.stats();
assert_eq!(
stats
.successful_reloads
.load(std::sync::atomic::Ordering::Relaxed),
20,
"All 20 reloads should succeed"
);
assert_eq!(
stats
.failed_reloads
.load(std::sync::atomic::Ordering::Relaxed),
0,
"No reloads should fail"
);
}
#[tokio::test]
async fn test_rollback_preserves_previous_config() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 3);
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
manager.reload(ReloadTrigger::Manual).await.unwrap();
assert_eq!(manager.current().routes.len(), 3);
write_config_with_routes(&config_path, 5);
manager.reload(ReloadTrigger::Manual).await.unwrap();
assert_eq!(manager.current().routes.len(), 5);
manager
.rollback("Testing rollback".to_string())
.await
.unwrap();
assert_eq!(
manager.current().routes.len(),
3,
"Rollback should restore previous config"
);
assert_eq!(
manager
.stats()
.rollbacks
.load(std::sync::atomic::Ordering::Relaxed),
1,
"Rollback should be recorded in stats"
);
}
#[tokio::test]
async fn test_reload_events_broadcast() {
let initial_config = Config::default_for_testing();
let temp_dir = tempfile::tempdir().unwrap();
let config_path = temp_dir.path().join("config.kdl");
write_config_with_routes(&config_path, 2);
let manager = ConfigManager::new(&config_path, initial_config)
.await
.unwrap();
let mut receiver = manager.subscribe();
manager.reload(ReloadTrigger::Manual).await.unwrap();
let mut events = Vec::new();
while let Ok(Ok(event)) =
tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
{
events.push(event);
}
assert!(
events.len() >= 2,
"Should receive at least Started and Applied/Validated events"
);
assert!(
events
.iter()
.any(|e| matches!(e, ReloadEvent::Started { .. })),
"Should receive Started event"
);
assert!(
events
.iter()
.any(|e| matches!(e, ReloadEvent::Applied { .. })),
"Should receive Applied event on success"
);
}
#[tokio::test]
async fn test_graceful_coordinator_with_reload() {
let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));
coordinator.inc_requests();
coordinator.inc_requests();
coordinator.inc_requests();
assert_eq!(coordinator.active_count(), 3);
coordinator.dec_requests();
assert_eq!(coordinator.active_count(), 2);
let coord_clone = Arc::new(coordinator);
let coord_for_drain = Arc::clone(&coord_clone);
let drain_handle = tokio::spawn(async move { coord_for_drain.wait_for_drain().await });
tokio::time::sleep(Duration::from_millis(50)).await;
coord_clone.dec_requests();
tokio::time::sleep(Duration::from_millis(50)).await;
coord_clone.dec_requests();
let drained = drain_handle.await.unwrap();
assert!(drained, "All requests should drain successfully");
}
#[tokio::test]
async fn test_graceful_coordinator_drain_timeout() {
let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));
coordinator.inc_requests();
coordinator.inc_requests();
let drained = coordinator.wait_for_drain().await;
assert!(!drained, "Drain should timeout with stuck requests");
assert_eq!(
coordinator.active_count(),
2,
"Requests should still be tracked"
);
}
}