1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, Mutex, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use zentinel_common::errors::{ZentinelError, ZentinelResult};
31use zentinel_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
/// Lifecycle events broadcast to subscribers while a configuration reload
/// is in progress (see [`ConfigManager::subscribe`]).
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// A reload attempt has begun (sent before the file is read/validated).
    Started {
        timestamp: Instant,
        trigger: ReloadTrigger,
    },
    /// The candidate configuration passed all validation.
    Validated { timestamp: Instant },
    /// The new configuration was swapped in; `version` has the form `v{n}`.
    Applied { timestamp: Instant, version: String },
    /// The reload was rejected (parse or validation failure); the previous
    /// configuration remains active.
    Failed { timestamp: Instant, error: String },
    /// A rollback to the previously active configuration completed.
    RolledBack { timestamp: Instant, reason: String },
}
57
/// What caused a configuration reload to be attempted.
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// Explicit programmatic request (admin call, tests, etc.).
    Manual,
    /// Config file change detected by the file watcher or poll fallback.
    FileChange,
    /// OS signal handled by the signal manager (see `signals` module).
    Signal,
    /// Timer/schedule-driven reload.
    Scheduled,
    /// Request originating from the gateway management API.
    GatewayApi,
}
72
/// Pluggable validation step run against every candidate configuration
/// before it is applied. Any error rejects the reload and keeps the
/// current configuration active.
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Validate `config`; return an error to reject it.
    async fn validate(&self, config: &Config) -> ZentinelResult<()>;

    /// Human-readable validator name, used in log messages.
    fn name(&self) -> &str;
}
86
/// Lifecycle callbacks invoked around a configuration swap.
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called after validation, before the new config is applied. Errors
    /// are logged by the caller but do not abort the reload.
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()>;

    /// Called after the new configuration has been swapped in.
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Called when a reload attempt fails; `config` is the configuration
    /// that remains active.
    async fn on_failure(&self, config: &Config, error: &ZentinelError);

    /// Human-readable hook name, used in log messages.
    fn name(&self) -> &str;
}
102
/// Counters and timing statistics for configuration reloads.
///
/// Counters use `Relaxed` ordering (they are independent tallies), except
/// `config_version`, which the writer bumps with `SeqCst`.
#[derive(Default)]
pub struct ReloadStats {
    // Total reload attempts, successful or not.
    pub total_reloads: std::sync::atomic::AtomicU64,
    // Reloads that were applied successfully.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    // Reloads rejected due to parse/validation errors.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    // Number of rollbacks to the previous configuration.
    pub rollbacks: std::sync::atomic::AtomicU64,
    // Monotonic version counter, incremented on each successful apply.
    pub config_version: std::sync::atomic::AtomicU64,
    // Time of the most recent successful reload.
    pub last_success: RwLock<Option<Instant>>,
    // Time of the most recent failed reload.
    pub last_failure: RwLock<Option<Instant>>,
    // Running average duration (ms) over successful reloads only.
    pub avg_duration_ms: RwLock<f64>,
}
127
/// Hot-reloadable configuration manager.
///
/// The active configuration lives in an `ArcSwap`, so readers take
/// lock-free snapshots via [`ConfigManager::current`] while reloads swap
/// the entire configuration atomically.
pub struct ConfigManager {
    // Currently active configuration (lock-free reads).
    current_config: Arc<ArcSwap<Config>>,
    // Previously active configuration, retained for rollback.
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    // Path to the main configuration file.
    config_path: PathBuf,
    // File watcher handle; `Some` once `start_watching` succeeds.
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    // Broadcast channel for `ReloadEvent`s.
    reload_tx: broadcast::Sender<ReloadEvent>,
    // Reload counters and timings.
    stats: Arc<ReloadStats>,
    // Custom validators run against every candidate configuration.
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    // Hooks invoked around each configuration swap.
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
    // TLS certificate reloader, refreshed after each successful reload.
    cert_reloader: Arc<CertificateReloader>,
    // Serializes reload/apply operations so only one runs at a time.
    reload_mutex: Arc<Mutex<()>>,
}
155
156impl ConfigManager {
157 pub async fn new(
159 config_path: impl AsRef<Path>,
160 initial_config: Config,
161 ) -> ZentinelResult<Self> {
162 let config_path = config_path.as_ref().to_path_buf();
163 let (reload_tx, _) = broadcast::channel(100);
164
165 info!(
166 config_path = %config_path.display(),
167 route_count = initial_config.routes.len(),
168 upstream_count = initial_config.upstreams.len(),
169 listener_count = initial_config.listeners.len(),
170 "Initializing configuration manager"
171 );
172
173 trace!(
174 config_path = %config_path.display(),
175 "Creating ArcSwap for configuration"
176 );
177
178 Ok(Self {
179 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
180 previous_config: Arc::new(RwLock::new(None)),
181 config_path,
182 watcher: Arc::new(RwLock::new(None)),
183 reload_tx,
184 stats: Arc::new(ReloadStats::default()),
185 validators: Arc::new(RwLock::new(Vec::new())),
186 reload_hooks: Arc::new(RwLock::new(Vec::new())),
187 cert_reloader: Arc::new(CertificateReloader::new()),
188 reload_mutex: Arc::new(Mutex::new(())),
189 })
190 }
191
    /// Shared handle to the TLS certificate reloader (refreshed after each
    /// successful reload).
    pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
        Arc::clone(&self.cert_reloader)
    }
