sentinel_proxy/reload/mod.rs

//! Configuration hot reload module for Sentinel proxy.
//!
//! This module implements zero-downtime configuration reloading with validation,
//! atomic swaps, and rollback support for production reliability.
//!
//! ## Submodules
//!
//! - [`coordinator`]: Graceful reload coordination and request draining
//! - [`signals`]: OS signal handling (SIGHUP, SIGTERM)
//! - [`validators`]: Runtime configuration validators
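//!
//! ## Usage (sketch)
//!
//! A rough outline of wiring the manager into proxy startup; the initial
//! [`Config`] and the config path shown here are placeholders supplied by the
//! caller, not values this module prescribes.
//!
//! ```rust,ignore
//! let manager = ConfigManager::new("sentinel.kdl", initial_config).await?;
//! manager.start_watching().await?;               // auto-reload on file changes
//! let mut events = manager.subscribe();          // observe reload lifecycle events
//! manager.reload(ReloadTrigger::Manual).await?;  // or trigger a reload explicitly
//! ```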

mod coordinator;
mod signals;
mod validators;

// Re-exported for use by proxy initialization.
pub use coordinator::GracefulReloadCoordinator;
pub use signals::{SignalManager, SignalType};
pub use validators::{RouteValidator, UpstreamValidator};

use arc_swap::ArcSwap;
use notify::{Event, EventKind, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, trace, warn};

use sentinel_common::errors::{SentinelError, SentinelResult};
use sentinel_config::Config;

use crate::logging::{AuditLogEntry, SharedLogManager};
use crate::tls::CertificateReloader;

// ============================================================================
// Reload Events and Types
// ============================================================================

/// Reload event types
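///
/// Events are delivered over a broadcast channel obtained from
/// [`ConfigManager::subscribe`]. One possible consumer loop (a sketch, not a
/// required pattern):
///
/// ```rust,ignore
/// let mut rx = manager.subscribe();
/// while let Ok(event) = rx.recv().await {
///     match event {
///         ReloadEvent::Applied { version, .. } => info!(%version, "config applied"),
///         ReloadEvent::Failed { error, .. } => warn!(%error, "config reload failed"),
///         _ => {}
///     }
/// }
/// ```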
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// Configuration reload started
    Started {
        timestamp: Instant,
        trigger: ReloadTrigger,
    },
    /// Configuration validated successfully
    Validated { timestamp: Instant },
    /// Configuration applied successfully
    Applied { timestamp: Instant, version: String },
    /// Configuration reload failed
    Failed { timestamp: Instant, error: String },
    /// Configuration rolled back
    RolledBack { timestamp: Instant, reason: String },
}

/// Reload trigger source
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// Manual reload via API
    Manual,
    /// File change detected
    FileChange,
    /// Signal received (SIGHUP)
    Signal,
    /// Scheduled reload
    Scheduled,
}

// ============================================================================
// Traits
// ============================================================================

/// Configuration validator trait
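///
/// # Example (sketch)
///
/// A minimal validator is just an async check over the parsed [`Config`]. The
/// `MaxRouteValidator` type and its threshold below are illustrative only and
/// not part of this crate:
///
/// ```rust,ignore
/// struct MaxRouteValidator {
///     max_routes: usize,
/// }
///
/// #[async_trait::async_trait]
/// impl ConfigValidator for MaxRouteValidator {
///     async fn validate(&self, config: &Config) -> SentinelResult<()> {
///         if config.routes.len() > self.max_routes {
///             return Err(SentinelError::Config {
///                 message: format!("too many routes: {}", config.routes.len()),
///                 source: None,
///             });
///         }
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         "max_route_validator"
///     }
/// }
/// ```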
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Validate configuration before applying
    async fn validate(&self, config: &Config) -> SentinelResult<()>;

    /// Validator name for logging
    fn name(&self) -> &str;
}

/// Reload hook trait for custom actions
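///
/// [`AuditReloadHook`] at the bottom of this module is a concrete
/// implementation. Hooks are registered on the manager at startup, for
/// example (assuming a `manager` and `log_manager` from the caller's setup):
///
/// ```rust,ignore
/// manager
///     .add_hook(Box::new(AuditReloadHook::new(log_manager)))
///     .await;
/// ```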
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called before reload starts
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;

    /// Called after successful reload
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Called on reload failure
    async fn on_failure(&self, config: &Config, error: &SentinelError);

    /// Hook name for logging
    fn name(&self) -> &str;
}

// ============================================================================
// Reload Statistics
// ============================================================================

/// Reload statistics
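///
/// Counters are plain atomics and can be sampled without locking, e.g.:
///
/// ```rust,ignore
/// let stats = manager.stats();
/// let ok = stats.successful_reloads.load(std::sync::atomic::Ordering::Relaxed);
/// let failed = stats.failed_reloads.load(std::sync::atomic::Ordering::Relaxed);
/// ```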
#[derive(Default)]
pub struct ReloadStats {
    /// Total reload attempts
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Successful reloads
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Failed reloads
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Rollbacks performed
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// Current config version (incremented on each successful reload)
    pub config_version: std::sync::atomic::AtomicU64,
    /// Last successful reload time
    pub last_success: RwLock<Option<Instant>>,
    /// Last failure time
    pub last_failure: RwLock<Option<Instant>>,
    /// Average reload duration
    pub avg_duration_ms: RwLock<f64>,
}

// ============================================================================
// Configuration Manager
// ============================================================================

/// Configuration manager with hot reload support
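///
/// The read path only calls [`ConfigManager::current`], which is a lock-free
/// load from the inner `ArcSwap`; a reload swaps in a whole new `Arc<Config>`,
/// so in-flight requests keep the snapshot they started with.
///
/// ```rust,ignore
/// let cfg = manager.current(); // cheap Arc clone of the active snapshot
/// let routes = cfg.routes.len();
/// ```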
pub struct ConfigManager {
    /// Current active configuration
    current_config: Arc<ArcSwap<Config>>,
    /// Previous configuration for rollback
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// Configuration file path
    config_path: PathBuf,
    /// File watcher for auto-reload (uses RwLock for interior mutability)
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Reload event broadcaster
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload statistics
    stats: Arc<ReloadStats>,
    /// Validation hooks
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Reload hooks
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
    /// Certificate reloader for TLS hot-reload
    cert_reloader: Arc<CertificateReloader>,
}

impl ConfigManager {
    /// Create new configuration manager
    pub async fn new(
        config_path: impl AsRef<Path>,
        initial_config: Config,
    ) -> SentinelResult<Self> {
        let config_path = config_path.as_ref().to_path_buf();
        let (reload_tx, _) = broadcast::channel(100);

        info!(
            config_path = %config_path.display(),
            route_count = initial_config.routes.len(),
            upstream_count = initial_config.upstreams.len(),
            listener_count = initial_config.listeners.len(),
            "Initializing configuration manager"
        );

        trace!(
            config_path = %config_path.display(),
            "Creating ArcSwap for configuration"
        );

        Ok(Self {
            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
            previous_config: Arc::new(RwLock::new(None)),
            config_path,
            watcher: Arc::new(RwLock::new(None)),
            reload_tx,
            stats: Arc::new(ReloadStats::default()),
            validators: Arc::new(RwLock::new(Vec::new())),
            reload_hooks: Arc::new(RwLock::new(Vec::new())),
            cert_reloader: Arc::new(CertificateReloader::new()),
        })
    }

    /// Get the certificate reloader for registering TLS listeners
    pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
        Arc::clone(&self.cert_reloader)
    }

    /// Get current configuration
    pub fn current(&self) -> Arc<Config> {
        self.current_config.load_full()
    }

    /// Start watching configuration file for changes
    ///
    /// When enabled, the proxy will automatically reload configuration
    /// when the config file is modified.
    pub async fn start_watching(&self) -> SentinelResult<()> {
        // Check if already watching
        if self.watcher.read().await.is_some() {
            warn!("File watcher already active, skipping");
            return Ok(());
        }

        let config_path = self.config_path.clone();

        // Create file watcher
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        let mut watcher =
            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
                if let Ok(event) = event {
                    let _ = tx.blocking_send(event);
                }
            })
            .map_err(|e| SentinelError::Config {
                message: format!("Failed to create file watcher: {}", e),
                source: None,
            })?;

        // Watch config file
        watcher
            .watch(&config_path, RecursiveMode::NonRecursive)
            .map_err(|e| SentinelError::Config {
                message: format!("Failed to watch config file: {}", e),
                source: None,
            })?;

        // Store watcher using interior mutability
        *self.watcher.write().await = Some(watcher);

        // Spawn event handler task
        let manager = Arc::new(self.clone_for_task());
        tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
                    info!("Configuration file changed, triggering reload");

                    // Debounce rapid changes
                    tokio::time::sleep(Duration::from_millis(100)).await;

                    if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
                        error!("Auto-reload failed: {}", e);
                        error!("Continuing with current configuration");
                    }
                }
            }
        });

        info!(
            "Auto-reload enabled: watching configuration file {:?}",
            self.config_path
        );
        Ok(())
    }

    /// Reload configuration
    pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
        let start = Instant::now();
        let reload_num = self
            .stats
            .total_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;

        info!(
            trigger = ?trigger,
            reload_num = reload_num,
            config_path = %self.config_path.display(),
            "Starting configuration reload"
        );

        // Notify reload started
        let _ = self.reload_tx.send(ReloadEvent::Started {
            timestamp: Instant::now(),
            trigger: trigger.clone(),
        });

        trace!(
            config_path = %self.config_path.display(),
            "Reading configuration file"
        );

        // Load new configuration
        let new_config = match Config::from_file(&self.config_path) {
            Ok(config) => {
                debug!(
                    route_count = config.routes.len(),
                    upstream_count = config.upstreams.len(),
                    listener_count = config.listeners.len(),
                    "Configuration file parsed successfully"
                );
                config
            }
            Err(e) => {
                let error_msg = format!("Failed to load configuration: {}", e);
                error!(
                    config_path = %self.config_path.display(),
                    error = %e,
                    "Failed to load configuration file"
                );
                self.stats
                    .failed_reloads
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                *self.stats.last_failure.write().await = Some(Instant::now());

                let _ = self.reload_tx.send(ReloadEvent::Failed {
                    timestamp: Instant::now(),
                    error: error_msg.clone(),
                });

                return Err(SentinelError::Config {
                    message: error_msg,
                    source: None,
                });
            }
        };

        trace!("Starting configuration validation");

        // Validate new configuration BEFORE applying
        // This is critical - invalid configs must never be loaded
        if let Err(e) = self.validate_config(&new_config).await {
            error!(
                error = %e,
                "Configuration validation failed - new configuration REJECTED"
            );
            self.stats
                .failed_reloads
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            *self.stats.last_failure.write().await = Some(Instant::now());

            let _ = self.reload_tx.send(ReloadEvent::Failed {
                timestamp: Instant::now(),
                error: e.to_string(),
            });

            return Err(e);
        }

        info!(
            route_count = new_config.routes.len(),
            upstream_count = new_config.upstreams.len(),
            "Configuration validation passed, applying new configuration"
        );

        let _ = self.reload_tx.send(ReloadEvent::Validated {
            timestamp: Instant::now(),
        });

        // Get current config for rollback
        let old_config = self.current_config.load_full();

        trace!(
            old_routes = old_config.routes.len(),
            new_routes = new_config.routes.len(),
            "Preparing configuration swap"
        );

        // Run pre-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            trace!(hook_name = %hook.name(), "Running pre-reload hook");
            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
                warn!(
                    hook_name = %hook.name(),
                    error = %e,
                    "Pre-reload hook failed"
                );
                // Continue with reload despite hook failure
            }
        }
        drop(hooks);

        // Save previous config for rollback
        trace!("Saving previous configuration for potential rollback");
        *self.previous_config.write().await = Some(old_config.clone());

        // Apply new configuration atomically
        trace!("Applying new configuration atomically");
        self.current_config.store(Arc::new(new_config.clone()));

        // Run post-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            trace!(hook_name = %hook.name(), "Running post-reload hook");
            hook.post_reload(&old_config, &new_config).await;
        }
        drop(hooks);

        // Update statistics
        let duration = start.elapsed();
        let successful_count = self
            .stats
            .successful_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;
        *self.stats.last_success.write().await = Some(Instant::now());

        // Update average duration
        {
            let mut avg = self.stats.avg_duration_ms.write().await;
            let total = successful_count as f64;
            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
        }

        // Increment config version
        let new_version = self
            .stats
            .config_version
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            + 1;

        let _ = self.reload_tx.send(ReloadEvent::Applied {
            timestamp: Instant::now(),
            version: format!("v{}", new_version),
        });

        // Reload TLS certificates (hot-reload)
        // This picks up any certificate file changes without restart
        let (cert_success, cert_errors) = self.cert_reloader.reload_all();
        if !cert_errors.is_empty() {
            for (listener_id, error) in &cert_errors {
                error!(
                    listener_id = %listener_id,
                    error = %error,
                    "TLS certificate reload failed for listener"
                );
            }
        }

        info!(
            duration_ms = duration.as_millis(),
            successful_reloads = successful_count,
            route_count = new_config.routes.len(),
            upstream_count = new_config.upstreams.len(),
            cert_reload_success = cert_success,
            cert_reload_errors = cert_errors.len(),
            "Configuration reload completed successfully"
        );

        Ok(())
    }

    /// Rollback to previous configuration
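    ///
    /// Intended for operator or health-check driven recovery, e.g. (a sketch,
    /// with the reason string chosen by the caller):
    ///
    /// ```rust,ignore
    /// manager.rollback("upstream health checks failing".to_string()).await?;
    /// ```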
    pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
        info!(
            reason = %reason,
            "Starting configuration rollback"
        );

        let previous = self.previous_config.read().await.clone();

        if let Some(prev_config) = previous {
            trace!(
                route_count = prev_config.routes.len(),
                "Found previous configuration for rollback"
            );

            // Validate previous config (should always pass)
            trace!("Validating previous configuration");
            if let Err(e) = self.validate_config(&prev_config).await {
                error!(
                    error = %e,
                    "Previous configuration validation failed during rollback"
                );
                return Err(e);
            }

            // Apply previous configuration
            trace!("Applying previous configuration");
            self.current_config.store(prev_config.clone());
            let rollback_count = self
                .stats
                .rollbacks
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                + 1;

            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
                timestamp: Instant::now(),
                reason: reason.clone(),
            });

            info!(
                reason = %reason,
                rollback_count = rollback_count,
                route_count = prev_config.routes.len(),
                "Configuration rolled back successfully"
            );
            Ok(())
        } else {
            warn!("No previous configuration available for rollback");
            Err(SentinelError::Config {
                message: "No previous configuration available".to_string(),
                source: None,
            })
        }
    }

    /// Validate configuration
    async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
        trace!(
            route_count = config.routes.len(),
            upstream_count = config.upstreams.len(),
            "Starting configuration validation"
        );

        // Built-in validation
        trace!("Running built-in config validation");
        config.validate()?;

        // Run custom validators
        let validators = self.validators.read().await;
        trace!(
            validator_count = validators.len(),
            "Running custom validators"
        );
        for validator in validators.iter() {
            trace!(validator_name = %validator.name(), "Running validator");
            validator.validate(config).await.map_err(|e| {
                error!(
                    validator_name = %validator.name(),
                    error = %e,
                    "Validator failed"
                );
                e
            })?;
        }

        debug!(
            route_count = config.routes.len(),
            upstream_count = config.upstreams.len(),
            "Configuration validation passed"
        );

        Ok(())
    }

    /// Add configuration validator
    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
        info!("Adding configuration validator: {}", validator.name());
        self.validators.write().await.push(validator);
    }

    /// Add reload hook
    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
        info!("Adding reload hook: {}", hook.name());
        self.reload_hooks.write().await.push(hook);
    }

    /// Subscribe to reload events
    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
        self.reload_tx.subscribe()
    }

    /// Get reload statistics
    pub fn stats(&self) -> &ReloadStats {
        &self.stats
    }

    /// Create a lightweight clone for async tasks
    fn clone_for_task(&self) -> ConfigManager {
        ConfigManager {
            current_config: Arc::clone(&self.current_config),
            previous_config: Arc::clone(&self.previous_config),
            config_path: self.config_path.clone(),
            watcher: self.watcher.clone(),
            reload_tx: self.reload_tx.clone(),
            stats: Arc::clone(&self.stats),
            validators: Arc::clone(&self.validators),
            reload_hooks: Arc::clone(&self.reload_hooks),
            cert_reloader: Arc::clone(&self.cert_reloader),
        }
    }
}

// ============================================================================
// Audit Reload Hook
// ============================================================================

/// Reload hook that logs configuration changes to the audit log.
pub struct AuditReloadHook {
    log_manager: SharedLogManager,
}

impl AuditReloadHook {
    /// Create a new audit reload hook with the given log manager.
    pub fn new(log_manager: SharedLogManager) -> Self {
        Self { log_manager }
    }
}

#[async_trait::async_trait]
impl ReloadHook for AuditReloadHook {
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()> {
        // Log that reload is starting
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_started",
            format!(
                "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
                old_config.routes.len(),
                new_config.routes.len(),
                old_config.upstreams.len(),
                new_config.upstreams.len()
            ),
        );
        self.log_manager.log_audit(&audit_entry);
        Ok(())
    }

    async fn post_reload(&self, old_config: &Config, new_config: &Config) {
        // Log successful reload
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_success",
            format!(
                "Configuration reload successful: {} routes, {} upstreams, {} listeners",
                new_config.routes.len(),
                new_config.upstreams.len(),
                new_config.listeners.len()
            ),
        )
        .with_metadata("old_routes", old_config.routes.len().to_string())
        .with_metadata("new_routes", new_config.routes.len().to_string())
        .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
        .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
        self.log_manager.log_audit(&audit_entry);
    }

    async fn on_failure(&self, config: &Config, error: &SentinelError) {
        // Log failed reload
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_failed",
            format!("Configuration reload failed: {}", error),
        )
        .with_metadata("current_routes", config.routes.len().to_string())
        .with_metadata("current_upstreams", config.upstreams.len().to_string());
        self.log_manager.log_audit(&audit_entry);
    }

    fn name(&self) -> &str {
        "audit_reload_hook"
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let initial_routes = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Write INVALID config (not valid KDL)
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        // Create config manager with valid initial config
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Verify initial config is loaded
        assert_eq!(manager.current().routes.len(), initial_routes);

        // Attempt reload with invalid config - should fail
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(result.is_err(), "Reload should fail for invalid config");

        // Verify original config is STILL loaded (not replaced)
        assert_eq!(
            manager.current().routes.len(),
            initial_routes,
            "Original config should be preserved after failed reload"
        );

        // Verify failure was recorded in stats
        assert_eq!(
            manager
                .stats()
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Failed reload should be recorded"
        );
    }

    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Create a static files directory for the test
        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // Write a valid config with upstream
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        // Create config manager
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Reload should succeed with valid config
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        // Verify success was recorded
        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }

    // ========================================================================
    // Concurrent Reload Tests
    // ========================================================================

    /// Helper to create a valid config file with a specified route count
    fn write_config_with_routes(path: &Path, route_count: usize) {
        let mut routes = String::new();
        for i in 0..route_count {
            routes.push_str(&format!(
                r#"
    route "route{i}" {{
        priority "medium"
        matches {{
            path-prefix "/route{i}/"
        }}
        upstream "backend"
    }}
"#
            ));
        }

        let config = format!(
            r#"
server {{
    worker-threads 4
}}

listeners {{
    listener "http" {{
        address "0.0.0.0:8080"
        protocol "http"
    }}
}}

upstreams {{
    upstream "backend" {{
        target "127.0.0.1:3000"
    }}
}}

routes {{
{routes}
}}
"#
        );

        std::fs::write(path, config).unwrap();
    }

    #[tokio::test]
    async fn test_concurrent_config_reads_during_reload() {
        // Test that config reads don't block or panic during reload
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 5);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // Spawn multiple readers that continuously read config
        let mut readers = Vec::new();
        for _ in 0..10 {
            let manager_clone = Arc::clone(&manager);
            readers.push(tokio::spawn(async move {
                let mut read_count = 0;
                for _ in 0..100 {
                    let config = manager_clone.current();
                    // Access config to ensure it's valid
                    let _ = config.routes.len();
                    read_count += 1;
                    tokio::task::yield_now().await;
                }
                read_count
            }));
        }

        // Simultaneously trigger reload
        let manager_reload = Arc::clone(&manager);
        let reload_handle = tokio::spawn(async move {
            manager_reload.reload(ReloadTrigger::Manual).await
        });

        // Wait for all readers and the reload
        let mut total_reads = 0;
        for reader in readers {
            total_reads += reader.await.unwrap();
        }

        let reload_result = reload_handle.await.unwrap();
        assert!(reload_result.is_ok(), "Reload should succeed");
        assert_eq!(total_reads, 1000, "All reads should complete");
    }

    #[tokio::test]
    async fn test_multiple_concurrent_reloads() {
        // Test that multiple simultaneous reloads don't cause panics or corruption
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // Trigger multiple reloads concurrently
        let mut reload_handles = Vec::new();
        for i in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let trigger = if i % 2 == 0 {
                ReloadTrigger::Manual
            } else {
                ReloadTrigger::Signal
            };
            reload_handles.push(tokio::spawn(async move {
                manager_clone.reload(trigger).await
            }));
        }

        // All reloads should complete (some may fail due to racing, but no panics)
        let mut success_count = 0;
        for handle in reload_handles {
            if handle.await.unwrap().is_ok() {
                success_count += 1;
            }
        }

        // At least one reload should succeed
        assert!(success_count >= 1, "At least one reload should succeed");

        // Stats should reflect all attempts
        let total = manager
            .stats()
            .total_reloads
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(total, 5, "All reload attempts should be counted");
    }

    #[tokio::test]
    async fn test_config_visibility_after_reload() {
        // Test that new config is immediately visible after reload completes
        let initial_config = Config::default_for_testing();
        let initial_route_count = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Start with 2 routes
        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Verify initial config
        assert_eq!(manager.current().routes.len(), initial_route_count);

        // Reload to get 2 routes from file
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 2);

        // Update file to 5 routes and reload
        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            5,
            "New config should be visible immediately after reload"
        );

        // Update file to 1 route and reload
        write_config_with_routes(&config_path, 1);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            1,
            "Config changes should be visible after each reload"
        );
    }

    #[tokio::test]
    async fn test_rapid_successive_reloads() {
        // Test rapid-fire reloads don't cause issues
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Perform 20 rapid reloads
        for i in 0..20 {
            // Alternate between different route counts
            write_config_with_routes(&config_path, (i % 5) + 1);
            let result = manager.reload(ReloadTrigger::Manual).await;
            assert!(result.is_ok(), "Reload {} should succeed", i);
        }

        // Verify final state
        let stats = manager.stats();
        assert_eq!(
            stats
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            20,
            "All 20 reloads should succeed"
        );
        assert_eq!(
            stats
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            0,
            "No reloads should fail"
        );
    }

    #[tokio::test]
    async fn test_rollback_preserves_previous_config() {
        // Test that rollback correctly restores previous configuration
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Start with 3 routes
        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // First reload to establish baseline
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 3);

        // Second reload with 5 routes
        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 5);

        // Rollback should restore 3 routes
        manager
            .rollback("Testing rollback".to_string())
            .await
            .unwrap();
        assert_eq!(
            manager.current().routes.len(),
            3,
            "Rollback should restore previous config"
        );

        // Verify rollback was recorded
        assert_eq!(
            manager
                .stats()
                .rollbacks
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Rollback should be recorded in stats"
        );
    }

    #[tokio::test]
    async fn test_reload_events_broadcast() {
        // Test that reload events are properly broadcast to subscribers
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Subscribe to reload events
        let mut receiver = manager.subscribe();

        // Trigger reload
        manager.reload(ReloadTrigger::Manual).await.unwrap();

        // Collect events (non-blocking with timeout)
        let mut events = Vec::new();
        loop {
            match tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await {
                Ok(Ok(event)) => events.push(event),
                _ => break,
            }
        }

        // Verify we received the expected events
        assert!(
            events.len() >= 2,
            "Should receive at least Started and Applied/Validated events"
        );

        // Check for Started event
        assert!(
            events.iter().any(|e| matches!(e, ReloadEvent::Started { .. })),
            "Should receive Started event"
        );

        // Check for Applied event (successful reload)
        assert!(
            events.iter().any(|e| matches!(e, ReloadEvent::Applied { .. })),
            "Should receive Applied event on success"
        );
    }

    #[tokio::test]
    async fn test_graceful_coordinator_with_reload() {
        // Test that GracefulReloadCoordinator correctly tracks requests during reload
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));

        // Simulate requests starting
        coordinator.inc_requests();
        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 3);

        // Simulate one request completing during reload prep
        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 2);

        // Start drain in background
        let coord_clone = Arc::new(coordinator);
        let coord_for_drain = Arc::clone(&coord_clone);
        let drain_handle = tokio::spawn(async move {
            coord_for_drain.wait_for_drain().await
        });

        // Simulate remaining requests completing
        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();
        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();

        // Drain should complete successfully
        let drained = drain_handle.await.unwrap();
        assert!(drained, "All requests should drain successfully");
    }

    #[tokio::test]
    async fn test_graceful_coordinator_drain_timeout() {
        // Test that drain times out correctly when requests don't complete
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));

        // Simulate stuck requests
        coordinator.inc_requests();
        coordinator.inc_requests();

        // Start drain - should timeout
        let drained = coordinator.wait_for_drain().await;
        assert!(!drained, "Drain should timeout with stuck requests");
        assert_eq!(
            coordinator.active_count(),
            2,
            "Requests should still be tracked"
        );
    }
}