Skip to main content

grapsus_proxy/reload/
mod.rs

1//! Configuration hot reload module for Grapsus proxy.
2//!
3//! This module implements zero-downtime configuration reloading with validation,
4//! atomic swaps, and rollback support for production reliability.
5//!
6//! ## Submodules
7//!
8//! - `coordinator`: Graceful reload coordination and request draining
9//! - `signals`: OS signal handling (SIGHUP, SIGTERM)
10//! - `validators`: Runtime configuration validators
11
12mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20// Re-export for use by proxy initialization
21
22use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use grapsus_common::errors::{GrapsusError, GrapsusResult};
31use grapsus_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
36// ============================================================================
37// Reload Events and Types
38// ============================================================================
39
40/// Reload event types
41#[derive(Debug, Clone)]
42pub enum ReloadEvent {
43    /// Configuration reload started
44    Started {
45        timestamp: Instant,
46        trigger: ReloadTrigger,
47    },
48    /// Configuration validated successfully
49    Validated { timestamp: Instant },
50    /// Configuration applied successfully
51    Applied { timestamp: Instant, version: String },
52    /// Configuration reload failed
53    Failed { timestamp: Instant, error: String },
54    /// Configuration rolled back
55    RolledBack { timestamp: Instant, reason: String },
56}
57
/// Source that initiated a configuration reload.
///
/// Carried by `ReloadEvent::Started` so subscribers can tell why a reload ran.
/// Implements `PartialEq`/`Eq`/`Hash` so callers can compare triggers
/// (e.g. `trigger == ReloadTrigger::Signal`) or use them as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadTrigger {
    /// Manual reload requested through the API.
    Manual,
    /// A change to the watched configuration file was detected.
    FileChange,
    /// An OS signal was received (SIGHUP).
    Signal,
    /// A scheduled, time-based reload.
    Scheduled,
}
70
71// ============================================================================
72// Traits
73// ============================================================================
74
75/// Configuration validator trait
76#[async_trait::async_trait]
77pub trait ConfigValidator: Send + Sync {
78    /// Validate configuration before applying
79    async fn validate(&self, config: &Config) -> GrapsusResult<()>;
80
81    /// Validator name for logging
82    fn name(&self) -> &str;
83}
84
85/// Reload hook trait for custom actions
86#[async_trait::async_trait]
87pub trait ReloadHook: Send + Sync {
88    /// Called before reload starts
89    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> GrapsusResult<()>;
90
91    /// Called after successful reload
92    async fn post_reload(&self, old_config: &Config, new_config: &Config);
93
94    /// Called on reload failure
95    async fn on_failure(&self, config: &Config, error: &GrapsusError);
96
97    /// Hook name for logging
98    fn name(&self) -> &str;
99}
100
101// ============================================================================
102// Reload Statistics
103// ============================================================================
104
/// Counters and timings describing configuration reload activity.
///
/// Counters are atomics so they can be bumped without locking; the timestamp
/// and duration fields sit behind `RwLock`s. Derives `Debug` so the whole
/// snapshot can be logged or inspected.
#[derive(Debug, Default)]
pub struct ReloadStats {
    /// Total reload attempts, whether they succeeded or not.
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Reloads whose configuration was applied.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Reloads rejected during loading or validation.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Rollbacks to the previous configuration.
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// Current config version (incremented on each successful reload).
    pub config_version: std::sync::atomic::AtomicU64,
    /// When the most recent successful reload finished.
    pub last_success: RwLock<Option<Instant>>,
    /// When the most recent failed reload was recorded.
    pub last_failure: RwLock<Option<Instant>>,
    /// Running mean of successful reload durations, in milliseconds.
    pub avg_duration_ms: RwLock<f64>,
}
125
126// ============================================================================
127// Configuration Manager
128// ============================================================================
129
130/// Configuration manager with hot reload support
131pub struct ConfigManager {
132    /// Current active configuration
133    current_config: Arc<ArcSwap<Config>>,
134    /// Previous configuration for rollback
135    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
136    /// Configuration file path
137    config_path: PathBuf,
138    /// File watcher for auto-reload (uses RwLock for interior mutability)
139    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
140    /// Reload event broadcaster
141    reload_tx: broadcast::Sender<ReloadEvent>,
142    /// Reload statistics
143    stats: Arc<ReloadStats>,
144    /// Validation hooks
145    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
146    /// Reload hooks
147    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
148    /// Certificate reloader for TLS hot-reload
149    cert_reloader: Arc<CertificateReloader>,
150}
151
152impl ConfigManager {
153    /// Create new configuration manager
154    pub async fn new(
155        config_path: impl AsRef<Path>,
156        initial_config: Config,
157    ) -> GrapsusResult<Self> {
158        let config_path = config_path.as_ref().to_path_buf();
159        let (reload_tx, _) = broadcast::channel(100);
160
161        info!(
162            config_path = %config_path.display(),
163            route_count = initial_config.routes.len(),
164            upstream_count = initial_config.upstreams.len(),
165            listener_count = initial_config.listeners.len(),
166            "Initializing configuration manager"
167        );
168
169        trace!(
170            config_path = %config_path.display(),
171            "Creating ArcSwap for configuration"
172        );
173
174        Ok(Self {
175            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
176            previous_config: Arc::new(RwLock::new(None)),
177            config_path,
178            watcher: Arc::new(RwLock::new(None)),
179            reload_tx,
180            stats: Arc::new(ReloadStats::default()),
181            validators: Arc::new(RwLock::new(Vec::new())),
182            reload_hooks: Arc::new(RwLock::new(Vec::new())),
183            cert_reloader: Arc::new(CertificateReloader::new()),
184        })
185    }
186
187    /// Get the certificate reloader for registering TLS listeners
188    pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
189        Arc::clone(&self.cert_reloader)
190    }
191
192    /// Get current configuration
193    pub fn current(&self) -> Arc<Config> {
194        self.current_config.load_full()
195    }
196
197    /// Start watching configuration file for changes
198    ///
199    /// When enabled, the proxy will automatically reload configuration
200    /// when the config file is modified.
201    pub async fn start_watching(&self) -> GrapsusResult<()> {
202        // Check if already watching
203        if self.watcher.read().await.is_some() {
204            warn!("File watcher already active, skipping");
205            return Ok(());
206        }
207
208        let config_path = self.config_path.clone();
209
210        // Create file watcher
211        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
212
213        let mut watcher =
214            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
215                if let Ok(event) = event {
216                    let _ = tx.blocking_send(event);
217                }
218            })
219            .map_err(|e| GrapsusError::Config {
220                message: format!("Failed to create file watcher: {}", e),
221                source: None,
222            })?;
223
224        // Watch config file
225        watcher
226            .watch(&config_path, RecursiveMode::NonRecursive)
227            .map_err(|e| GrapsusError::Config {
228                message: format!("Failed to watch config file: {}", e),
229                source: None,
230            })?;
231
232        // Store watcher using interior mutability
233        *self.watcher.write().await = Some(watcher);
234
235        // Spawn event handler task
236        let manager = Arc::new(self.clone_for_task());
237        tokio::spawn(async move {
238            while let Some(event) = rx.recv().await {
239                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
240                    info!("Configuration file changed, triggering reload");
241
242                    // Debounce rapid changes
243                    tokio::time::sleep(Duration::from_millis(100)).await;
244
245                    if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
246                        error!("Auto-reload failed: {}", e);
247                        error!("Continuing with current configuration");
248                    }
249                }
250            }
251        });
252
253        info!(
254            "Auto-reload enabled: watching configuration file {:?}",
255            self.config_path
256        );
257        Ok(())
258    }
259
260    /// Reload configuration
261    pub async fn reload(&self, trigger: ReloadTrigger) -> GrapsusResult<()> {
262        let start = Instant::now();
263        let reload_num = self
264            .stats
265            .total_reloads
266            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
267            + 1;
268
269        info!(
270            trigger = ?trigger,
271            reload_num = reload_num,
272            config_path = %self.config_path.display(),
273            "Starting configuration reload"
274        );
275
276        // Notify reload started
277        let _ = self.reload_tx.send(ReloadEvent::Started {
278            timestamp: Instant::now(),
279            trigger: trigger.clone(),
280        });
281
282        trace!(
283            config_path = %self.config_path.display(),
284            "Reading configuration file"
285        );
286
287        // Load new configuration
288        let new_config = match Config::from_file(&self.config_path) {
289            Ok(config) => {
290                debug!(
291                    route_count = config.routes.len(),
292                    upstream_count = config.upstreams.len(),
293                    listener_count = config.listeners.len(),
294                    "Configuration file parsed successfully"
295                );
296                config
297            }
298            Err(e) => {
299                let error_msg = format!("Failed to load configuration: {}", e);
300                error!(
301                    config_path = %self.config_path.display(),
302                    error = %e,
303                    "Failed to load configuration file"
304                );
305                self.stats
306                    .failed_reloads
307                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
308                *self.stats.last_failure.write().await = Some(Instant::now());
309
310                let _ = self.reload_tx.send(ReloadEvent::Failed {
311                    timestamp: Instant::now(),
312                    error: error_msg.clone(),
313                });
314
315                return Err(GrapsusError::Config {
316                    message: error_msg,
317                    source: None,
318                });
319            }
320        };
321
322        trace!("Starting configuration validation");
323
324        // Validate new configuration BEFORE applying
325        // This is critical - invalid configs must never be loaded
326        if let Err(e) = self.validate_config(&new_config).await {
327            error!(
328                error = %e,
329                "Configuration validation failed - new configuration REJECTED"
330            );
331            self.stats
332                .failed_reloads
333                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
334            *self.stats.last_failure.write().await = Some(Instant::now());
335
336            let _ = self.reload_tx.send(ReloadEvent::Failed {
337                timestamp: Instant::now(),
338                error: e.to_string(),
339            });
340
341            return Err(e);
342        }
343
344        info!(
345            route_count = new_config.routes.len(),
346            upstream_count = new_config.upstreams.len(),
347            "Configuration validation passed, applying new configuration"
348        );
349
350        let _ = self.reload_tx.send(ReloadEvent::Validated {
351            timestamp: Instant::now(),
352        });
353
354        // Get current config for rollback
355        let old_config = self.current_config.load_full();
356
357        trace!(
358            old_routes = old_config.routes.len(),
359            new_routes = new_config.routes.len(),
360            "Preparing configuration swap"
361        );
362
363        // Run pre-reload hooks
364        let hooks = self.reload_hooks.read().await;
365        for hook in hooks.iter() {
366            trace!(hook_name = %hook.name(), "Running pre-reload hook");
367            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
368                warn!(
369                    hook_name = %hook.name(),
370                    error = %e,
371                    "Pre-reload hook failed"
372                );
373                // Continue with reload despite hook failure
374            }
375        }
376        drop(hooks);
377
378        // Save previous config for rollback
379        trace!("Saving previous configuration for potential rollback");
380        *self.previous_config.write().await = Some(old_config.clone());
381
382        // Apply new configuration atomically
383        trace!("Applying new configuration atomically");
384        self.current_config.store(Arc::new(new_config.clone()));
385
386        // Run post-reload hooks
387        let hooks = self.reload_hooks.read().await;
388        for hook in hooks.iter() {
389            trace!(hook_name = %hook.name(), "Running post-reload hook");
390            hook.post_reload(&old_config, &new_config).await;
391        }
392        drop(hooks);
393
394        // Update statistics
395        let duration = start.elapsed();
396        let successful_count = self
397            .stats
398            .successful_reloads
399            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
400            + 1;
401        *self.stats.last_success.write().await = Some(Instant::now());
402
403        // Update average duration
404        {
405            let mut avg = self.stats.avg_duration_ms.write().await;
406            let total = successful_count as f64;
407            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
408        }
409
410        // Increment config version
411        let new_version = self
412            .stats
413            .config_version
414            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
415            + 1;
416
417        let _ = self.reload_tx.send(ReloadEvent::Applied {
418            timestamp: Instant::now(),
419            version: format!("v{}", new_version),
420        });
421
422        // Reload TLS certificates (hot-reload)
423        // This picks up any certificate file changes without restart
424        let (cert_success, cert_errors) = self.cert_reloader.reload_all();
425        if !cert_errors.is_empty() {
426            for (listener_id, error) in &cert_errors {
427                error!(
428                    listener_id = %listener_id,
429                    error = %error,
430                    "TLS certificate reload failed for listener"
431                );
432            }
433        }
434
435        info!(
436            duration_ms = duration.as_millis(),
437            successful_reloads = successful_count,
438            route_count = new_config.routes.len(),
439            upstream_count = new_config.upstreams.len(),
440            cert_reload_success = cert_success,
441            cert_reload_errors = cert_errors.len(),
442            "Configuration reload completed successfully"
443        );
444
445        Ok(())
446    }
447
448    /// Rollback to previous configuration
449    pub async fn rollback(&self, reason: String) -> GrapsusResult<()> {
450        info!(
451            reason = %reason,
452            "Starting configuration rollback"
453        );
454
455        let previous = self.previous_config.read().await.clone();
456
457        if let Some(prev_config) = previous {
458            trace!(
459                route_count = prev_config.routes.len(),
460                "Found previous configuration for rollback"
461            );
462
463            // Validate previous config (should always pass)
464            trace!("Validating previous configuration");
465            if let Err(e) = self.validate_config(&prev_config).await {
466                error!(
467                    error = %e,
468                    "Previous configuration validation failed during rollback"
469                );
470                return Err(e);
471            }
472
473            // Apply previous configuration
474            trace!("Applying previous configuration");
475            self.current_config.store(prev_config.clone());
476            let rollback_count = self
477                .stats
478                .rollbacks
479                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
480                + 1;
481
482            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
483                timestamp: Instant::now(),
484                reason: reason.clone(),
485            });
486
487            info!(
488                reason = %reason,
489                rollback_count = rollback_count,
490                route_count = prev_config.routes.len(),
491                "Configuration rolled back successfully"
492            );
493            Ok(())
494        } else {
495            warn!("No previous configuration available for rollback");
496            Err(GrapsusError::Config {
497                message: "No previous configuration available".to_string(),
498                source: None,
499            })
500        }
501    }
502
503    /// Validate configuration
504    async fn validate_config(&self, config: &Config) -> GrapsusResult<()> {
505        trace!(
506            route_count = config.routes.len(),
507            upstream_count = config.upstreams.len(),
508            "Starting configuration validation"
509        );
510
511        // Built-in validation
512        trace!("Running built-in config validation");
513        config.validate()?;
514
515        // Run custom validators
516        let validators = self.validators.read().await;
517        trace!(
518            validator_count = validators.len(),
519            "Running custom validators"
520        );
521        for validator in validators.iter() {
522            trace!(validator_name = %validator.name(), "Running validator");
523            validator.validate(config).await.map_err(|e| {
524                error!(
525                    validator_name = %validator.name(),
526                    error = %e,
527                    "Validator failed"
528                );
529                e
530            })?;
531        }
532
533        debug!(
534            route_count = config.routes.len(),
535            upstream_count = config.upstreams.len(),
536            "Configuration validation passed"
537        );
538
539        Ok(())
540    }
541
542    /// Add configuration validator
543    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
544        info!("Adding configuration validator: {}", validator.name());
545        self.validators.write().await.push(validator);
546    }
547
548    /// Add reload hook
549    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
550        info!("Adding reload hook: {}", hook.name());
551        self.reload_hooks.write().await.push(hook);
552    }
553
554    /// Subscribe to reload events
555    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
556        self.reload_tx.subscribe()
557    }
558
559    /// Get reload statistics
560    pub fn stats(&self) -> &ReloadStats {
561        &self.stats
562    }
563
564    /// Create a lightweight clone for async tasks
565    fn clone_for_task(&self) -> ConfigManager {
566        ConfigManager {
567            current_config: Arc::clone(&self.current_config),
568            previous_config: Arc::clone(&self.previous_config),
569            config_path: self.config_path.clone(),
570            watcher: self.watcher.clone(),
571            reload_tx: self.reload_tx.clone(),
572            stats: Arc::clone(&self.stats),
573            validators: Arc::clone(&self.validators),
574            reload_hooks: Arc::clone(&self.reload_hooks),
575            cert_reloader: Arc::clone(&self.cert_reloader),
576        }
577    }
578}
579
580// ============================================================================
581// Audit Reload Hook
582// ============================================================================
583
584/// Reload hook that logs configuration changes to the audit log.
585pub struct AuditReloadHook {
586    log_manager: SharedLogManager,
587}
588
589impl AuditReloadHook {
590    /// Create a new audit reload hook with the given log manager.
591    pub fn new(log_manager: SharedLogManager) -> Self {
592        Self { log_manager }
593    }
594}
595
596#[async_trait::async_trait]
597impl ReloadHook for AuditReloadHook {
598    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> GrapsusResult<()> {
599        // Log that reload is starting
600        let trace_id = uuid::Uuid::new_v4().to_string();
601        let audit_entry = AuditLogEntry::config_change(
602            &trace_id,
603            "reload_started",
604            format!(
605                "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
606                old_config.routes.len(),
607                new_config.routes.len(),
608                old_config.upstreams.len(),
609                new_config.upstreams.len()
610            ),
611        );
612        self.log_manager.log_audit(&audit_entry);
613        Ok(())
614    }
615
616    async fn post_reload(&self, old_config: &Config, new_config: &Config) {
617        // Log successful reload
618        let trace_id = uuid::Uuid::new_v4().to_string();
619        let audit_entry = AuditLogEntry::config_change(
620            &trace_id,
621            "reload_success",
622            format!(
623                "Configuration reload successful: {} routes, {} upstreams, {} listeners",
624                new_config.routes.len(),
625                new_config.upstreams.len(),
626                new_config.listeners.len()
627            ),
628        )
629        .with_metadata("old_routes", old_config.routes.len().to_string())
630        .with_metadata("new_routes", new_config.routes.len().to_string())
631        .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
632        .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
633        self.log_manager.log_audit(&audit_entry);
634    }
635
636    async fn on_failure(&self, config: &Config, error: &GrapsusError) {
637        // Log failed reload
638        let trace_id = uuid::Uuid::new_v4().to_string();
639        let audit_entry = AuditLogEntry::config_change(
640            &trace_id,
641            "reload_failed",
642            format!("Configuration reload failed: {}", error),
643        )
644        .with_metadata("current_routes", config.routes.len().to_string())
645        .with_metadata("current_upstreams", config.upstreams.len().to_string());
646        self.log_manager.log_audit(&audit_entry);
647    }
648
649    fn name(&self) -> &str {
650        "audit_reload_hook"
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657
658    #[tokio::test]
659    async fn test_config_reload_rejects_invalid_config() {
660        // Create valid initial config
661        let initial_config = Config::default_for_testing();
662        let initial_routes = initial_config.routes.len();
663
664        let temp_dir = tempfile::tempdir().unwrap();
665        let config_path = temp_dir.path().join("config.kdl");
666
667        // Write INVALID config (not valid KDL)
668        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
669
670        // Create config manager with valid initial config
671        let manager = ConfigManager::new(&config_path, initial_config)
672            .await
673            .unwrap();
674
675        // Verify initial config is loaded
676        assert_eq!(manager.current().routes.len(), initial_routes);
677
678        // Attempt reload with invalid config - should fail
679        let result = manager.reload(ReloadTrigger::Manual).await;
680        assert!(result.is_err(), "Reload should fail for invalid config");
681
682        // Verify original config is STILL loaded (not replaced)
683        assert_eq!(
684            manager.current().routes.len(),
685            initial_routes,
686            "Original config should be preserved after failed reload"
687        );
688
689        // Verify failure was recorded in stats
690        assert_eq!(
691            manager
692                .stats()
693                .failed_reloads
694                .load(std::sync::atomic::Ordering::Relaxed),
695            1,
696            "Failed reload should be recorded"
697        );
698    }
699
700    #[tokio::test]
701    async fn test_config_reload_accepts_valid_config() {
702        // Create valid initial config
703        let initial_config = Config::default_for_testing();
704        let temp_dir = tempfile::tempdir().unwrap();
705        let config_path = temp_dir.path().join("config.kdl");
706
707        // Create a static files directory for the test
708        let static_dir = temp_dir.path().join("static");
709        std::fs::create_dir_all(&static_dir).unwrap();
710
711        // Write a valid config with upstream
712        let valid_config = r#"
713server {
714    worker-threads 4
715}
716
717listeners {
718    listener "http" {
719        address "0.0.0.0:8080"
720        protocol "http"
721    }
722}
723
724upstreams {
725    upstream "backend" {
726        target "127.0.0.1:3000"
727    }
728}
729
730routes {
731    route "api" {
732        priority "high"
733        matches {
734            path-prefix "/api/"
735        }
736        upstream "backend"
737    }
738}
739"#;
740        std::fs::write(&config_path, valid_config).unwrap();
741
742        // Create config manager
743        let manager = ConfigManager::new(&config_path, initial_config)
744            .await
745            .unwrap();
746
747        // Reload should succeed with valid config
748        let result = manager.reload(ReloadTrigger::Manual).await;
749        assert!(
750            result.is_ok(),
751            "Reload should succeed for valid config: {:?}",
752            result.err()
753        );
754
755        // Verify success was recorded
756        assert_eq!(
757            manager
758                .stats()
759                .successful_reloads
760                .load(std::sync::atomic::Ordering::Relaxed),
761            1,
762            "Successful reload should be recorded"
763        );
764    }
765
766    // ========================================================================
767    // Concurrent Reload Tests
768    // ========================================================================
769
770    /// Helper to create a valid config file with a specified route count
771    fn write_config_with_routes(path: &Path, route_count: usize) {
772        let mut routes = String::new();
773        for i in 0..route_count {
774            routes.push_str(&format!(
775                r#"
776    route "route{i}" {{
777        priority "medium"
778        matches {{
779            path-prefix "/route{i}/"
780        }}
781        upstream "backend"
782    }}
783"#
784            ));
785        }
786
787        let config = format!(
788            r#"
789server {{
790    worker-threads 4
791}}
792
793listeners {{
794    listener "http" {{
795        address "0.0.0.0:8080"
796        protocol "http"
797    }}
798}}
799
800upstreams {{
801    upstream "backend" {{
802        target "127.0.0.1:3000"
803    }}
804}}
805
806routes {{
807{routes}
808}}
809"#
810        );
811
812        std::fs::write(path, config).unwrap();
813    }
814
815    #[tokio::test]
816    async fn test_concurrent_config_reads_during_reload() {
817        // Test that config reads don't block or panic during reload
818        let initial_config = Config::default_for_testing();
819        let temp_dir = tempfile::tempdir().unwrap();
820        let config_path = temp_dir.path().join("config.kdl");
821
822        write_config_with_routes(&config_path, 5);
823
824        let manager = Arc::new(
825            ConfigManager::new(&config_path, initial_config)
826                .await
827                .unwrap(),
828        );
829
830        // Spawn multiple readers that continuously read config
831        let mut readers = Vec::new();
832        for _ in 0..10 {
833            let manager_clone = Arc::clone(&manager);
834            readers.push(tokio::spawn(async move {
835                let mut read_count = 0;
836                for _ in 0..100 {
837                    let config = manager_clone.current();
838                    // Access config to ensure it's valid
839                    let _ = config.routes.len();
840                    read_count += 1;
841                    tokio::task::yield_now().await;
842                }
843                read_count
844            }));
845        }
846
847        // Simultaneously trigger reload
848        let manager_reload = Arc::clone(&manager);
849        let reload_handle =
850            tokio::spawn(async move { manager_reload.reload(ReloadTrigger::Manual).await });
851
852        // Wait for all readers and the reload
853        let mut total_reads = 0;
854        for reader in readers {
855            total_reads += reader.await.unwrap();
856        }
857
858        let reload_result = reload_handle.await.unwrap();
859        assert!(reload_result.is_ok(), "Reload should succeed");
860        assert_eq!(total_reads, 1000, "All reads should complete");
861    }
862
863    #[tokio::test]
864    async fn test_multiple_concurrent_reloads() {
865        // Test that multiple simultaneous reloads don't cause panics or corruption
866        let initial_config = Config::default_for_testing();
867        let temp_dir = tempfile::tempdir().unwrap();
868        let config_path = temp_dir.path().join("config.kdl");
869
870        write_config_with_routes(&config_path, 3);
871
872        let manager = Arc::new(
873            ConfigManager::new(&config_path, initial_config)
874                .await
875                .unwrap(),
876        );
877
878        // Trigger multiple reloads concurrently
879        let mut reload_handles = Vec::new();
880        for i in 0..5 {
881            let manager_clone = Arc::clone(&manager);
882            let trigger = if i % 2 == 0 {
883                ReloadTrigger::Manual
884            } else {
885                ReloadTrigger::Signal
886            };
887            reload_handles.push(tokio::spawn(
888                async move { manager_clone.reload(trigger).await },
889            ));
890        }
891
892        // All reloads should complete (some may fail due to racing, but no panics)
893        let mut success_count = 0;
894        for handle in reload_handles {
895            if handle.await.unwrap().is_ok() {
896                success_count += 1;
897            }
898        }
899
900        // At least one reload should succeed
901        assert!(success_count >= 1, "At least one reload should succeed");
902
903        // Stats should reflect all attempts
904        let total = manager
905            .stats()
906            .total_reloads
907            .load(std::sync::atomic::Ordering::Relaxed);
908        assert_eq!(total, 5, "All reload attempts should be counted");
909    }
910
911    #[tokio::test]
912    async fn test_config_visibility_after_reload() {
913        // Test that new config is immediately visible after reload completes
914        let initial_config = Config::default_for_testing();
915        let initial_route_count = initial_config.routes.len();
916
917        let temp_dir = tempfile::tempdir().unwrap();
918        let config_path = temp_dir.path().join("config.kdl");
919
920        // Start with 2 routes
921        write_config_with_routes(&config_path, 2);
922
923        let manager = ConfigManager::new(&config_path, initial_config)
924            .await
925            .unwrap();
926
927        // Verify initial config
928        assert_eq!(manager.current().routes.len(), initial_route_count);
929
930        // Reload to get 2 routes from file
931        manager.reload(ReloadTrigger::Manual).await.unwrap();
932        assert_eq!(manager.current().routes.len(), 2);
933
934        // Update file to 5 routes and reload
935        write_config_with_routes(&config_path, 5);
936        manager.reload(ReloadTrigger::Manual).await.unwrap();
937        assert_eq!(
938            manager.current().routes.len(),
939            5,
940            "New config should be visible immediately after reload"
941        );
942
943        // Update file to 1 route and reload
944        write_config_with_routes(&config_path, 1);
945        manager.reload(ReloadTrigger::Manual).await.unwrap();
946        assert_eq!(
947            manager.current().routes.len(),
948            1,
949            "Config changes should be visible after each reload"
950        );
951    }
952
953    #[tokio::test]
954    async fn test_rapid_successive_reloads() {
955        // Test rapid-fire reloads don't cause issues
956        let initial_config = Config::default_for_testing();
957        let temp_dir = tempfile::tempdir().unwrap();
958        let config_path = temp_dir.path().join("config.kdl");
959
960        write_config_with_routes(&config_path, 3);
961
962        let manager = ConfigManager::new(&config_path, initial_config)
963            .await
964            .unwrap();
965
966        // Perform 20 rapid reloads
967        for i in 0..20 {
968            // Alternate between different route counts
969            write_config_with_routes(&config_path, (i % 5) + 1);
970            let result = manager.reload(ReloadTrigger::Manual).await;
971            assert!(result.is_ok(), "Reload {} should succeed", i);
972        }
973
974        // Verify final state
975        let stats = manager.stats();
976        assert_eq!(
977            stats
978                .successful_reloads
979                .load(std::sync::atomic::Ordering::Relaxed),
980            20,
981            "All 20 reloads should succeed"
982        );
983        assert_eq!(
984            stats
985                .failed_reloads
986                .load(std::sync::atomic::Ordering::Relaxed),
987            0,
988            "No reloads should fail"
989        );
990    }
991
992    #[tokio::test]
993    async fn test_rollback_preserves_previous_config() {
994        // Test that rollback correctly restores previous configuration
995        let initial_config = Config::default_for_testing();
996        let temp_dir = tempfile::tempdir().unwrap();
997        let config_path = temp_dir.path().join("config.kdl");
998
999        // Start with 3 routes
1000        write_config_with_routes(&config_path, 3);
1001
1002        let manager = ConfigManager::new(&config_path, initial_config)
1003            .await
1004            .unwrap();
1005
1006        // First reload to establish baseline
1007        manager.reload(ReloadTrigger::Manual).await.unwrap();
1008        assert_eq!(manager.current().routes.len(), 3);
1009
1010        // Second reload with 5 routes
1011        write_config_with_routes(&config_path, 5);
1012        manager.reload(ReloadTrigger::Manual).await.unwrap();
1013        assert_eq!(manager.current().routes.len(), 5);
1014
1015        // Rollback should restore 3 routes
1016        manager
1017            .rollback("Testing rollback".to_string())
1018            .await
1019            .unwrap();
1020        assert_eq!(
1021            manager.current().routes.len(),
1022            3,
1023            "Rollback should restore previous config"
1024        );
1025
1026        // Verify rollback was recorded
1027        assert_eq!(
1028            manager
1029                .stats()
1030                .rollbacks
1031                .load(std::sync::atomic::Ordering::Relaxed),
1032            1,
1033            "Rollback should be recorded in stats"
1034        );
1035    }
1036
1037    #[tokio::test]
1038    async fn test_reload_events_broadcast() {
1039        // Test that reload events are properly broadcast to subscribers
1040        let initial_config = Config::default_for_testing();
1041        let temp_dir = tempfile::tempdir().unwrap();
1042        let config_path = temp_dir.path().join("config.kdl");
1043
1044        write_config_with_routes(&config_path, 2);
1045
1046        let manager = ConfigManager::new(&config_path, initial_config)
1047            .await
1048            .unwrap();
1049
1050        // Subscribe to reload events
1051        let mut receiver = manager.subscribe();
1052
1053        // Trigger reload
1054        manager.reload(ReloadTrigger::Manual).await.unwrap();
1055
1056        // Collect events (non-blocking with timeout)
1057        let mut events = Vec::new();
1058        while let Ok(Ok(event)) =
1059            tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
1060        {
1061            events.push(event);
1062        }
1063
1064        // Verify we received the expected events
1065        assert!(
1066            events.len() >= 2,
1067            "Should receive at least Started and Applied/Validated events"
1068        );
1069
1070        // Check for Started event
1071        assert!(
1072            events
1073                .iter()
1074                .any(|e| matches!(e, ReloadEvent::Started { .. })),
1075            "Should receive Started event"
1076        );
1077
1078        // Check for Applied event (successful reload)
1079        assert!(
1080            events
1081                .iter()
1082                .any(|e| matches!(e, ReloadEvent::Applied { .. })),
1083            "Should receive Applied event on success"
1084        );
1085    }
1086
1087    #[tokio::test]
1088    async fn test_graceful_coordinator_with_reload() {
1089        // Test that GracefulReloadCoordinator correctly tracks requests during reload
1090        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));
1091
1092        // Simulate requests starting
1093        coordinator.inc_requests();
1094        coordinator.inc_requests();
1095        coordinator.inc_requests();
1096        assert_eq!(coordinator.active_count(), 3);
1097
1098        // Simulate one request completing during reload prep
1099        coordinator.dec_requests();
1100        assert_eq!(coordinator.active_count(), 2);
1101
1102        // Start drain in background
1103        let coord_clone = Arc::new(coordinator);
1104        let coord_for_drain = Arc::clone(&coord_clone);
1105        let drain_handle = tokio::spawn(async move { coord_for_drain.wait_for_drain().await });
1106
1107        // Simulate remaining requests completing
1108        tokio::time::sleep(Duration::from_millis(50)).await;
1109        coord_clone.dec_requests();
1110        tokio::time::sleep(Duration::from_millis(50)).await;
1111        coord_clone.dec_requests();
1112
1113        // Drain should complete successfully
1114        let drained = drain_handle.await.unwrap();
1115        assert!(drained, "All requests should drain successfully");
1116    }
1117
1118    #[tokio::test]
1119    async fn test_graceful_coordinator_drain_timeout() {
1120        // Test that drain times out correctly when requests don't complete
1121        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));
1122
1123        // Simulate stuck requests
1124        coordinator.inc_requests();
1125        coordinator.inc_requests();
1126
1127        // Start drain - should timeout
1128        let drained = coordinator.wait_for_drain().await;
1129        assert!(!drained, "Drain should timeout with stuck requests");
1130        assert_eq!(
1131            coordinator.active_count(),
1132            2,
1133            "Requests should still be tracked"
1134        );
1135    }
1136}