196
    /// Lock-free snapshot of the currently active configuration.
    pub fn current(&self) -> Arc<Config> {
        self.current_config.load_full()
    }
201
202 pub async fn start_watching(&self) -> ZentinelResult<()> {
208 if self.watcher.read().await.is_some() {
210 warn!("File watcher already active, skipping");
211 return Ok(());
212 }
213
214 let config_path = self.config_path.clone();
215
216 let notify = Arc::new(tokio::sync::Notify::new());
220 let notify_sender = Arc::clone(¬ify);
221
222 let watched_path = config_path.clone();
223 let mut watcher =
224 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
225 match event {
226 Ok(event) => {
227 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
228 let dominated = event.paths.iter().any(|p| {
230 p == &watched_path || p.extension().is_some_and(|ext| ext == "kdl")
231 });
232 if dominated {
233 notify_sender.notify_one();
234 }
235 }
236 }
237 Err(e) => {
238 warn!(error = %e, "File watcher error");
239 }
240 }
241 })
242 .map_err(|e| ZentinelError::Config {
243 message: format!("Failed to create file watcher: {}", e),
244 source: None,
245 })?;
246
247 watcher
249 .watch(&config_path, RecursiveMode::NonRecursive)
250 .map_err(|e| ZentinelError::Config {
251 message: format!("Failed to watch config file: {}", e),
252 source: None,
253 })?;
254
255 if let Some(parent) = config_path.parent() {
259 if let Err(e) = watcher.watch(parent, RecursiveMode::Recursive) {
260 warn!(
261 path = %parent.display(),
262 error = %e,
263 "Could not watch config directory for included files, \
264 only the main config file will trigger auto-reload"
265 );
266 } else {
267 debug!(
268 path = %parent.display(),
269 "Watching config directory recursively for included file changes"
270 );
271 }
272 }
273
274 *self.watcher.write().await = Some(watcher);
276
277 let manager = Arc::new(self.clone_for_task());
279 let config_path_log = self.config_path.clone();
280 tokio::spawn(async move {
281 loop {
282 notify.notified().await;
284
285 while let Ok(()) =
289 tokio::time::timeout(Duration::from_millis(200), notify.notified()).await
290 {
291 trace!("Debounce: additional file change, resetting timer");
293 }
294
295 info!("Configuration file changed, triggering reload");
296
297 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
298 error!(error = %e, "Auto-reload failed, continuing with current configuration");
299 }
300 }
301 });
302
303 let poll_manager = Arc::new(self.clone_for_task());
309 let poll_path = self.config_path.clone();
310 tokio::spawn(async move {
311 use std::io::Read;
312 let content_sig = |p: &std::path::Path| -> Option<(u64, Vec<u8>)> {
313 let mut f = std::fs::File::open(p).ok()?;
314 let meta = f.metadata().ok()?;
315 let len = meta.len();
316 let mut buf = vec![0u8; 256.min(len as usize)];
318 f.read_exact(&mut buf).ok()?;
319 Some((len, buf))
320 };
321 let mut last_sig = content_sig(&poll_path);
322
323 loop {
324 tokio::time::sleep(Duration::from_secs(1)).await;
325
326 let current_sig = content_sig(&poll_path);
327 if current_sig != last_sig {
328 last_sig = current_sig;
329 debug!("Config file content changed (poll fallback), triggering reload");
330 if let Err(e) = poll_manager.reload(ReloadTrigger::FileChange).await {
331 error!(error = %e, "Poll-triggered reload failed");
332 }
333 }
334 }
335 });
336
337 info!(
338 config_file = %self.config_path.display(),
339 "Auto-reload enabled: watching for configuration changes (with poll fallback)"
340 );
341 Ok(())
342 }
343
344 pub async fn reload(&self, trigger: ReloadTrigger) -> ZentinelResult<()> {
350 let _reload_guard = self.reload_mutex.lock().await;
352
353 let start = Instant::now();
354 let reload_num = self
355 .stats
356 .total_reloads
357 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
358 + 1;
359
360 info!(
361 trigger = ?trigger,
362 reload_num = reload_num,
363 config_path = %self.config_path.display(),
364 "Starting configuration reload"
365 );
366
367 let _ = self.reload_tx.send(ReloadEvent::Started {
369 timestamp: Instant::now(),
370 trigger: trigger.clone(),
371 });
372
373 trace!(
374 config_path = %self.config_path.display(),
375 "Reading configuration file"
376 );
377
378 let new_config = match Config::from_file(&self.config_path) {
380 Ok(config) => {
381 debug!(
382 route_count = config.routes.len(),
383 upstream_count = config.upstreams.len(),
384 listener_count = config.listeners.len(),
385 "Configuration file parsed successfully"
386 );
387 config
388 }
389 Err(e) => {
390 let error_msg = format!("Failed to load configuration: {}", e);
391 error!(
392 config_path = %self.config_path.display(),
393 error = %e,
394 "Failed to load configuration file"
395 );
396 self.stats
397 .failed_reloads
398 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
399 *self.stats.last_failure.write().await = Some(Instant::now());
400
401 let _ = self.reload_tx.send(ReloadEvent::Failed {
402 timestamp: Instant::now(),
403 error: error_msg.clone(),
404 });
405
406 return Err(ZentinelError::Config {
407 message: error_msg,
408 source: None,
409 });
410 }
411 };
412
413 trace!("Starting configuration validation");
414
415 if let Err(e) = self.validate_config(&new_config).await {
418 error!(
419 error = %e,
420 "Configuration validation failed - new configuration REJECTED"
421 );
422 self.stats
423 .failed_reloads
424 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
425 *self.stats.last_failure.write().await = Some(Instant::now());
426
427 let _ = self.reload_tx.send(ReloadEvent::Failed {
428 timestamp: Instant::now(),
429 error: e.to_string(),
430 });
431
432 return Err(e);
433 }
434
435 info!(
436 route_count = new_config.routes.len(),
437 upstream_count = new_config.upstreams.len(),
438 "Configuration validation passed, applying new configuration"
439 );
440
441 let _ = self.reload_tx.send(ReloadEvent::Validated {
442 timestamp: Instant::now(),
443 });
444
445 let old_config = self.current_config.load_full();
447
448 trace!(
449 old_routes = old_config.routes.len(),
450 new_routes = new_config.routes.len(),
451 "Preparing configuration swap"
452 );
453
454 let hooks = self.reload_hooks.read().await;
456 for hook in hooks.iter() {
457 trace!(hook_name = %hook.name(), "Running pre-reload hook");
458 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
459 warn!(
460 hook_name = %hook.name(),
461 error = %e,
462 "Pre-reload hook failed"
463 );
464 }
466 }
467 drop(hooks);
468
469 trace!("Saving previous configuration for potential rollback");
471 *self.previous_config.write().await = Some(old_config.clone());
472
473 trace!("Applying new configuration atomically");
475 self.current_config.store(Arc::new(new_config.clone()));
476
477 let hooks = self.reload_hooks.read().await;
479 for hook in hooks.iter() {
480 trace!(hook_name = %hook.name(), "Running post-reload hook");
481 hook.post_reload(&old_config, &new_config).await;
482 }
483 drop(hooks);
484
485 let duration = start.elapsed();
487 let successful_count = self
488 .stats
489 .successful_reloads
490 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
491 + 1;
492 *self.stats.last_success.write().await = Some(Instant::now());
493
494 {
496 let mut avg = self.stats.avg_duration_ms.write().await;
497 let total = successful_count as f64;
498 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
499 }
500
501 let new_version = self
503 .stats
504 .config_version
505 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
506 + 1;
507
508 let _ = self.reload_tx.send(ReloadEvent::Applied {
509 timestamp: Instant::now(),
510 version: format!("v{}", new_version),
511 });
512
513 let (cert_success, cert_errors) = self.cert_reloader.reload_all();
516 if !cert_errors.is_empty() {
517 for (listener_id, error) in &cert_errors {
518 error!(
519 listener_id = %listener_id,
520 error = %error,
521 "TLS certificate reload failed for listener"
522 );
523 }
524 }
525
526 info!(
527 duration_ms = duration.as_millis(),
528 successful_reloads = successful_count,
529 route_count = new_config.routes.len(),
530 upstream_count = new_config.upstreams.len(),
531 cert_reload_success = cert_success,
532 cert_reload_errors = cert_errors.len(),
533 "Configuration reload completed successfully"
534 );
535
536 Ok(())
537 }
538
539 pub async fn apply_config(
547 &self,
548 new_config: Config,
549 trigger: ReloadTrigger,
550 ) -> ZentinelResult<()> {
551 let _reload_guard = self.reload_mutex.lock().await;
553
554 let start = Instant::now();
555 let reload_num = self
556 .stats
557 .total_reloads
558 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
559 + 1;
560
561 info!(
562 trigger = ?trigger,
563 reload_num = reload_num,
564 routes = new_config.routes.len(),
565 upstreams = new_config.upstreams.len(),
566 listeners = new_config.listeners.len(),
567 "Applying programmatic configuration"
568 );
569
570 let _ = self.reload_tx.send(ReloadEvent::Started {
571 timestamp: Instant::now(),
572 trigger,
573 });
574
575 if let Err(e) = self.validate_config(&new_config).await {
577 error!(error = %e, "Programmatic configuration validation failed");
578 self.stats
579 .failed_reloads
580 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
581 *self.stats.last_failure.write().await = Some(Instant::now());
582
583 let _ = self.reload_tx.send(ReloadEvent::Failed {
584 timestamp: Instant::now(),
585 error: e.to_string(),
586 });
587
588 return Err(e);
589 }
590
591 let _ = self.reload_tx.send(ReloadEvent::Validated {
592 timestamp: Instant::now(),
593 });
594
595 let old_config = self.current_config.load_full();
597
598 let hooks = self.reload_hooks.read().await;
600 for hook in hooks.iter() {
601 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
602 warn!(hook_name = %hook.name(), error = %e, "Pre-reload hook failed");
603 }
604 }
605 drop(hooks);
606
607 *self.previous_config.write().await = Some(old_config.clone());
609
610 self.current_config.store(Arc::new(new_config.clone()));
612
613 let hooks = self.reload_hooks.read().await;
615 for hook in hooks.iter() {
616 hook.post_reload(&old_config, &new_config).await;
617 }
618 drop(hooks);
619
620 let duration = start.elapsed();
622 let successful_count = self
623 .stats
624 .successful_reloads
625 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
626 + 1;
627 *self.stats.last_success.write().await = Some(Instant::now());
628
629 {
630 let mut avg = self.stats.avg_duration_ms.write().await;
631 let total = successful_count as f64;
632 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
633 }
634
635 let new_version = self
636 .stats
637 .config_version
638 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
639 + 1;
640
641 let _ = self.reload_tx.send(ReloadEvent::Applied {
642 timestamp: Instant::now(),
643 version: format!("v{}", new_version),
644 });
645
646 let (cert_success, cert_errors) = self.cert_reloader.reload_all();
648 if !cert_errors.is_empty() {
649 for (listener_id, error) in &cert_errors {
650 error!(
651 listener_id = %listener_id,
652 error = %error,
653 "TLS certificate reload failed for listener"
654 );
655 }
656 }
657
658 info!(
659 duration_ms = duration.as_millis(),
660 successful_reloads = successful_count,
661 route_count = new_config.routes.len(),
662 upstream_count = new_config.upstreams.len(),
663 cert_reload_success = cert_success,
664 cert_reload_errors = cert_errors.len(),
665 "Programmatic configuration applied successfully"
666 );
667
668 Ok(())
669 }
670
    /// Shared handle to the underlying `ArcSwap` store, for components that
    /// read the live configuration directly.
    pub fn config_store(&self) -> Arc<ArcSwap<Config>> {
        Arc::clone(&self.current_config)
    }
