1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use grapsus_common::errors::{GrapsusError, GrapsusResult};
31use grapsus_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
/// Lifecycle notification published on the reload broadcast channel.
///
/// `ConfigManager` sends these from `reload` and `rollback`; receivers are
/// obtained through `ConfigManager::subscribe`.
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// A reload attempt has begun; sent before the configuration file is read.
    Started {
        /// Moment the event was created.
        timestamp: Instant,
        /// What caused this reload attempt.
        trigger: ReloadTrigger,
    },
    /// The freshly loaded configuration passed validation.
    Validated { timestamp: Instant },
    /// The new configuration was stored as the current one. `version` is
    /// formatted as `v<N>` from the `config_version` counter.
    Applied { timestamp: Instant, version: String },
    /// Loading or validating the new configuration failed; `error` carries the
    /// message.
    Failed { timestamp: Instant, error: String },
    /// `rollback` restored the previous configuration; `reason` is the
    /// caller-supplied explanation.
    RolledBack { timestamp: Instant, reason: String },
}
57
/// What initiated a configuration reload.
///
/// The value is logged and forwarded in `ReloadEvent::Started`; it does not
/// change how the reload itself is performed.
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// Explicit request from a caller of `ConfigManager::reload`.
    Manual,
    /// Used by the file-watcher task started in `ConfigManager::start_watching`.
    FileChange,
    /// Presumably raised by the `signals` module on a reload signal — confirm
    /// against that module.
    Signal,
    /// NOTE(review): no producer of this variant is visible in this file.
    Scheduled,
}
70
/// Pluggable validation step applied to a candidate configuration.
///
/// Validators registered through `ConfigManager::add_validator` run in
/// registration order, after the built-in `Config::validate` check. The first
/// error stops validation and is returned to the caller.
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Checks `config` and returns an error if it must be rejected.
    async fn validate(&self, config: &Config) -> GrapsusResult<()>;

    /// Name used to identify this validator in log output.
    fn name(&self) -> &str;
}
84
/// Callbacks invoked around a configuration reload.
///
/// Hooks are registered through `ConfigManager::add_hook` and are run in
/// registration order.
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Runs after the new configuration has been validated and before it is
    /// swapped in. `ConfigManager::reload` logs an `Err` as a warning and
    /// continues with the reload.
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> GrapsusResult<()>;

    /// Runs after the new configuration has been stored as the current one.
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Notification that a reload did not succeed. `config` is presumably the
    /// configuration that remains active (the audit hook in this file records
    /// it under `current_*` metadata keys) — confirm against the call site.
    async fn on_failure(&self, config: &Config, error: &GrapsusError);

    /// Name used to identify this hook in log output.
    fn name(&self) -> &str;
}
100
/// Counters and timing information about configuration reloads.
///
/// All fields use interior mutability so the struct can be shared behind an
/// `Arc` and updated through `&self`.
#[derive(Default)]
pub struct ReloadStats {
    /// Reload attempts started, whether or not they succeeded.
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Reloads that ended with the new configuration applied.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Reloads rejected because the file failed to load or validate.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Successful calls to `ConfigManager::rollback`.
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// Incremented on every applied reload; reported as `v<N>` in
    /// `ReloadEvent::Applied`.
    pub config_version: std::sync::atomic::AtomicU64,
    /// When the most recent successful reload finished, if any.
    pub last_success: RwLock<Option<Instant>>,
    /// When the most recent failed reload was recorded, if any.
    pub last_failure: RwLock<Option<Instant>>,
    /// Running mean duration of successful reloads, in milliseconds.
    pub avg_duration_ms: RwLock<f64>,
}
125
/// Owns the active configuration and coordinates hot reloads of it.
///
/// Every shared piece of state sits behind an `Arc` so that the background
/// file-watcher task can operate on the same data (see `clone_for_task`).
pub struct ConfigManager {
    /// Active configuration; replaced atomically by `reload` and `rollback`,
    /// read through `current`.
    current_config: Arc<ArcSwap<Config>>,
    /// Configuration that was replaced by the most recent successful reload;
    /// `rollback` restores from here.
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// File that `reload` parses and `start_watching` observes.
    config_path: PathBuf,
    /// Filesystem watcher handle; `None` until `start_watching` installs one.
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Sender side of the `ReloadEvent` broadcast channel.
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload counters and timings, exposed through `stats`.
    stats: Arc<ReloadStats>,
    /// Custom validators run after the built-in `Config::validate` check.
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Hooks run around each reload.
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
    /// TLS certificate reloader; `reload_all` is invoked after each applied
    /// reload.
    cert_reloader: Arc<CertificateReloader>,
}
151
152impl ConfigManager {
153 pub async fn new(
155 config_path: impl AsRef<Path>,
156 initial_config: Config,
157 ) -> GrapsusResult<Self> {
158 let config_path = config_path.as_ref().to_path_buf();
159 let (reload_tx, _) = broadcast::channel(100);
160
161 info!(
162 config_path = %config_path.display(),
163 route_count = initial_config.routes.len(),
164 upstream_count = initial_config.upstreams.len(),
165 listener_count = initial_config.listeners.len(),
166 "Initializing configuration manager"
167 );
168
169 trace!(
170 config_path = %config_path.display(),
171 "Creating ArcSwap for configuration"
172 );
173
174 Ok(Self {
175 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
176 previous_config: Arc::new(RwLock::new(None)),
177 config_path,
178 watcher: Arc::new(RwLock::new(None)),
179 reload_tx,
180 stats: Arc::new(ReloadStats::default()),
181 validators: Arc::new(RwLock::new(Vec::new())),
182 reload_hooks: Arc::new(RwLock::new(Vec::new())),
183 cert_reloader: Arc::new(CertificateReloader::new()),
184 })
185 }
186
187 pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
189 Arc::clone(&self.cert_reloader)
190 }
191
    /// Returns the configuration that is active right now.
    ///
    /// The returned `Arc` is a snapshot: a later `reload` or `rollback`
    /// replaces what this method returns but does not alter a snapshot that
    /// was already handed out.
    pub fn current(&self) -> Arc<Config> {
        self.current_config.load_full()
    }
