1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use sentinel_common::errors::{SentinelError, SentinelResult};
31use sentinel_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
/// Lifecycle notifications published on the manager's broadcast channel while
/// a configuration reload or rollback is in progress.
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// A reload attempt has begun.
    Started {
        /// When the attempt began.
        timestamp: Instant,
        /// What initiated the attempt.
        trigger: ReloadTrigger,
    },
    /// The freshly loaded configuration passed validation.
    Validated { timestamp: Instant },
    /// The new configuration was swapped in; `version` is rendered as `v<N>`.
    Applied { timestamp: Instant, version: String },
    /// The reload was abandoned; `error` carries the rendered failure message.
    Failed { timestamp: Instant, error: String },
    /// The previously active configuration was restored; `reason` is the
    /// caller-supplied explanation.
    RolledBack { timestamp: Instant, reason: String },
}
57
/// What initiated a configuration reload.
///
/// Implements `PartialEq`, `Eq` and `Hash` so that subscribers and tests can
/// compare the trigger carried by a `Started` event or use it as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadTrigger {
    /// The reload was requested directly by a caller.
    Manual,
    /// The watched configuration file changed on disk.
    FileChange,
    /// The reload was requested through a signal.
    Signal,
    /// The reload was requested by a schedule.
    Scheduled,
}
70
/// A pluggable check that a candidate configuration must pass before it is
/// applied.
///
/// Validators registered through `ConfigManager::add_validator` run in
/// registration order after the configuration's own `validate()`; the first
/// error stops validation and rejects the configuration.
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Checks `config`; return an error to reject it.
    async fn validate(&self, config: &Config) -> SentinelResult<()>;

    /// Short name used to identify this validator in log output.
    fn name(&self) -> &str;
}
84
/// Callbacks that observe the stages of a configuration reload.
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called with the active and the candidate configuration before the swap.
    ///
    /// `ConfigManager::reload` logs an `Err` from this method as a warning and
    /// carries on with the reload.
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;

    /// Called after the new configuration has been stored.
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Notification that a reload failed with `error`.
    ///
    /// NOTE(review): `config` is presumably the configuration that remains
    /// active after the failure — confirm against the caller.
    async fn on_failure(&self, config: &Config, error: &SentinelError);

    /// Short name used to identify this hook in log output.
    fn name(&self) -> &str;
}
100
/// Counters and timestamps describing reload activity.
///
/// Everything starts at zero / `None` via `Default` and is updated by
/// `ConfigManager::reload` and `ConfigManager::rollback`.
#[derive(Default)]
pub struct ReloadStats {
    /// Reload attempts started, whatever their outcome.
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Reloads that ended with the new configuration applied.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Reloads rejected because the file failed to load or to validate.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Rollbacks that restored the previous configuration.
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// Incremented once per applied reload; reported as `v<N>` in
    /// `ReloadEvent::Applied`.
    pub config_version: std::sync::atomic::AtomicU64,
    /// When the most recent successful reload finished, if any.
    pub last_success: RwLock<Option<Instant>>,
    /// When the most recent failed reload was recorded, if any.
    pub last_failure: RwLock<Option<Instant>>,
    /// Running mean duration of successful reloads, in milliseconds.
    pub avg_duration_ms: RwLock<f64>,
}
125
/// Owns the live configuration and coordinates reloads, validation, rollback
/// and change notifications.
///
/// Every field is behind an `Arc` (or is cheaply cloneable) so that
/// `clone_for_task` can hand a second handle onto the same state to the
/// background file-watch task.
pub struct ConfigManager {
    /// Active configuration; replaced wholesale by `reload` and `rollback`.
    current_config: Arc<ArcSwap<Config>>,
    /// Configuration that was active before the last successful reload.
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// File that `reload` re-reads.
    config_path: PathBuf,
    /// File watcher, present once `start_watching` has succeeded.
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Sending half of the reload-event broadcast channel.
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload counters and timings.
    stats: Arc<ReloadStats>,
    /// Extra validators run after the configuration's built-in validation.
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Hooks notified around each reload.
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
    /// Reloads TLS certificates after a configuration has been applied.
    cert_reloader: Arc<CertificateReloader>,
}
151
152impl ConfigManager {
153 pub async fn new(
155 config_path: impl AsRef<Path>,
156 initial_config: Config,
157 ) -> SentinelResult<Self> {
158 let config_path = config_path.as_ref().to_path_buf();
159 let (reload_tx, _) = broadcast::channel(100);
160
161 info!(
162 config_path = %config_path.display(),
163 route_count = initial_config.routes.len(),
164 upstream_count = initial_config.upstreams.len(),
165 listener_count = initial_config.listeners.len(),
166 "Initializing configuration manager"
167 );
168
169 trace!(
170 config_path = %config_path.display(),
171 "Creating ArcSwap for configuration"
172 );
173
174 Ok(Self {
175 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
176 previous_config: Arc::new(RwLock::new(None)),
177 config_path,
178 watcher: Arc::new(RwLock::new(None)),
179 reload_tx,
180 stats: Arc::new(ReloadStats::default()),
181 validators: Arc::new(RwLock::new(Vec::new())),
182 reload_hooks: Arc::new(RwLock::new(Vec::new())),
183 cert_reloader: Arc::new(CertificateReloader::new()),
184 })
185 }
186
187 pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
189 Arc::clone(&self.cert_reloader)
190 }
191
192 pub fn current(&self) -> Arc<Config> {
194 self.current_config.load_full()
195 }
196
197 pub async fn start_watching(&self) -> SentinelResult<()> {
202 if self.watcher.read().await.is_some() {
204 warn!("File watcher already active, skipping");
205 return Ok(());
206 }
207
208 let config_path = self.config_path.clone();
209
210 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
212
213 let mut watcher =
214 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
215 if let Ok(event) = event {
216 let _ = tx.blocking_send(event);
217 }
218 })
219 .map_err(|e| SentinelError::Config {
220 message: format!("Failed to create file watcher: {}", e),
221 source: None,
222 })?;
223
224 watcher
226 .watch(&config_path, RecursiveMode::NonRecursive)
227 .map_err(|e| SentinelError::Config {
228 message: format!("Failed to watch config file: {}", e),
229 source: None,
230 })?;
231
232 *self.watcher.write().await = Some(watcher);
234
235 let manager = Arc::new(self.clone_for_task());
237 tokio::spawn(async move {
238 while let Some(event) = rx.recv().await {
239 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
240 info!("Configuration file changed, triggering reload");
241
242 tokio::time::sleep(Duration::from_millis(100)).await;
244
245 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
246 error!("Auto-reload failed: {}", e);
247 error!("Continuing with current configuration");
248 }
249 }
250 }
251 });
252
253 info!(
254 "Auto-reload enabled: watching configuration file {:?}",
255 self.config_path
256 );
257 Ok(())
258 }
259
260 pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
262 let start = Instant::now();
263 let reload_num = self
264 .stats
265 .total_reloads
266 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
267 + 1;
268
269 info!(
270 trigger = ?trigger,
271 reload_num = reload_num,
272 config_path = %self.config_path.display(),
273 "Starting configuration reload"
274 );
275
276 let _ = self.reload_tx.send(ReloadEvent::Started {
278 timestamp: Instant::now(),
279 trigger: trigger.clone(),
280 });
281
282 trace!(
283 config_path = %self.config_path.display(),
284 "Reading configuration file"
285 );
286
287 let new_config = match Config::from_file(&self.config_path) {
289 Ok(config) => {
290 debug!(
291 route_count = config.routes.len(),
292 upstream_count = config.upstreams.len(),
293 listener_count = config.listeners.len(),
294 "Configuration file parsed successfully"
295 );
296 config
297 }
298 Err(e) => {
299 let error_msg = format!("Failed to load configuration: {}", e);
300 error!(
301 config_path = %self.config_path.display(),
302 error = %e,
303 "Failed to load configuration file"
304 );
305 self.stats
306 .failed_reloads
307 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
308 *self.stats.last_failure.write().await = Some(Instant::now());
309
310 let _ = self.reload_tx.send(ReloadEvent::Failed {
311 timestamp: Instant::now(),
312 error: error_msg.clone(),
313 });
314
315 return Err(SentinelError::Config {
316 message: error_msg,
317 source: None,
318 });
319 }
320 };
321
322 trace!("Starting configuration validation");
323
324 if let Err(e) = self.validate_config(&new_config).await {
327 error!(
328 error = %e,
329 "Configuration validation failed - new configuration REJECTED"
330 );
331 self.stats
332 .failed_reloads
333 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
334 *self.stats.last_failure.write().await = Some(Instant::now());
335
336 let _ = self.reload_tx.send(ReloadEvent::Failed {
337 timestamp: Instant::now(),
338 error: e.to_string(),
339 });
340
341 return Err(e);
342 }
343
344 info!(
345 route_count = new_config.routes.len(),
346 upstream_count = new_config.upstreams.len(),
347 "Configuration validation passed, applying new configuration"
348 );
349
350 let _ = self.reload_tx.send(ReloadEvent::Validated {
351 timestamp: Instant::now(),
352 });
353
354 let old_config = self.current_config.load_full();
356
357 trace!(
358 old_routes = old_config.routes.len(),
359 new_routes = new_config.routes.len(),
360 "Preparing configuration swap"
361 );
362
363 let hooks = self.reload_hooks.read().await;
365 for hook in hooks.iter() {
366 trace!(hook_name = %hook.name(), "Running pre-reload hook");
367 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
368 warn!(
369 hook_name = %hook.name(),
370 error = %e,
371 "Pre-reload hook failed"
372 );
373 }
375 }
376 drop(hooks);
377
378 trace!("Saving previous configuration for potential rollback");
380 *self.previous_config.write().await = Some(old_config.clone());
381
382 trace!("Applying new configuration atomically");
384 self.current_config.store(Arc::new(new_config.clone()));
385
386 let hooks = self.reload_hooks.read().await;
388 for hook in hooks.iter() {
389 trace!(hook_name = %hook.name(), "Running post-reload hook");
390 hook.post_reload(&old_config, &new_config).await;
391 }
392 drop(hooks);
393
394 let duration = start.elapsed();
396 let successful_count = self
397 .stats
398 .successful_reloads
399 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
400 + 1;
401 *self.stats.last_success.write().await = Some(Instant::now());
402
403 {
405 let mut avg = self.stats.avg_duration_ms.write().await;
406 let total = successful_count as f64;
407 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
408 }
409
410 let new_version = self
412 .stats
413 .config_version
414 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
415 + 1;
416
417 let _ = self.reload_tx.send(ReloadEvent::Applied {
418 timestamp: Instant::now(),
419 version: format!("v{}", new_version),
420 });
421
422 let (cert_success, cert_errors) = self.cert_reloader.reload_all();
425 if !cert_errors.is_empty() {
426 for (listener_id, error) in &cert_errors {
427 error!(
428 listener_id = %listener_id,
429 error = %error,
430 "TLS certificate reload failed for listener"
431 );
432 }
433 }
434
435 info!(
436 duration_ms = duration.as_millis(),
437 successful_reloads = successful_count,
438 route_count = new_config.routes.len(),
439 upstream_count = new_config.upstreams.len(),
440 cert_reload_success = cert_success,
441 cert_reload_errors = cert_errors.len(),
442 "Configuration reload completed successfully"
443 );
444
445 Ok(())
446 }
447
448 pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
450 info!(
451 reason = %reason,
452 "Starting configuration rollback"
453 );
454
455 let previous = self.previous_config.read().await.clone();
456
457 if let Some(prev_config) = previous {
458 trace!(
459 route_count = prev_config.routes.len(),
460 "Found previous configuration for rollback"
461 );
462
463 trace!("Validating previous configuration");
465 if let Err(e) = self.validate_config(&prev_config).await {
466 error!(
467 error = %e,
468 "Previous configuration validation failed during rollback"
469 );
470 return Err(e);
471 }
472
473 trace!("Applying previous configuration");
475 self.current_config.store(prev_config.clone());
476 let rollback_count = self
477 .stats
478 .rollbacks
479 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
480 + 1;
481
482 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
483 timestamp: Instant::now(),
484 reason: reason.clone(),
485 });
486
487 info!(
488 reason = %reason,
489 rollback_count = rollback_count,
490 route_count = prev_config.routes.len(),
491 "Configuration rolled back successfully"
492 );
493 Ok(())
494 } else {
495 warn!("No previous configuration available for rollback");
496 Err(SentinelError::Config {
497 message: "No previous configuration available".to_string(),
498 source: None,
499 })
500 }
501 }
502
503 async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
505 trace!(
506 route_count = config.routes.len(),
507 upstream_count = config.upstreams.len(),
508 "Starting configuration validation"
509 );
510
511 trace!("Running built-in config validation");
513 config.validate()?;
514
515 let validators = self.validators.read().await;
517 trace!(
518 validator_count = validators.len(),
519 "Running custom validators"
520 );
521 for validator in validators.iter() {
522 trace!(validator_name = %validator.name(), "Running validator");
523 validator.validate(config).await.map_err(|e| {
524 error!(
525 validator_name = %validator.name(),
526 error = %e,
527 "Validator failed"
528 );
529 e
530 })?;
531 }
532
533 debug!(
534 route_count = config.routes.len(),
535 upstream_count = config.upstreams.len(),
536 "Configuration validation passed"
537 );
538
539 Ok(())
540 }
541
542 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
544 info!("Adding configuration validator: {}", validator.name());
545 self.validators.write().await.push(validator);
546 }
547
548 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
550 info!("Adding reload hook: {}", hook.name());
551 self.reload_hooks.write().await.push(hook);
552 }
553
554 pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
556 self.reload_tx.subscribe()
557 }
558
559 pub fn stats(&self) -> &ReloadStats {
561 &self.stats
562 }
563
564 fn clone_for_task(&self) -> ConfigManager {
566 ConfigManager {
567 current_config: Arc::clone(&self.current_config),
568 previous_config: Arc::clone(&self.previous_config),
569 config_path: self.config_path.clone(),
570 watcher: self.watcher.clone(),
571 reload_tx: self.reload_tx.clone(),
572 stats: Arc::clone(&self.stats),
573 validators: Arc::clone(&self.validators),
574 reload_hooks: Arc::clone(&self.reload_hooks),
575 cert_reloader: Arc::clone(&self.cert_reloader),
576 }
577 }
578}
579
/// A `ReloadHook` that writes an audit-log entry for each reload stage it is
/// notified about.
pub struct AuditReloadHook {
    /// Destination for the audit entries.
    log_manager: SharedLogManager,
}
588
589impl AuditReloadHook {
590 pub fn new(log_manager: SharedLogManager) -> Self {
592 Self { log_manager }
593 }
594}
595
596#[async_trait::async_trait]
597impl ReloadHook for AuditReloadHook {
598 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()> {
599 let trace_id = uuid::Uuid::new_v4().to_string();
601 let audit_entry = AuditLogEntry::config_change(
602 &trace_id,
603 "reload_started",
604 format!(
605 "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
606 old_config.routes.len(),
607 new_config.routes.len(),
608 old_config.upstreams.len(),
609 new_config.upstreams.len()
610 ),
611 );
612 self.log_manager.log_audit(&audit_entry);
613 Ok(())
614 }
615
616 async fn post_reload(&self, old_config: &Config, new_config: &Config) {
617 let trace_id = uuid::Uuid::new_v4().to_string();
619 let audit_entry = AuditLogEntry::config_change(
620 &trace_id,
621 "reload_success",
622 format!(
623 "Configuration reload successful: {} routes, {} upstreams, {} listeners",
624 new_config.routes.len(),
625 new_config.upstreams.len(),
626 new_config.listeners.len()
627 ),
628 )
629 .with_metadata("old_routes", old_config.routes.len().to_string())
630 .with_metadata("new_routes", new_config.routes.len().to_string())
631 .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
632 .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
633 self.log_manager.log_audit(&audit_entry);
634 }
635
636 async fn on_failure(&self, config: &Config, error: &SentinelError) {
637 let trace_id = uuid::Uuid::new_v4().to_string();
639 let audit_entry = AuditLogEntry::config_change(
640 &trace_id,
641 "reload_failed",
642 format!("Configuration reload failed: {}", error),
643 )
644 .with_metadata("current_routes", config.routes.len().to_string())
645 .with_metadata("current_upstreams", config.upstreams.len().to_string());
646 self.log_manager.log_audit(&audit_entry);
647 }
648
649 fn name(&self) -> &str {
650 "audit_reload_hook"
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657
658 #[tokio::test]
659 async fn test_config_reload_rejects_invalid_config() {
660 let initial_config = Config::default_for_testing();
662 let initial_routes = initial_config.routes.len();
663
664 let temp_dir = tempfile::tempdir().unwrap();
665 let config_path = temp_dir.path().join("config.kdl");
666
667 std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
669
670 let manager = ConfigManager::new(&config_path, initial_config)
672 .await
673 .unwrap();
674
675 assert_eq!(manager.current().routes.len(), initial_routes);
677
678 let result = manager.reload(ReloadTrigger::Manual).await;
680 assert!(result.is_err(), "Reload should fail for invalid config");
681
682 assert_eq!(
684 manager.current().routes.len(),
685 initial_routes,
686 "Original config should be preserved after failed reload"
687 );
688
689 assert_eq!(
691 manager
692 .stats()
693 .failed_reloads
694 .load(std::sync::atomic::Ordering::Relaxed),
695 1,
696 "Failed reload should be recorded"
697 );
698 }
699
700 #[tokio::test]
701 async fn test_config_reload_accepts_valid_config() {
702 let initial_config = Config::default_for_testing();
704 let temp_dir = tempfile::tempdir().unwrap();
705 let config_path = temp_dir.path().join("config.kdl");
706
707 let static_dir = temp_dir.path().join("static");
709 std::fs::create_dir_all(&static_dir).unwrap();
710
711 let valid_config = r#"
713server {
714 worker-threads 4
715}
716
717listeners {
718 listener "http" {
719 address "0.0.0.0:8080"
720 protocol "http"
721 }
722}
723
724upstreams {
725 upstream "backend" {
726 target "127.0.0.1:3000"
727 }
728}
729
730routes {
731 route "api" {
732 priority "high"
733 matches {
734 path-prefix "/api/"
735 }
736 upstream "backend"
737 }
738}
739"#;
740 std::fs::write(&config_path, valid_config).unwrap();
741
742 let manager = ConfigManager::new(&config_path, initial_config)
744 .await
745 .unwrap();
746
747 let result = manager.reload(ReloadTrigger::Manual).await;
749 assert!(
750 result.is_ok(),
751 "Reload should succeed for valid config: {:?}",
752 result.err()
753 );
754
755 assert_eq!(
757 manager
758 .stats()
759 .successful_reloads
760 .load(std::sync::atomic::Ordering::Relaxed),
761 1,
762 "Successful reload should be recorded"
763 );
764 }
765
766 fn write_config_with_routes(path: &Path, route_count: usize) {
772 let mut routes = String::new();
773 for i in 0..route_count {
774 routes.push_str(&format!(
775 r#"
776 route "route{i}" {{
777 priority "medium"
778 matches {{
779 path-prefix "/route{i}/"
780 }}
781 upstream "backend"
782 }}
783"#
784 ));
785 }
786
787 let config = format!(
788 r#"
789server {{
790 worker-threads 4
791}}
792
793listeners {{
794 listener "http" {{
795 address "0.0.0.0:8080"
796 protocol "http"
797 }}
798}}
799
800upstreams {{
801 upstream "backend" {{
802 target "127.0.0.1:3000"
803 }}
804}}
805
806routes {{
807{routes}
808}}
809"#
810 );
811
812 std::fs::write(path, config).unwrap();
813 }
814
815 #[tokio::test]
816 async fn test_concurrent_config_reads_during_reload() {
817 let initial_config = Config::default_for_testing();
819 let temp_dir = tempfile::tempdir().unwrap();
820 let config_path = temp_dir.path().join("config.kdl");
821
822 write_config_with_routes(&config_path, 5);
823
824 let manager = Arc::new(
825 ConfigManager::new(&config_path, initial_config)
826 .await
827 .unwrap(),
828 );
829
830 let mut readers = Vec::new();
832 for _ in 0..10 {
833 let manager_clone = Arc::clone(&manager);
834 readers.push(tokio::spawn(async move {
835 let mut read_count = 0;
836 for _ in 0..100 {
837 let config = manager_clone.current();
838 let _ = config.routes.len();
840 read_count += 1;
841 tokio::task::yield_now().await;
842 }
843 read_count
844 }));
845 }
846
847 let manager_reload = Arc::clone(&manager);
849 let reload_handle = tokio::spawn(async move {
850 manager_reload.reload(ReloadTrigger::Manual).await
851 });
852
853 let mut total_reads = 0;
855 for reader in readers {
856 total_reads += reader.await.unwrap();
857 }
858
859 let reload_result = reload_handle.await.unwrap();
860 assert!(reload_result.is_ok(), "Reload should succeed");
861 assert_eq!(total_reads, 1000, "All reads should complete");
862 }
863
864 #[tokio::test]
865 async fn test_multiple_concurrent_reloads() {
866 let initial_config = Config::default_for_testing();
868 let temp_dir = tempfile::tempdir().unwrap();
869 let config_path = temp_dir.path().join("config.kdl");
870
871 write_config_with_routes(&config_path, 3);
872
873 let manager = Arc::new(
874 ConfigManager::new(&config_path, initial_config)
875 .await
876 .unwrap(),
877 );
878
879 let mut reload_handles = Vec::new();
881 for i in 0..5 {
882 let manager_clone = Arc::clone(&manager);
883 let trigger = if i % 2 == 0 {
884 ReloadTrigger::Manual
885 } else {
886 ReloadTrigger::Signal
887 };
888 reload_handles.push(tokio::spawn(async move {
889 manager_clone.reload(trigger).await
890 }));
891 }
892
893 let mut success_count = 0;
895 for handle in reload_handles {
896 if handle.await.unwrap().is_ok() {
897 success_count += 1;
898 }
899 }
900
901 assert!(success_count >= 1, "At least one reload should succeed");
903
904 let total = manager
906 .stats()
907 .total_reloads
908 .load(std::sync::atomic::Ordering::Relaxed);
909 assert_eq!(total, 5, "All reload attempts should be counted");
910 }
911
912 #[tokio::test]
913 async fn test_config_visibility_after_reload() {
914 let initial_config = Config::default_for_testing();
916 let initial_route_count = initial_config.routes.len();
917
918 let temp_dir = tempfile::tempdir().unwrap();
919 let config_path = temp_dir.path().join("config.kdl");
920
921 write_config_with_routes(&config_path, 2);
923
924 let manager = ConfigManager::new(&config_path, initial_config)
925 .await
926 .unwrap();
927
928 assert_eq!(manager.current().routes.len(), initial_route_count);
930
931 manager.reload(ReloadTrigger::Manual).await.unwrap();
933 assert_eq!(manager.current().routes.len(), 2);
934
935 write_config_with_routes(&config_path, 5);
937 manager.reload(ReloadTrigger::Manual).await.unwrap();
938 assert_eq!(
939 manager.current().routes.len(),
940 5,
941 "New config should be visible immediately after reload"
942 );
943
944 write_config_with_routes(&config_path, 1);
946 manager.reload(ReloadTrigger::Manual).await.unwrap();
947 assert_eq!(
948 manager.current().routes.len(),
949 1,
950 "Config changes should be visible after each reload"
951 );
952 }
953
954 #[tokio::test]
955 async fn test_rapid_successive_reloads() {
956 let initial_config = Config::default_for_testing();
958 let temp_dir = tempfile::tempdir().unwrap();
959 let config_path = temp_dir.path().join("config.kdl");
960
961 write_config_with_routes(&config_path, 3);
962
963 let manager = ConfigManager::new(&config_path, initial_config)
964 .await
965 .unwrap();
966
967 for i in 0..20 {
969 write_config_with_routes(&config_path, (i % 5) + 1);
971 let result = manager.reload(ReloadTrigger::Manual).await;
972 assert!(result.is_ok(), "Reload {} should succeed", i);
973 }
974
975 let stats = manager.stats();
977 assert_eq!(
978 stats
979 .successful_reloads
980 .load(std::sync::atomic::Ordering::Relaxed),
981 20,
982 "All 20 reloads should succeed"
983 );
984 assert_eq!(
985 stats
986 .failed_reloads
987 .load(std::sync::atomic::Ordering::Relaxed),
988 0,
989 "No reloads should fail"
990 );
991 }
992
993 #[tokio::test]
994 async fn test_rollback_preserves_previous_config() {
995 let initial_config = Config::default_for_testing();
997 let temp_dir = tempfile::tempdir().unwrap();
998 let config_path = temp_dir.path().join("config.kdl");
999
1000 write_config_with_routes(&config_path, 3);
1002
1003 let manager = ConfigManager::new(&config_path, initial_config)
1004 .await
1005 .unwrap();
1006
1007 manager.reload(ReloadTrigger::Manual).await.unwrap();
1009 assert_eq!(manager.current().routes.len(), 3);
1010
1011 write_config_with_routes(&config_path, 5);
1013 manager.reload(ReloadTrigger::Manual).await.unwrap();
1014 assert_eq!(manager.current().routes.len(), 5);
1015
1016 manager
1018 .rollback("Testing rollback".to_string())
1019 .await
1020 .unwrap();
1021 assert_eq!(
1022 manager.current().routes.len(),
1023 3,
1024 "Rollback should restore previous config"
1025 );
1026
1027 assert_eq!(
1029 manager
1030 .stats()
1031 .rollbacks
1032 .load(std::sync::atomic::Ordering::Relaxed),
1033 1,
1034 "Rollback should be recorded in stats"
1035 );
1036 }
1037
1038 #[tokio::test]
1039 async fn test_reload_events_broadcast() {
1040 let initial_config = Config::default_for_testing();
1042 let temp_dir = tempfile::tempdir().unwrap();
1043 let config_path = temp_dir.path().join("config.kdl");
1044
1045 write_config_with_routes(&config_path, 2);
1046
1047 let manager = ConfigManager::new(&config_path, initial_config)
1048 .await
1049 .unwrap();
1050
1051 let mut receiver = manager.subscribe();
1053
1054 manager.reload(ReloadTrigger::Manual).await.unwrap();
1056
1057 let mut events = Vec::new();
1059 loop {
1060 match tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await {
1061 Ok(Ok(event)) => events.push(event),
1062 _ => break,
1063 }
1064 }
1065
1066 assert!(events.len() >= 2, "Should receive at least Started and Applied/Validated events");
1068
1069 assert!(
1071 events.iter().any(|e| matches!(e, ReloadEvent::Started { .. })),
1072 "Should receive Started event"
1073 );
1074
1075 assert!(
1077 events.iter().any(|e| matches!(e, ReloadEvent::Applied { .. })),
1078 "Should receive Applied event on success"
1079 );
1080 }
1081
1082 #[tokio::test]
1083 async fn test_graceful_coordinator_with_reload() {
1084 let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));
1086
1087 coordinator.inc_requests();
1089 coordinator.inc_requests();
1090 coordinator.inc_requests();
1091 assert_eq!(coordinator.active_count(), 3);
1092
1093 coordinator.dec_requests();
1095 assert_eq!(coordinator.active_count(), 2);
1096
1097 let coord_clone = Arc::new(coordinator);
1099 let coord_for_drain = Arc::clone(&coord_clone);
1100 let drain_handle = tokio::spawn(async move {
1101 coord_for_drain.wait_for_drain().await
1102 });
1103
1104 tokio::time::sleep(Duration::from_millis(50)).await;
1106 coord_clone.dec_requests();
1107 tokio::time::sleep(Duration::from_millis(50)).await;
1108 coord_clone.dec_requests();
1109
1110 let drained = drain_handle.await.unwrap();
1112 assert!(drained, "All requests should drain successfully");
1113 }
1114
1115 #[tokio::test]
1116 async fn test_graceful_coordinator_drain_timeout() {
1117 let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));
1119
1120 coordinator.inc_requests();
1122 coordinator.inc_requests();
1123
1124 let drained = coordinator.wait_for_drain().await;
1126 assert!(!drained, "Drain should timeout with stuck requests");
1127 assert_eq!(coordinator.active_count(), 2, "Requests should still be tracked");
1128 }
1129}