677
678 pub async fn rollback(&self, reason: String) -> ZentinelResult<()> {
680 info!(
681 reason = %reason,
682 "Starting configuration rollback"
683 );
684
685 let previous = self.previous_config.read().await.clone();
686
687 if let Some(prev_config) = previous {
688 trace!(
689 route_count = prev_config.routes.len(),
690 "Found previous configuration for rollback"
691 );
692
693 trace!("Validating previous configuration");
695 if let Err(e) = self.validate_config(&prev_config).await {
696 error!(
697 error = %e,
698 "Previous configuration validation failed during rollback"
699 );
700 return Err(e);
701 }
702
703 trace!("Applying previous configuration");
705 self.current_config.store(prev_config.clone());
706 let rollback_count = self
707 .stats
708 .rollbacks
709 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
710 + 1;
711
712 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
713 timestamp: Instant::now(),
714 reason: reason.clone(),
715 });
716
717 info!(
718 reason = %reason,
719 rollback_count = rollback_count,
720 route_count = prev_config.routes.len(),
721 "Configuration rolled back successfully"
722 );
723 Ok(())
724 } else {
725 warn!("No previous configuration available for rollback");
726 Err(ZentinelError::Config {
727 message: "No previous configuration available".to_string(),
728 source: None,
729 })
730 }
731 }
732
733 async fn validate_config(&self, config: &Config) -> ZentinelResult<()> {
735 trace!(
736 route_count = config.routes.len(),
737 upstream_count = config.upstreams.len(),
738 "Starting configuration validation"
739 );
740
741 trace!("Running built-in config validation");
743 config.validate()?;
744
745 let validators = self.validators.read().await;
747 trace!(
748 validator_count = validators.len(),
749 "Running custom validators"
750 );
751 for validator in validators.iter() {
752 trace!(validator_name = %validator.name(), "Running validator");
753 validator.validate(config).await.map_err(|e| {
754 error!(
755 validator_name = %validator.name(),
756 error = %e,
757 "Validator failed"
758 );
759 e
760 })?;
761 }
762
763 debug!(
764 route_count = config.routes.len(),
765 upstream_count = config.upstreams.len(),
766 "Configuration validation passed"
767 );
768
769 Ok(())
770 }
771
772 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
774 info!("Adding configuration validator: {}", validator.name());
775 self.validators.write().await.push(validator);
776 }
777
778 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
780 info!("Adding reload hook: {}", hook.name());
781 self.reload_hooks.write().await.push(hook);
782 }
783
    /// Subscribe to reload lifecycle events. Slow receivers can miss events
    /// once the 100-entry channel buffer laps them.
    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
        self.reload_tx.subscribe()
    }