196
197 pub async fn start_watching(&self) -> GrapsusResult<()> {
202 if self.watcher.read().await.is_some() {
204 warn!("File watcher already active, skipping");
205 return Ok(());
206 }
207
208 let config_path = self.config_path.clone();
209
210 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
212
213 let mut watcher =
214 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
215 if let Ok(event) = event {
216 let _ = tx.blocking_send(event);
217 }
218 })
219 .map_err(|e| GrapsusError::Config {
220 message: format!("Failed to create file watcher: {}", e),
221 source: None,
222 })?;
223
224 watcher
226 .watch(&config_path, RecursiveMode::NonRecursive)
227 .map_err(|e| GrapsusError::Config {
228 message: format!("Failed to watch config file: {}", e),
229 source: None,
230 })?;
231
232 *self.watcher.write().await = Some(watcher);
234
235 let manager = Arc::new(self.clone_for_task());
237 tokio::spawn(async move {
238 while let Some(event) = rx.recv().await {
239 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
240 info!("Configuration file changed, triggering reload");
241
242 tokio::time::sleep(Duration::from_millis(100)).await;
244
245 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
246 error!("Auto-reload failed: {}", e);
247 error!("Continuing with current configuration");
248 }
249 }
250 }
251 });
252
253 info!(
254 "Auto-reload enabled: watching configuration file {:?}",
255 self.config_path
256 );
257 Ok(())
258 }
259
260 pub async fn reload(&self, trigger: ReloadTrigger) -> GrapsusResult<()> {
262 let start = Instant::now();
263 let reload_num = self
264 .stats
265 .total_reloads
266 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
267 + 1;
268
269 info!(
270 trigger = ?trigger,
271 reload_num = reload_num,
272 config_path = %self.config_path.display(),
273 "Starting configuration reload"
274 );
275
276 let _ = self.reload_tx.send(ReloadEvent::Started {
278 timestamp: Instant::now(),
279 trigger: trigger.clone(),
280 });
281
282 trace!(
283 config_path = %self.config_path.display(),
284 "Reading configuration file"
285 );
286
287 let new_config = match Config::from_file(&self.config_path) {
289 Ok(config) => {
290 debug!(
291 route_count = config.routes.len(),
292 upstream_count = config.upstreams.len(),
293 listener_count = config.listeners.len(),
294 "Configuration file parsed successfully"
295 );
296 config
297 }
298 Err(e) => {
299 let error_msg = format!("Failed to load configuration: {}", e);
300 error!(
301 config_path = %self.config_path.display(),
302 error = %e,
303 "Failed to load configuration file"
304 );
305 self.stats
306 .failed_reloads
307 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
308 *self.stats.last_failure.write().await = Some(Instant::now());
309
310 let _ = self.reload_tx.send(ReloadEvent::Failed {
311 timestamp: Instant::now(),
312 error: error_msg.clone(),
313 });
314
315 return Err(GrapsusError::Config {
316 message: error_msg,
317 source: None,
318 });
319 }
320 };
321
322 trace!("Starting configuration validation");
323
324 if let Err(e) = self.validate_config(&new_config).await {
327 error!(
328 error = %e,
329 "Configuration validation failed - new configuration REJECTED"
330 );
331 self.stats
332 .failed_reloads
333 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
334 *self.stats.last_failure.write().await = Some(Instant::now());
335
336 let _ = self.reload_tx.send(ReloadEvent::Failed {
337 timestamp: Instant::now(),
338 error: e.to_string(),
339 });
340
341 return Err(e);
342 }
343
344 info!(
345 route_count = new_config.routes.len(),
346 upstream_count = new_config.upstreams.len(),
347 "Configuration validation passed, applying new configuration"
348 );
349
350 let _ = self.reload_tx.send(ReloadEvent::Validated {
351 timestamp: Instant::now(),
352 });
353
354 let old_config = self.current_config.load_full();
356
357 trace!(
358 old_routes = old_config.routes.len(),
359 new_routes = new_config.routes.len(),
360 "Preparing configuration swap"
361 );
362
363 let hooks = self.reload_hooks.read().await;
365 for hook in hooks.iter() {
366 trace!(hook_name = %hook.name(), "Running pre-reload hook");
367 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
368 warn!(
369 hook_name = %hook.name(),
370 error = %e,
371 "Pre-reload hook failed"
372 );
373 }
375 }
376 drop(hooks);
377
378 trace!("Saving previous configuration for potential rollback");
380 *self.previous_config.write().await = Some(old_config.clone());
381
382 trace!("Applying new configuration atomically");
384 self.current_config.store(Arc::new(new_config.clone()));
385
386 let hooks = self.reload_hooks.read().await;
388 for hook in hooks.iter() {
389 trace!(hook_name = %hook.name(), "Running post-reload hook");
390 hook.post_reload(&old_config, &new_config).await;
391 }
392 drop(hooks);
393
394 let duration = start.elapsed();
396 let successful_count = self
397 .stats
398 .successful_reloads
399 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
400 + 1;
401 *self.stats.last_success.write().await = Some(Instant::now());
402
403 {
405 let mut avg = self.stats.avg_duration_ms.write().await;
406 let total = successful_count as f64;
407 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
408 }
409
410 let new_version = self
412 .stats
413 .config_version
414 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
415 + 1;
416
417 let _ = self.reload_tx.send(ReloadEvent::Applied {
418 timestamp: Instant::now(),
419 version: format!("v{}", new_version),
420 });
421
422 let (cert_success, cert_errors) = self.cert_reloader.reload_all();
425 if !cert_errors.is_empty() {
426 for (listener_id, error) in &cert_errors {
427 error!(
428 listener_id = %listener_id,
429 error = %error,
430 "TLS certificate reload failed for listener"
431 );
432 }
433 }
434
435 info!(
436 duration_ms = duration.as_millis(),
437 successful_reloads = successful_count,
438 route_count = new_config.routes.len(),
439 upstream_count = new_config.upstreams.len(),
440 cert_reload_success = cert_success,
441 cert_reload_errors = cert_errors.len(),
442 "Configuration reload completed successfully"
443 );
444
445 Ok(())
446 }
447
448 pub async fn rollback(&self, reason: String) -> GrapsusResult<()> {
450 info!(
451 reason = %reason,
452 "Starting configuration rollback"
453 );
454
455 let previous = self.previous_config.read().await.clone();
456
457 if let Some(prev_config) = previous {
458 trace!(
459 route_count = prev_config.routes.len(),
460 "Found previous configuration for rollback"
461 );
462
463 trace!("Validating previous configuration");
465 if let Err(e) = self.validate_config(&prev_config).await {
466 error!(
467 error = %e,
468 "Previous configuration validation failed during rollback"
469 );
470 return Err(e);
471 }
472
473 trace!("Applying previous configuration");
475 self.current_config.store(prev_config.clone());
476 let rollback_count = self
477 .stats
478 .rollbacks
479 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
480 + 1;
481
482 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
483 timestamp: Instant::now(),
484 reason: reason.clone(),
485 });
486
487 info!(
488 reason = %reason,
489 rollback_count = rollback_count,
490 route_count = prev_config.routes.len(),
491 "Configuration rolled back successfully"
492 );
493 Ok(())
494 } else {
495 warn!("No previous configuration available for rollback");
496 Err(GrapsusError::Config {
497 message: "No previous configuration available".to_string(),
498 source: None,
499 })
500 }
501 }
502
503 async fn validate_config(&self, config: &Config) -> GrapsusResult<()> {
505 trace!(
506 route_count = config.routes.len(),
507 upstream_count = config.upstreams.len(),
508 "Starting configuration validation"
509 );
510
511 trace!("Running built-in config validation");
513 config.validate()?;
514
515 let validators = self.validators.read().await;
517 trace!(
518 validator_count = validators.len(),
519 "Running custom validators"
520 );
521 for validator in validators.iter() {
522 trace!(validator_name = %validator.name(), "Running validator");
523 validator.validate(config).await.map_err(|e| {
524 error!(
525 validator_name = %validator.name(),
526 error = %e,
527 "Validator failed"
528 );
529 e
530 })?;
531 }
532
533 debug!(
534 route_count = config.routes.len(),
535 upstream_count = config.upstreams.len(),
536 "Configuration validation passed"
537 );
538
539 Ok(())
540 }
541
542 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
544 info!("Adding configuration validator: {}", validator.name());
545 self.validators.write().await.push(validator);
546 }
547
548 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
550 info!("Adding reload hook: {}", hook.name());
551 self.reload_hooks.write().await.push(hook);
552 }
553
    /// Returns a new receiver for `ReloadEvent`s.
    ///
    /// The receiver is created from the broadcast sender at call time, so
    /// subscribe before triggering the reload you want to observe.
    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
        self.reload_tx.subscribe()
    }