788
    /// Reload statistics (counters and timings).
    pub fn stats(&self) -> &ReloadStats {
        &self.stats
    }
793
    /// Build a handle that shares all internal state with `self` (every
    /// field is either `Arc`-shared or cheaply cloneable), for use by the
    /// background tasks spawned in `start_watching`.
    fn clone_for_task(&self) -> ConfigManager {
        ConfigManager {
            current_config: Arc::clone(&self.current_config),
            previous_config: Arc::clone(&self.previous_config),
            config_path: self.config_path.clone(),
            watcher: self.watcher.clone(),
            reload_tx: self.reload_tx.clone(),
            stats: Arc::clone(&self.stats),
            validators: Arc::clone(&self.validators),
            reload_hooks: Arc::clone(&self.reload_hooks),
            cert_reloader: Arc::clone(&self.cert_reloader),
            reload_mutex: Arc::clone(&self.reload_mutex),
        }
    }
809}
810
/// Reload hook that writes configuration-change entries to the audit log.
pub struct AuditReloadHook {
    // Destination for audit log entries.
    log_manager: SharedLogManager,
}
819
820impl AuditReloadHook {
821 pub fn new(log_manager: SharedLogManager) -> Self {
823 Self { log_manager }
824 }
825}
826
827#[async_trait::async_trait]
828impl ReloadHook for AuditReloadHook {
829 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()> {
830 let trace_id = uuid::Uuid::new_v4().to_string();
832 let audit_entry = AuditLogEntry::config_change(
833 &trace_id,
834 "reload_started",
835 format!(
836 "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
837 old_config.routes.len(),
838 new_config.routes.len(),
839 old_config.upstreams.len(),
840 new_config.upstreams.len()
841 ),
842 );
843 self.log_manager.log_audit(&audit_entry);
844 Ok(())
845 }
846
847 async fn post_reload(&self, old_config: &Config, new_config: &Config) {
848 let trace_id = uuid::Uuid::new_v4().to_string();
850 let audit_entry = AuditLogEntry::config_change(
851 &trace_id,
852 "reload_success",
853 format!(
854 "Configuration reload successful: {} routes, {} upstreams, {} listeners",
855 new_config.routes.len(),
856 new_config.upstreams.len(),
857 new_config.listeners.len()
858 ),
859 )
860 .with_metadata("old_routes", old_config.routes.len().to_string())
861 .with_metadata("new_routes", new_config.routes.len().to_string())
862 .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
863 .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
864 self.log_manager.log_audit(&audit_entry);
865 }
866
867 async fn on_failure(&self, config: &Config, error: &ZentinelError) {
868 let trace_id = uuid::Uuid::new_v4().to_string();
870 let audit_entry = AuditLogEntry::config_change(
871 &trace_id,
872 "reload_failed",
873 format!("Configuration reload failed: {}", error),
874 )
875 .with_metadata("current_routes", config.routes.len().to_string())
876 .with_metadata("current_upstreams", config.upstreams.len().to_string());
877 self.log_manager.log_audit(&audit_entry);
878 }
879
880 fn name(&self) -> &str {
881 "audit_reload_hook"
882 }
883}
884
#[cfg(test)]
mod tests {
    //! Tests for `ConfigManager` hot reload, rollback, event broadcasting,
    //! and the graceful reload coordinator.
    use super::*;