558
559 pub fn stats(&self) -> &ReloadStats {
561 &self.stats
562 }
563
564 fn clone_for_task(&self) -> ConfigManager {
566 ConfigManager {
567 current_config: Arc::clone(&self.current_config),
568 previous_config: Arc::clone(&self.previous_config),
569 config_path: self.config_path.clone(),
570 watcher: self.watcher.clone(),
571 reload_tx: self.reload_tx.clone(),
572 stats: Arc::clone(&self.stats),
573 validators: Arc::clone(&self.validators),
574 reload_hooks: Arc::clone(&self.reload_hooks),
575 cert_reloader: Arc::clone(&self.cert_reloader),
576 }
577 }
578}
579
/// Reload hook that writes an audit log entry for each reload phase.
pub struct AuditReloadHook {
    /// Destination for the audit entries.
    log_manager: SharedLogManager,
}
588
589impl AuditReloadHook {
590 pub fn new(log_manager: SharedLogManager) -> Self {
592 Self { log_manager }
593 }
594}
595
596#[async_trait::async_trait]
597impl ReloadHook for AuditReloadHook {
598 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> GrapsusResult<()> {
599 let trace_id = uuid::Uuid::new_v4().to_string();
601 let audit_entry = AuditLogEntry::config_change(
602 &trace_id,
603 "reload_started",
604 format!(
605 "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
606 old_config.routes.len(),
607 new_config.routes.len(),
608 old_config.upstreams.len(),
609 new_config.upstreams.len()
610 ),
611 );
612 self.log_manager.log_audit(&audit_entry);
613 Ok(())
614 }
615
616 async fn post_reload(&self, old_config: &Config, new_config: &Config) {
617 let trace_id = uuid::Uuid::new_v4().to_string();
619 let audit_entry = AuditLogEntry::config_change(
620 &trace_id,
621 "reload_success",
622 format!(
623 "Configuration reload successful: {} routes, {} upstreams, {} listeners",
624 new_config.routes.len(),
625 new_config.upstreams.len(),
626 new_config.listeners.len()
627 ),
628 )
629 .with_metadata("old_routes", old_config.routes.len().to_string())
630 .with_metadata("new_routes", new_config.routes.len().to_string())
631 .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
632 .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
633 self.log_manager.log_audit(&audit_entry);
634 }
635
636 async fn on_failure(&self, config: &Config, error: &GrapsusError) {
637 let trace_id = uuid::Uuid::new_v4().to_string();
639 let audit_entry = AuditLogEntry::config_change(
640 &trace_id,
641 "reload_failed",
642 format!("Configuration reload failed: {}", error),
643 )
644 .with_metadata("current_routes", config.routes.len().to_string())
645 .with_metadata("current_upstreams", config.upstreams.len().to_string());
646 self.log_manager.log_audit(&audit_entry);
647 }
648
649 fn name(&self) -> &str {
650 "audit_reload_hook"
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657
658 #[tokio::test]
659 async fn test_config_reload_rejects_invalid_config() {
660 let initial_config = Config::default_for_testing();
662 let initial_routes = initial_config.routes.len();
663
664 let temp_dir = tempfile::tempdir().unwrap();
665 let config_path = temp_dir.path().join("config.kdl");
666
667 std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
669
670 let manager = ConfigManager::new(&config_path, initial_config)
672 .await
673 .unwrap();
674
675 assert_eq!(manager.current().routes.len(), initial_routes);
677
678 let result = manager.reload(ReloadTrigger::Manual).await;
680 assert!(result.is_err(), "Reload should fail for invalid config");
681
682 assert_eq!(
684 manager.current().routes.len(),
685 initial_routes,
686 "Original config should be preserved after failed reload"
687 );
688
689 assert_eq!(
691 manager
692 .stats()
693 .failed_reloads
694 .load(std::sync::atomic::Ordering::Relaxed),
695 1,
696 "Failed reload should be recorded"
697 );
698 }
699
    /// A well-formed configuration file must reload successfully and be
    /// counted as a successful reload.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Extra directory next to the config file; the KDL below does not
        // reference it.
        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // Minimal configuration: one listener, one upstream, one route.
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }
765
    /// Test helper: writes a KDL configuration to `path` containing exactly
    /// `route_count` routes.
    ///
    /// Routes are named `route0`, `route1`, … with path prefix `/route<i>/`,
    /// and all point at the single `backend` upstream. Panics if the file
    /// cannot be written.
    fn write_config_with_routes(path: &Path, route_count: usize) {
        // Build the body of the `routes` section first. Braces are doubled
        // because the template goes through `format!`.
        let mut routes = String::new();
        for i in 0..route_count {
            routes.push_str(&format!(
                r#"
    route "route{i}" {{
        priority "medium"
        matches {{
            path-prefix "/route{i}/"
        }}
        upstream "backend"
    }}
"#
            ));
        }

        let config = format!(
            r#"
server {{
    worker-threads 4
}}

listeners {{
    listener "http" {{
        address "0.0.0.0:8080"
        protocol "http"
    }}
}}

upstreams {{
    upstream "backend" {{
        target "127.0.0.1:3000"
    }}
}}

routes {{
{routes}
}}
"#
        );

        std::fs::write(path, config).unwrap();
    }
814
815 #[tokio::test]
816 async fn test_concurrent_config_reads_during_reload() {
817 let initial_config = Config::default_for_testing();
819 let temp_dir = tempfile::tempdir().unwrap();
820 let config_path = temp_dir.path().join("config.kdl");
821
822 write_config_with_routes(&config_path, 5);
823
824 let manager = Arc::new(
825 ConfigManager::new(&config_path, initial_config)
826 .await
827 .unwrap(),
828 );
829
830 let mut readers = Vec::new();
832 for _ in 0..10 {
833 let manager_clone = Arc::clone(&manager);
834 readers.push(tokio::spawn(async move {
835 let mut read_count = 0;
836 for _ in 0..100 {
837 let config = manager_clone.current();
838 let _ = config.routes.len();
840 read_count += 1;
841 tokio::task::yield_now().await;
842 }
843 read_count
844 }));
845 }
846
847 let manager_reload = Arc::clone(&manager);
849 let reload_handle =
850 tokio::spawn(async move { manager_reload.reload(ReloadTrigger::Manual).await });
851
852 let mut total_reads = 0;
854 for reader in readers {
855 total_reads += reader.await.unwrap();
856 }
857
858 let reload_result = reload_handle.await.unwrap();
859 assert!(reload_result.is_ok(), "Reload should succeed");
860 assert_eq!(total_reads, 1000, "All reads should complete");
861 }
862
863 #[tokio::test]
864 async fn test_multiple_concurrent_reloads() {
865 let initial_config = Config::default_for_testing();
867 let temp_dir = tempfile::tempdir().unwrap();
868 let config_path = temp_dir.path().join("config.kdl");
869
870 write_config_with_routes(&config_path, 3);
871
872 let manager = Arc::new(
873 ConfigManager::new(&config_path, initial_config)
874 .await
875 .unwrap(),
876 );
877
878 let mut reload_handles = Vec::new();
880 for i in 0..5 {
881 let manager_clone = Arc::clone(&manager);
882 let trigger = if i % 2 == 0 {
883 ReloadTrigger::Manual
884 } else {
885 ReloadTrigger::Signal
886 };
887 reload_handles.push(tokio::spawn(
888 async move { manager_clone.reload(trigger).await },
889 ));
890 }
891
892 let mut success_count = 0;
894 for handle in reload_handles {
895 if handle.await.unwrap().is_ok() {
896 success_count += 1;
897 }
898 }
899
900 assert!(success_count >= 1, "At least one reload should succeed");
902
903 let total = manager
905 .stats()
906 .total_reloads
907 .load(std::sync::atomic::Ordering::Relaxed);
908 assert_eq!(total, 5, "All reload attempts should be counted");
909 }
910
911 #[tokio::test]
912 async fn test_config_visibility_after_reload() {
913 let initial_config = Config::default_for_testing();
915 let initial_route_count = initial_config.routes.len();
916
917 let temp_dir = tempfile::tempdir().unwrap();
918 let config_path = temp_dir.path().join("config.kdl");
919
920 write_config_with_routes(&config_path, 2);
922
923 let manager = ConfigManager::new(&config_path, initial_config)
924 .await
925 .unwrap();
926
927 assert_eq!(manager.current().routes.len(), initial_route_count);
929
930 manager.reload(ReloadTrigger::Manual).await.unwrap();
932 assert_eq!(manager.current().routes.len(), 2);
933
934 write_config_with_routes(&config_path, 5);
936 manager.reload(ReloadTrigger::Manual).await.unwrap();
937 assert_eq!(
938 manager.current().routes.len(),
939 5,
940 "New config should be visible immediately after reload"
941 );
942
943 write_config_with_routes(&config_path, 1);
945 manager.reload(ReloadTrigger::Manual).await.unwrap();
946 assert_eq!(
947 manager.current().routes.len(),
948 1,
949 "Config changes should be visible after each reload"
950 );
951 }
952
953 #[tokio::test]
954 async fn test_rapid_successive_reloads() {
955 let initial_config = Config::default_for_testing();
957 let temp_dir = tempfile::tempdir().unwrap();
958 let config_path = temp_dir.path().join("config.kdl");
959
960 write_config_with_routes(&config_path, 3);
961
962 let manager = ConfigManager::new(&config_path, initial_config)
963 .await
964 .unwrap();
965
966 for i in 0..20 {
968 write_config_with_routes(&config_path, (i % 5) + 1);
970 let result = manager.reload(ReloadTrigger::Manual).await;
971 assert!(result.is_ok(), "Reload {} should succeed", i);
972 }
973
974 let stats = manager.stats();
976 assert_eq!(
977 stats
978 .successful_reloads
979 .load(std::sync::atomic::Ordering::Relaxed),
980 20,
981 "All 20 reloads should succeed"
982 );
983 assert_eq!(
984 stats
985 .failed_reloads
986 .load(std::sync::atomic::Ordering::Relaxed),
987 0,
988 "No reloads should fail"
989 );
990 }
991
992 #[tokio::test]
993 async fn test_rollback_preserves_previous_config() {
994 let initial_config = Config::default_for_testing();
996 let temp_dir = tempfile::tempdir().unwrap();
997 let config_path = temp_dir.path().join("config.kdl");
998
999 write_config_with_routes(&config_path, 3);
1001
1002 let manager = ConfigManager::new(&config_path, initial_config)
1003 .await
1004 .unwrap();
1005
1006 manager.reload(ReloadTrigger::Manual).await.unwrap();
1008 assert_eq!(manager.current().routes.len(), 3);
1009
1010 write_config_with_routes(&config_path, 5);
1012 manager.reload(ReloadTrigger::Manual).await.unwrap();
1013 assert_eq!(manager.current().routes.len(), 5);
1014
1015 manager
1017 .rollback("Testing rollback".to_string())
1018 .await
1019 .unwrap();
1020 assert_eq!(
1021 manager.current().routes.len(),
1022 3,
1023 "Rollback should restore previous config"
1024 );
1025
1026 assert_eq!(
1028 manager
1029 .stats()
1030 .rollbacks
1031 .load(std::sync::atomic::Ordering::Relaxed),
1032 1,
1033 "Rollback should be recorded in stats"
1034 );
1035 }
1036
1037 #[tokio::test]
1038 async fn test_reload_events_broadcast() {
1039 let initial_config = Config::default_for_testing();
1041 let temp_dir = tempfile::tempdir().unwrap();
1042 let config_path = temp_dir.path().join("config.kdl");
1043
1044 write_config_with_routes(&config_path, 2);
1045
1046 let manager = ConfigManager::new(&config_path, initial_config)
1047 .await
1048 .unwrap();
1049
1050 let mut receiver = manager.subscribe();
1052
1053 manager.reload(ReloadTrigger::Manual).await.unwrap();
1055
1056 let mut events = Vec::new();
1058 while let Ok(Ok(event)) =
1059 tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
1060 {
1061 events.push(event);
1062 }
1063
1064 assert!(
1066 events.len() >= 2,
1067 "Should receive at least Started and Applied/Validated events"
1068 );
1069
1070 assert!(
1072 events
1073 .iter()
1074 .any(|e| matches!(e, ReloadEvent::Started { .. })),
1075 "Should receive Started event"
1076 );
1077
1078 assert!(
1080 events
1081 .iter()
1082 .any(|e| matches!(e, ReloadEvent::Applied { .. })),
1083 "Should receive Applied event on success"
1084 );
1085 }
1086
1087 #[tokio::test]
1088 async fn test_graceful_coordinator_with_reload() {
1089 let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));
1091
1092 coordinator.inc_requests();
1094 coordinator.inc_requests();
1095 coordinator.inc_requests();
1096 assert_eq!(coordinator.active_count(), 3);
1097
1098 coordinator.dec_requests();
1100 assert_eq!(coordinator.active_count(), 2);
1101
1102 let coord_clone = Arc::new(coordinator);
1104 let coord_for_drain = Arc::clone(&coord_clone);
1105 let drain_handle = tokio::spawn(async move { coord_for_drain.wait_for_drain().await });
1106
1107 tokio::time::sleep(Duration::from_millis(50)).await;
1109 coord_clone.dec_requests();
1110 tokio::time::sleep(Duration::from_millis(50)).await;
1111 coord_clone.dec_requests();
1112
1113 let drained = drain_handle.await.unwrap();
1115 assert!(drained, "All requests should drain successfully");
1116 }
1117
1118 #[tokio::test]
1119 async fn test_graceful_coordinator_drain_timeout() {
1120 let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));
1122
1123 coordinator.inc_requests();
1125 coordinator.inc_requests();
1126
1127 let drained = coordinator.wait_for_drain().await;
1129 assert!(!drained, "Drain should timeout with stuck requests");
1130 assert_eq!(
1131 coordinator.active_count(),
1132 2,
1133 "Requests should still be tracked"
1134 );
1135 }
1136}