    // A reload of an unparseable file must fail, keep the old config
    // active, and record a failed reload in the stats.
    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        let initial_config = Config::default_for_testing();
        let initial_routes = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Deliberately malformed KDL.
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        assert_eq!(manager.current().routes.len(), initial_routes);

        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(result.is_err(), "Reload should fail for invalid config");

        assert_eq!(
            manager.current().routes.len(),
            initial_routes,
            "Original config should be preserved after failed reload"
        );

        assert_eq!(
            manager
                .stats()
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Failed reload should be recorded"
        );
    }

    // A reload of a well-formed config file must succeed and be counted.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }

    // Helper: write a valid config file containing `route_count` routes
    // that all point at the same "backend" upstream.
    fn write_config_with_routes(path: &Path, route_count: usize) {
        let mut routes = String::new();
        for i in 0..route_count {
            routes.push_str(&format!(
                r#"
    route "route{i}" {{
        priority "medium"
        matches {{
            path-prefix "/route{i}/"
        }}
        upstream "backend"
    }}
"#
            ));
        }

        let config = format!(
            r#"
server {{
    worker-threads 4
}}

listeners {{
    listener "http" {{
        address "0.0.0.0:8080"
        protocol "http"
    }}
}}

upstreams {{
    upstream "backend" {{
        target "127.0.0.1:3000"
    }}
}}

routes {{
{routes}
}}
"#
        );

        std::fs::write(path, config).unwrap();
    }

    // Lock-free readers must complete all snapshots while a reload swaps
    // the configuration concurrently.
    #[tokio::test]
    async fn test_concurrent_config_reads_during_reload() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 5);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // 10 reader tasks, 100 snapshots each.
        let mut readers = Vec::new();
        for _ in 0..10 {
            let manager_clone = Arc::clone(&manager);
            readers.push(tokio::spawn(async move {
                let mut read_count = 0;
                for _ in 0..100 {
                    let config = manager_clone.current();
                    let _ = config.routes.len();
                    read_count += 1;
                    tokio::task::yield_now().await;
                }
                read_count
            }));
        }

        let manager_reload = Arc::clone(&manager);
        let reload_handle =
            tokio::spawn(async move { manager_reload.reload(ReloadTrigger::Manual).await });

        let mut total_reads = 0;
        for reader in readers {
            total_reads += reader.await.unwrap();
        }

        let reload_result = reload_handle.await.unwrap();
        assert!(reload_result.is_ok(), "Reload should succeed");
        assert_eq!(total_reads, 1000, "All reads should complete");
    }

    // Concurrent reload attempts are serialized by the reload mutex; all
    // attempts are counted and at least one succeeds.
    #[tokio::test]
    async fn test_multiple_concurrent_reloads() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // Alternate triggers just to exercise both variants.
        let mut reload_handles = Vec::new();
        for i in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let trigger = if i % 2 == 0 {
                ReloadTrigger::Manual
            } else {
                ReloadTrigger::Signal
            };
            reload_handles.push(tokio::spawn(
                async move { manager_clone.reload(trigger).await },
            ));
        }

        let mut success_count = 0;
        for handle in reload_handles {
            if handle.await.unwrap().is_ok() {
                success_count += 1;
            }
        }

        assert!(success_count >= 1, "At least one reload should succeed");

        let total = manager
            .stats()
            .total_reloads
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(total, 5, "All reload attempts should be counted");
    }

    // Each successful reload must make the new config visible immediately.
    #[tokio::test]
    async fn test_config_visibility_after_reload() {
        let initial_config = Config::default_for_testing();
        let initial_route_count = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        assert_eq!(manager.current().routes.len(), initial_route_count);

        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 2);

        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            5,
            "New config should be visible immediately after reload"
        );

        write_config_with_routes(&config_path, 1);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            1,
            "Config changes should be visible after each reload"
        );
    }

    // Back-to-back reloads must all succeed and all be counted.
    #[tokio::test]
    async fn test_rapid_successive_reloads() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Rewrite the file and reload 20 times with varying route counts.
        for i in 0..20 {
            write_config_with_routes(&config_path, (i % 5) + 1);
            let result = manager.reload(ReloadTrigger::Manual).await;
            assert!(result.is_ok(), "Reload {} should succeed", i);
        }

        let stats = manager.stats();
        assert_eq!(
            stats
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            20,
            "All 20 reloads should succeed"
        );
        assert_eq!(
            stats
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            0,
            "No reloads should fail"
        );
    }

    // Rollback must restore the previously active configuration and be
    // recorded in the stats.
    #[tokio::test]
    async fn test_rollback_preserves_previous_config() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 3);

        // Second reload makes the 3-route config the "previous" one.
        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 5);

        manager
            .rollback("Testing rollback".to_string())
            .await
            .unwrap();
        assert_eq!(
            manager.current().routes.len(),
            3,
            "Rollback should restore previous config"
        );

        assert_eq!(
            manager
                .stats()
                .rollbacks
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Rollback should be recorded in stats"
        );
    }

    // A successful reload must broadcast Started/Validated/Applied events
    // to subscribers.
    #[tokio::test]
    async fn test_reload_events_broadcast() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Subscribe before reloading so no events are missed.
        let mut receiver = manager.subscribe();

        manager.reload(ReloadTrigger::Manual).await.unwrap();

        // Drain whatever events arrived; stop after 100ms of silence.
        let mut events = Vec::new();
        while let Ok(Ok(event)) =
            tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
        {
            events.push(event);
        }

        assert!(
            events.len() >= 2,
            "Should receive at least Started and Applied/Validated events"
        );

        assert!(
            events
                .iter()
                .any(|e| matches!(e, ReloadEvent::Started { .. })),
            "Should receive Started event"
        );

        assert!(
            events
                .iter()
                .any(|e| matches!(e, ReloadEvent::Applied { .. })),
            "Should receive Applied event on success"
        );
    }

    // The coordinator reports drained once all tracked requests complete.
    #[tokio::test]
    async fn test_graceful_coordinator_with_reload() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));

        coordinator.inc_requests();
        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 3);

        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 2);

        // Start draining while 2 requests are still active, then finish
        // them from this task.
        let coord_clone = Arc::new(coordinator);
        let coord_for_drain = Arc::clone(&coord_clone);
        let drain_handle = tokio::spawn(async move { coord_for_drain.wait_for_drain().await });

        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();
        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();

        let drained = drain_handle.await.unwrap();
        assert!(drained, "All requests should drain successfully");
    }

    // With requests that never finish, draining must time out and report
    // failure while keeping the count intact.
    #[tokio::test]
    async fn test_graceful_coordinator_drain_timeout() {
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));

        coordinator.inc_requests();
        coordinator.inc_requests();

        let drained = coordinator.wait_for_drain().await;
        assert!(!drained, "Drain should timeout with stuck requests");
        assert_eq!(
            coordinator.active_count(),
            2,
            "Requests should still be tracked"
        );
    }
}