//! Configuration hot reload module for Zentinel proxy.
//!
//! This module implements zero-downtime configuration reloading with validation,
//! atomic swaps, and rollback support for production reliability.
//!
//! ## Submodules
//!
//! - `coordinator`: Graceful reload coordination and request draining
//! - `signals`: OS signal handling (SIGHUP, SIGTERM)
//! - `validators`: Runtime configuration validators
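//!
//! ## Example
//!
//! A minimal usage sketch (not a complete program): it assumes an already
//! parsed `Config`, a Tokio runtime, and that these items are reachable at a
//! path like `zentinel_proxy::reload`; the file path is illustrative and
//! error handling is elided.
//!
//! ```ignore
//! use zentinel_proxy::reload::{ConfigManager, ReloadTrigger};
//!
//! // Build the manager from the config file path and the initial config.
//! let manager = ConfigManager::new("/etc/zentinel/config.kdl", initial_config).await?;
//!
//! // Watch the file (and its directory) so edits trigger automatic reloads.
//! manager.start_watching().await?;
//!
//! // Reloads can also be triggered explicitly, e.g. from an admin API.
//! manager.reload(ReloadTrigger::Manual).await?;
//!
//! // Handlers read the active configuration as a cheap Arc snapshot.
//! let config = manager.current();
//! println!("routes: {}", config.routes.len());
//! ```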

mod coordinator;
mod signals;
mod validators;

// Re-export for use by proxy initialization
pub use coordinator::GracefulReloadCoordinator;
pub use signals::{SignalManager, SignalType};
pub use validators::{RouteValidator, UpstreamValidator};

use arc_swap::ArcSwap;
use notify::{Event, EventKind, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::{debug, error, info, trace, warn};

use zentinel_common::errors::{ZentinelError, ZentinelResult};
use zentinel_config::Config;

use crate::logging::{AuditLogEntry, SharedLogManager};
use crate::tls::CertificateReloader;

// ============================================================================
// Reload Events and Types
// ============================================================================

/// Reload event types
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// Configuration reload started
    Started {
        timestamp: Instant,
        trigger: ReloadTrigger,
    },
    /// Configuration validated successfully
    Validated { timestamp: Instant },
    /// Configuration applied successfully
    Applied { timestamp: Instant, version: String },
    /// Configuration reload failed
    Failed { timestamp: Instant, error: String },
    /// Configuration rolled back
    RolledBack { timestamp: Instant, reason: String },
}

/// Reload trigger source
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// Manual reload via API
    Manual,
    /// File change detected
    FileChange,
    /// Signal received (SIGHUP)
    Signal,
    /// Scheduled reload
    Scheduled,
    /// Gateway API controller reconciliation
    GatewayApi,
}

// ============================================================================
// Traits
// ============================================================================

/// Configuration validator trait
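///
/// Implementors run inside `ConfigManager::validate_config` before a new
/// configuration is applied. A hedged sketch of a custom validator; the
/// `MaxRoutesValidator` name and its limit are illustrative, not part of
/// this module:
///
/// ```ignore
/// struct MaxRoutesValidator {
///     limit: usize,
/// }
///
/// #[async_trait::async_trait]
/// impl ConfigValidator for MaxRoutesValidator {
///     async fn validate(&self, config: &Config) -> ZentinelResult<()> {
///         if config.routes.len() > self.limit {
///             return Err(ZentinelError::Config {
///                 message: format!("route count {} exceeds limit {}", config.routes.len(), self.limit),
///                 source: None,
///             });
///         }
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         "max_routes"
///     }
/// }
/// ```
///
/// Register it with [`ConfigManager::add_validator`].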
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Validate configuration before applying
    async fn validate(&self, config: &Config) -> ZentinelResult<()>;

    /// Validator name for logging
    fn name(&self) -> &str;
}

/// Reload hook trait for custom actions
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called before reload starts
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()>;

    /// Called after successful reload
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Called on reload failure
    async fn on_failure(&self, config: &Config, error: &ZentinelError);

    /// Hook name for logging
    fn name(&self) -> &str;
}

// ============================================================================
// Reload Statistics
// ============================================================================

/// Reload statistics
#[derive(Default)]
pub struct ReloadStats {
    /// Total reload attempts
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Successful reloads
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Failed reloads
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Rollbacks performed
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// Current config version (incremented on each successful reload)
    pub config_version: std::sync::atomic::AtomicU64,
    /// Last successful reload time
    pub last_success: RwLock<Option<Instant>>,
    /// Last failure time
    pub last_failure: RwLock<Option<Instant>>,
    /// Average reload duration
    pub avg_duration_ms: RwLock<f64>,
}

// ============================================================================
// Configuration Manager
// ============================================================================

/// Configuration manager with hot reload support
pub struct ConfigManager {
    /// Current active configuration
    current_config: Arc<ArcSwap<Config>>,
    /// Previous configuration for rollback
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// Configuration file path
    config_path: PathBuf,
    /// File watcher for auto-reload (uses RwLock for interior mutability)
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Reload event broadcaster
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload statistics
    stats: Arc<ReloadStats>,
    /// Validation hooks
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Reload hooks
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
    /// Certificate reloader for TLS hot-reload
    cert_reloader: Arc<CertificateReloader>,
    /// Serializes reload operations so only one runs at a time
    reload_mutex: Arc<Mutex<()>>,
}

impl ConfigManager {
    /// Create new configuration manager
    pub async fn new(
        config_path: impl AsRef<Path>,
        initial_config: Config,
    ) -> ZentinelResult<Self> {
        let config_path = config_path.as_ref().to_path_buf();
        let (reload_tx, _) = broadcast::channel(100);

        info!(
            config_path = %config_path.display(),
            route_count = initial_config.routes.len(),
            upstream_count = initial_config.upstreams.len(),
            listener_count = initial_config.listeners.len(),
            "Initializing configuration manager"
        );

        trace!(
            config_path = %config_path.display(),
            "Creating ArcSwap for configuration"
        );

        Ok(Self {
            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
            previous_config: Arc::new(RwLock::new(None)),
            config_path,
            watcher: Arc::new(RwLock::new(None)),
            reload_tx,
            stats: Arc::new(ReloadStats::default()),
            validators: Arc::new(RwLock::new(Vec::new())),
            reload_hooks: Arc::new(RwLock::new(Vec::new())),
            cert_reloader: Arc::new(CertificateReloader::new()),
            reload_mutex: Arc::new(Mutex::new(())),
        })
    }

    /// Get the certificate reloader for registering TLS listeners
    pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
        Arc::clone(&self.cert_reloader)
    }

    /// Get current configuration
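    ///
    /// This is a lock-free `ArcSwap` read: callers get an `Arc<Config>`
    /// snapshot that stays valid for as long as they hold it, even if a
    /// reload swaps the active configuration underneath. A minimal sketch:
    ///
    /// ```ignore
    /// let config = manager.current();
    /// let route_count = config.routes.len();
    /// ```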
    pub fn current(&self) -> Arc<Config> {
        self.current_config.load_full()
    }

    /// Start watching configuration file for changes
    ///
    /// When enabled, the proxy will automatically reload configuration
    /// when the config file is modified. Also watches the config file's
    /// parent directory to catch included files in multi-file configs.
    pub async fn start_watching(&self) -> ZentinelResult<()> {
        // Check if already watching
        if self.watcher.read().await.is_some() {
            warn!("File watcher already active, skipping");
            return Ok(());
        }

        let config_path = self.config_path.clone();

        // Signal file changes with a tokio::sync::Notify rather than an mpsc
        // channel: multiple rapid events coalesce into a single notification
        // automatically, which keeps the debounce logic simple.
        let notify = Arc::new(tokio::sync::Notify::new());
        let notify_sender = Arc::clone(&notify);

        let watched_path = config_path.clone();
        let mut watcher =
            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
                match event {
                    Ok(event) => {
                        if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
                            // Check if the event is for a .kdl file or our config file
                            let is_relevant = event.paths.iter().any(|p| {
                                p == &watched_path || p.extension().is_some_and(|ext| ext == "kdl")
                            });
                            if is_relevant {
                                notify_sender.notify_one();
                            }
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "File watcher error");
                    }
                }
            })
            .map_err(|e| ZentinelError::Config {
                message: format!("Failed to create file watcher: {}", e),
                source: None,
            })?;

        // Watch the config file itself
        watcher
            .watch(&config_path, RecursiveMode::NonRecursive)
            .map_err(|e| ZentinelError::Config {
                message: format!("Failed to watch config file: {}", e),
                source: None,
            })?;

        // Also watch the config file's parent directory to catch included files.
        // Multi-file configs use `include "routes/*.kdl"` — those files live
        // alongside or under the main config directory.
        if let Some(parent) = config_path.parent() {
            if let Err(e) = watcher.watch(parent, RecursiveMode::Recursive) {
                warn!(
                    path = %parent.display(),
                    error = %e,
                    "Could not watch config directory for included files, \
                     only the main config file will trigger auto-reload"
                );
            } else {
                debug!(
                    path = %parent.display(),
                    "Watching config directory recursively for included file changes"
                );
            }
        }

        // Store watcher using interior mutability
        *self.watcher.write().await = Some(watcher);

        // Spawn event handler task with proper debounce
        let manager = Arc::new(self.clone_for_task());
        let config_path_log = self.config_path.clone();
        tokio::spawn(async move {
            loop {
                // Wait for at least one file change notification
                notify.notified().await;

                // Debounce: wait for changes to settle. If more notifications
                // arrive during this window, we consume them and keep waiting
                // until no new changes arrive for 200ms.
                while let Ok(()) =
                    tokio::time::timeout(Duration::from_millis(200), notify.notified()).await
                {
                    // Another change arrived within the window, keep waiting
                    trace!("Debounce: additional file change, resetting timer");
                }

                info!(
                    config_file = %config_path_log.display(),
                    "Configuration file changed, triggering reload"
                );

                if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
                    error!(error = %e, "Auto-reload failed, continuing with current configuration");
                }
            }
        });

        // Spawn a fallback polling loop that checks a cheap content signature
        // every second. inotify can miss rename-based atomic writes on some
        // filesystems (emptyDir, overlayfs), so the poll acts as a safety net.
        // The signature is the file length plus the first 256 bytes, which
        // avoids re-reading the entire file on every poll.
        let poll_manager = Arc::new(self.clone_for_task());
        let poll_path = self.config_path.clone();
        tokio::spawn(async move {
            use std::io::Read;
            let content_sig = |p: &std::path::Path| -> Option<(u64, Vec<u8>)> {
                let mut f = std::fs::File::open(p).ok()?;
                let meta = f.metadata().ok()?;
                let len = meta.len();
                // Read first 256 bytes as a fingerprint (covers schema + listeners)
                let mut buf = vec![0u8; 256.min(len as usize)];
                f.read_exact(&mut buf).ok()?;
                Some((len, buf))
            };
            let mut last_sig = content_sig(&poll_path);

            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;

                let current_sig = content_sig(&poll_path);
                if current_sig != last_sig {
                    last_sig = current_sig;
                    debug!("Config file content changed (poll fallback), triggering reload");
                    if let Err(e) = poll_manager.reload(ReloadTrigger::FileChange).await {
                        error!(error = %e, "Poll-triggered reload failed");
                    }
                }
            }
        });

        info!(
            config_file = %self.config_path.display(),
            "Auto-reload enabled: watching for configuration changes (with poll fallback)"
        );
        Ok(())
    }

    /// Reload configuration
    ///
    /// Only one reload runs at a time. If a reload is already in progress
    /// (from file watcher, SIGHUP, or manual trigger), this call waits
    /// for it to finish before starting.
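    ///
    /// A minimal sketch of a manual trigger with an explicit rollback; the
    /// health check shown here is illustrative, not part of this module:
    ///
    /// ```ignore
    /// manager.reload(ReloadTrigger::Manual).await?;
    ///
    /// if !post_reload_health_check().await {
    ///     manager.rollback("post-reload health check failed".to_string()).await?;
    /// }
    /// ```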
    pub async fn reload(&self, trigger: ReloadTrigger) -> ZentinelResult<()> {
        // Serialize reloads — only one at a time
        let _reload_guard = self.reload_mutex.lock().await;

        let start = Instant::now();
        let reload_num = self
            .stats
            .total_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;

        info!(
            trigger = ?trigger,
            reload_num = reload_num,
            config_path = %self.config_path.display(),
            "Starting configuration reload"
        );

        // Notify reload started
        let _ = self.reload_tx.send(ReloadEvent::Started {
            timestamp: Instant::now(),
            trigger: trigger.clone(),
        });

        trace!(
            config_path = %self.config_path.display(),
            "Reading configuration file"
        );

        // Load new configuration
        let new_config = match Config::from_file(&self.config_path) {
            Ok(config) => {
                debug!(
                    route_count = config.routes.len(),
                    upstream_count = config.upstreams.len(),
                    listener_count = config.listeners.len(),
                    "Configuration file parsed successfully"
                );
                config
            }
            Err(e) => {
                let error_msg = format!("Failed to load configuration: {}", e);
                error!(
                    config_path = %self.config_path.display(),
                    error = %e,
                    "Failed to load configuration file"
                );
                self.stats
                    .failed_reloads
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                *self.stats.last_failure.write().await = Some(Instant::now());

                let _ = self.reload_tx.send(ReloadEvent::Failed {
                    timestamp: Instant::now(),
                    error: error_msg.clone(),
                });

                return Err(ZentinelError::Config {
                    message: error_msg,
                    source: None,
                });
            }
        };

        trace!("Starting configuration validation");

        // Validate new configuration BEFORE applying
        // This is critical - invalid configs must never be loaded
        if let Err(e) = self.validate_config(&new_config).await {
            error!(
                error = %e,
                "Configuration validation failed - new configuration REJECTED"
            );
            self.stats
                .failed_reloads
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            *self.stats.last_failure.write().await = Some(Instant::now());

            let _ = self.reload_tx.send(ReloadEvent::Failed {
                timestamp: Instant::now(),
                error: e.to_string(),
            });

            return Err(e);
        }

        info!(
            route_count = new_config.routes.len(),
            upstream_count = new_config.upstreams.len(),
            "Configuration validation passed, applying new configuration"
        );

        let _ = self.reload_tx.send(ReloadEvent::Validated {
            timestamp: Instant::now(),
        });

        // Get current config for rollback
        let old_config = self.current_config.load_full();

        trace!(
            old_routes = old_config.routes.len(),
            new_routes = new_config.routes.len(),
            "Preparing configuration swap"
        );

        // Run pre-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            trace!(hook_name = %hook.name(), "Running pre-reload hook");
            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
                warn!(
                    hook_name = %hook.name(),
                    error = %e,
                    "Pre-reload hook failed"
                );
                // Continue with reload despite hook failure
            }
        }
        drop(hooks);

        // Save previous config for rollback
        trace!("Saving previous configuration for potential rollback");
        *self.previous_config.write().await = Some(old_config.clone());

        // Apply new configuration atomically
        trace!("Applying new configuration atomically");
        self.current_config.store(Arc::new(new_config.clone()));

        // Run post-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            trace!(hook_name = %hook.name(), "Running post-reload hook");
            hook.post_reload(&old_config, &new_config).await;
        }
        drop(hooks);

        // Update statistics
        let duration = start.elapsed();
        let successful_count = self
            .stats
            .successful_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;
        *self.stats.last_success.write().await = Some(Instant::now());

        // Update average duration
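        // Running mean over all successful reloads:
        //   new_avg = (old_avg * (n - 1) + duration_ms) / n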
        {
            let mut avg = self.stats.avg_duration_ms.write().await;
            let total = successful_count as f64;
            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
        }

        // Increment config version
        let new_version = self
            .stats
            .config_version
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            + 1;

        let _ = self.reload_tx.send(ReloadEvent::Applied {
            timestamp: Instant::now(),
            version: format!("v{}", new_version),
        });

        // Reload TLS certificates (hot-reload)
        // This picks up any certificate file changes without restart
        let (cert_success, cert_errors) = self.cert_reloader.reload_all();
        if !cert_errors.is_empty() {
            for (listener_id, error) in &cert_errors {
                error!(
                    listener_id = %listener_id,
                    error = %error,
                    "TLS certificate reload failed for listener"
                );
            }
        }

        info!(
            duration_ms = duration.as_millis(),
            successful_reloads = successful_count,
            route_count = new_config.routes.len(),
            upstream_count = new_config.upstreams.len(),
            cert_reload_success = cert_success,
            cert_reload_errors = cert_errors.len(),
            "Configuration reload completed successfully"
        );

        Ok(())
    }

    /// Apply a programmatically-generated configuration directly.
    ///
    /// This is the same as `reload()` but accepts a `Config` instead of
    /// reading from disk. Used by the Gateway API controller to push
    /// translated Kubernetes resources into the proxy without file I/O.
    ///
    /// Runs the full validation → hooks → atomic swap → hooks pipeline.
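    ///
    /// Sketch of the Gateway API path; `translate_gateway_resources` is an
    /// illustrative placeholder for whatever produces the `Config`:
    ///
    /// ```ignore
    /// let translated: Config = translate_gateway_resources(&gateway_state);
    /// manager.apply_config(translated, ReloadTrigger::GatewayApi).await?;
    /// ```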
    pub async fn apply_config(
        &self,
        new_config: Config,
        trigger: ReloadTrigger,
    ) -> ZentinelResult<()> {
        // Serialize reloads — only one at a time
        let _reload_guard = self.reload_mutex.lock().await;

        let start = Instant::now();
        let reload_num = self
            .stats
            .total_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;

        info!(
            trigger = ?trigger,
            reload_num = reload_num,
            routes = new_config.routes.len(),
            upstreams = new_config.upstreams.len(),
            listeners = new_config.listeners.len(),
            "Applying programmatic configuration"
        );

        let _ = self.reload_tx.send(ReloadEvent::Started {
            timestamp: Instant::now(),
            trigger,
        });

        // Validate
        if let Err(e) = self.validate_config(&new_config).await {
            error!(error = %e, "Programmatic configuration validation failed");
            self.stats
                .failed_reloads
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            *self.stats.last_failure.write().await = Some(Instant::now());

            let _ = self.reload_tx.send(ReloadEvent::Failed {
                timestamp: Instant::now(),
                error: e.to_string(),
            });

            return Err(e);
        }

        let _ = self.reload_tx.send(ReloadEvent::Validated {
            timestamp: Instant::now(),
        });

        // Get current config for rollback and hooks
        let old_config = self.current_config.load_full();

        // Run pre-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
                warn!(hook_name = %hook.name(), error = %e, "Pre-reload hook failed");
            }
        }
        drop(hooks);

        // Save previous config for rollback
        *self.previous_config.write().await = Some(old_config.clone());

        // Atomic swap
        self.current_config.store(Arc::new(new_config.clone()));

        // Run post-reload hooks
        let hooks = self.reload_hooks.read().await;
        for hook in hooks.iter() {
            hook.post_reload(&old_config, &new_config).await;
        }
        drop(hooks);

        // Update statistics
        let duration = start.elapsed();
        let successful_count = self
            .stats
            .successful_reloads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;
        *self.stats.last_success.write().await = Some(Instant::now());

        {
            let mut avg = self.stats.avg_duration_ms.write().await;
            let total = successful_count as f64;
            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
        }

        let new_version = self
            .stats
            .config_version
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            + 1;

        let _ = self.reload_tx.send(ReloadEvent::Applied {
            timestamp: Instant::now(),
            version: format!("v{}", new_version),
        });

        // Reload TLS certificates
        let (cert_success, cert_errors) = self.cert_reloader.reload_all();
        if !cert_errors.is_empty() {
            for (listener_id, error) in &cert_errors {
                error!(
                    listener_id = %listener_id,
                    error = %error,
                    "TLS certificate reload failed for listener"
                );
            }
        }

        info!(
            duration_ms = duration.as_millis(),
            successful_reloads = successful_count,
            route_count = new_config.routes.len(),
            upstream_count = new_config.upstreams.len(),
            cert_reload_success = cert_success,
            cert_reload_errors = cert_errors.len(),
            "Programmatic configuration applied successfully"
        );

        Ok(())
    }

    /// Get a handle to the underlying ArcSwap for direct reads.
    ///
    /// Used by the Gateway API controller to share the same config store.
    pub fn config_store(&self) -> Arc<ArcSwap<Config>> {
        Arc::clone(&self.current_config)
    }

    /// Rollback to previous configuration
    pub async fn rollback(&self, reason: String) -> ZentinelResult<()> {
        info!(
            reason = %reason,
            "Starting configuration rollback"
        );

        let previous = self.previous_config.read().await.clone();

        if let Some(prev_config) = previous {
            trace!(
                route_count = prev_config.routes.len(),
                "Found previous configuration for rollback"
            );

            // Validate previous config (should always pass)
            trace!("Validating previous configuration");
            if let Err(e) = self.validate_config(&prev_config).await {
                error!(
                    error = %e,
                    "Previous configuration validation failed during rollback"
                );
                return Err(e);
            }

            // Apply previous configuration
            trace!("Applying previous configuration");
            self.current_config.store(prev_config.clone());
            let rollback_count = self
                .stats
                .rollbacks
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                + 1;

            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
                timestamp: Instant::now(),
                reason: reason.clone(),
            });

            info!(
                reason = %reason,
                rollback_count = rollback_count,
                route_count = prev_config.routes.len(),
                "Configuration rolled back successfully"
            );
            Ok(())
        } else {
            warn!("No previous configuration available for rollback");
            Err(ZentinelError::Config {
                message: "No previous configuration available".to_string(),
                source: None,
            })
        }
    }

    /// Validate configuration
    async fn validate_config(&self, config: &Config) -> ZentinelResult<()> {
        trace!(
            route_count = config.routes.len(),
            upstream_count = config.upstreams.len(),
            "Starting configuration validation"
        );

        // Built-in validation
        trace!("Running built-in config validation");
        config.validate()?;

        // Run custom validators
        let validators = self.validators.read().await;
        trace!(
            validator_count = validators.len(),
            "Running custom validators"
        );
        for validator in validators.iter() {
            trace!(validator_name = %validator.name(), "Running validator");
            validator.validate(config).await.map_err(|e| {
                error!(
                    validator_name = %validator.name(),
                    error = %e,
                    "Validator failed"
                );
                e
            })?;
        }

        debug!(
            route_count = config.routes.len(),
            upstream_count = config.upstreams.len(),
            "Configuration validation passed"
        );

        Ok(())
    }

    /// Add configuration validator
    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
        info!("Adding configuration validator: {}", validator.name());
        self.validators.write().await.push(validator);
    }

    /// Add reload hook
    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
        info!("Adding reload hook: {}", hook.name());
        self.reload_hooks.write().await.push(hook);
    }

    /// Subscribe to reload events
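    ///
    /// Each subscriber gets an independent `broadcast::Receiver`. A sketch of
    /// a consumer task (the logging targets are illustrative):
    ///
    /// ```ignore
    /// let mut rx = manager.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         match event {
    ///             ReloadEvent::Applied { version, .. } => info!(%version, "config applied"),
    ///             ReloadEvent::Failed { error, .. } => warn!(%error, "config reload failed"),
    ///             _ => {}
    ///         }
    ///     }
    /// });
    /// ```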
    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
        self.reload_tx.subscribe()
    }

    /// Get reload statistics
    pub fn stats(&self) -> &ReloadStats {
        &self.stats
    }

    /// Create a lightweight clone for async tasks
    fn clone_for_task(&self) -> ConfigManager {
        ConfigManager {
            current_config: Arc::clone(&self.current_config),
            previous_config: Arc::clone(&self.previous_config),
            config_path: self.config_path.clone(),
            watcher: self.watcher.clone(),
            reload_tx: self.reload_tx.clone(),
            stats: Arc::clone(&self.stats),
            validators: Arc::clone(&self.validators),
            reload_hooks: Arc::clone(&self.reload_hooks),
            cert_reloader: Arc::clone(&self.cert_reloader),
            reload_mutex: Arc::clone(&self.reload_mutex),
        }
    }
}

// ============================================================================
// Audit Reload Hook
// ============================================================================

/// Reload hook that logs configuration changes to the audit log.
pub struct AuditReloadHook {
    log_manager: SharedLogManager,
}

impl AuditReloadHook {
    /// Create a new audit reload hook with the given log manager.
    pub fn new(log_manager: SharedLogManager) -> Self {
        Self { log_manager }
    }
}

#[async_trait::async_trait]
impl ReloadHook for AuditReloadHook {
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> ZentinelResult<()> {
        // Log that reload is starting
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_started",
            format!(
                "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
                old_config.routes.len(),
                new_config.routes.len(),
                old_config.upstreams.len(),
                new_config.upstreams.len()
            ),
        );
        self.log_manager.log_audit(&audit_entry);
        Ok(())
    }

    async fn post_reload(&self, old_config: &Config, new_config: &Config) {
        // Log successful reload
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_success",
            format!(
                "Configuration reload successful: {} routes, {} upstreams, {} listeners",
                new_config.routes.len(),
                new_config.upstreams.len(),
                new_config.listeners.len()
            ),
        )
        .with_metadata("old_routes", old_config.routes.len().to_string())
        .with_metadata("new_routes", new_config.routes.len().to_string())
        .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
        .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
        self.log_manager.log_audit(&audit_entry);
    }

    async fn on_failure(&self, config: &Config, error: &ZentinelError) {
        // Log failed reload
        let trace_id = uuid::Uuid::new_v4().to_string();
        let audit_entry = AuditLogEntry::config_change(
            &trace_id,
            "reload_failed",
            format!("Configuration reload failed: {}", error),
        )
        .with_metadata("current_routes", config.routes.len().to_string())
        .with_metadata("current_upstreams", config.upstreams.len().to_string());
        self.log_manager.log_audit(&audit_entry);
    }

    fn name(&self) -> &str {
        "audit_reload_hook"
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let initial_routes = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Write INVALID config (not valid KDL)
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        // Create config manager with valid initial config
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Verify initial config is loaded
        assert_eq!(manager.current().routes.len(), initial_routes);

        // Attempt reload with invalid config - should fail
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(result.is_err(), "Reload should fail for invalid config");

        // Verify original config is STILL loaded (not replaced)
        assert_eq!(
            manager.current().routes.len(),
            initial_routes,
            "Original config should be preserved after failed reload"
        );

        // Verify failure was recorded in stats
        assert_eq!(
            manager
                .stats()
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Failed reload should be recorded"
        );
    }

    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Create a static files directory for the test
        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // Write a valid config with upstream
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        // Create config manager
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Reload should succeed with valid config
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        // Verify success was recorded
        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }

    // ========================================================================
    // Concurrent Reload Tests
    // ========================================================================

    /// Helper to create a valid config file with a specified route count
    fn write_config_with_routes(path: &Path, route_count: usize) {
        let mut routes = String::new();
        for i in 0..route_count {
            routes.push_str(&format!(
                r#"
    route "route{i}" {{
        priority "medium"
        matches {{
            path-prefix "/route{i}/"
        }}
        upstream "backend"
    }}
"#
            ));
        }

        let config = format!(
            r#"
server {{
    worker-threads 4
}}

listeners {{
    listener "http" {{
        address "0.0.0.0:8080"
        protocol "http"
    }}
}}

upstreams {{
    upstream "backend" {{
        target "127.0.0.1:3000"
    }}
}}

routes {{
{routes}
}}
"#
        );

        std::fs::write(path, config).unwrap();
    }

    #[tokio::test]
    async fn test_concurrent_config_reads_during_reload() {
        // Test that config reads don't block or panic during reload
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 5);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // Spawn multiple readers that continuously read config
        let mut readers = Vec::new();
        for _ in 0..10 {
            let manager_clone = Arc::clone(&manager);
            readers.push(tokio::spawn(async move {
                let mut read_count = 0;
                for _ in 0..100 {
                    let config = manager_clone.current();
                    // Access config to ensure it's valid
                    let _ = config.routes.len();
                    read_count += 1;
                    tokio::task::yield_now().await;
                }
                read_count
            }));
        }

        // Simultaneously trigger reload
        let manager_reload = Arc::clone(&manager);
        let reload_handle =
            tokio::spawn(async move { manager_reload.reload(ReloadTrigger::Manual).await });

        // Wait for all readers and the reload
        let mut total_reads = 0;
        for reader in readers {
            total_reads += reader.await.unwrap();
        }

        let reload_result = reload_handle.await.unwrap();
        assert!(reload_result.is_ok(), "Reload should succeed");
        assert_eq!(total_reads, 1000, "All reads should complete");
    }

    #[tokio::test]
    async fn test_multiple_concurrent_reloads() {
        // Test that multiple simultaneous reloads don't cause panics or corruption
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = Arc::new(
            ConfigManager::new(&config_path, initial_config)
                .await
                .unwrap(),
        );

        // Trigger multiple reloads concurrently
        let mut reload_handles = Vec::new();
        for i in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let trigger = if i % 2 == 0 {
                ReloadTrigger::Manual
            } else {
                ReloadTrigger::Signal
            };
            reload_handles.push(tokio::spawn(
                async move { manager_clone.reload(trigger).await },
            ));
        }

        // All reloads should complete without panics; the reload mutex serializes them
        let mut success_count = 0;
        for handle in reload_handles {
            if handle.await.unwrap().is_ok() {
                success_count += 1;
            }
        }

        // At least one reload should succeed
        assert!(success_count >= 1, "At least one reload should succeed");

        // Stats should reflect all attempts
        let total = manager
            .stats()
            .total_reloads
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(total, 5, "All reload attempts should be counted");
    }

    #[tokio::test]
    async fn test_config_visibility_after_reload() {
        // Test that new config is immediately visible after reload completes
        let initial_config = Config::default_for_testing();
        let initial_route_count = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Start with 2 routes
        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Verify initial config
        assert_eq!(manager.current().routes.len(), initial_route_count);

        // Reload to get 2 routes from file
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 2);

        // Update file to 5 routes and reload
        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            5,
            "New config should be visible immediately after reload"
        );

        // Update file to 1 route and reload
        write_config_with_routes(&config_path, 1);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(
            manager.current().routes.len(),
            1,
            "Config changes should be visible after each reload"
        );
    }

    #[tokio::test]
    async fn test_rapid_successive_reloads() {
        // Test rapid-fire reloads don't cause issues
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Perform 20 rapid reloads
        for i in 0..20 {
            // Alternate between different route counts
            write_config_with_routes(&config_path, (i % 5) + 1);
            let result = manager.reload(ReloadTrigger::Manual).await;
            assert!(result.is_ok(), "Reload {} should succeed", i);
        }

        // Verify final state
        let stats = manager.stats();
        assert_eq!(
            stats
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            20,
            "All 20 reloads should succeed"
        );
        assert_eq!(
            stats
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            0,
            "No reloads should fail"
        );
    }

    #[tokio::test]
    async fn test_rollback_preserves_previous_config() {
        // Test that rollback correctly restores previous configuration
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Start with 3 routes
        write_config_with_routes(&config_path, 3);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // First reload to establish baseline
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 3);

        // Second reload with 5 routes
        write_config_with_routes(&config_path, 5);
        manager.reload(ReloadTrigger::Manual).await.unwrap();
        assert_eq!(manager.current().routes.len(), 5);

        // Rollback should restore 3 routes
        manager
            .rollback("Testing rollback".to_string())
            .await
            .unwrap();
        assert_eq!(
            manager.current().routes.len(),
            3,
            "Rollback should restore previous config"
        );

        // Verify rollback was recorded
        assert_eq!(
            manager
                .stats()
                .rollbacks
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Rollback should be recorded in stats"
        );
    }

    #[tokio::test]
    async fn test_reload_events_broadcast() {
        // Test that reload events are properly broadcast to subscribers
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        write_config_with_routes(&config_path, 2);

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Subscribe to reload events
        let mut receiver = manager.subscribe();

        // Trigger reload
        manager.reload(ReloadTrigger::Manual).await.unwrap();

        // Collect events (non-blocking with timeout)
        let mut events = Vec::new();
        while let Ok(Ok(event)) =
            tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await
        {
            events.push(event);
        }

        // Verify we received the expected events
        assert!(
            events.len() >= 2,
            "Should receive at least Started and Applied/Validated events"
        );

        // Check for Started event
        assert!(
            events
                .iter()
                .any(|e| matches!(e, ReloadEvent::Started { .. })),
            "Should receive Started event"
        );

        // Check for Applied event (successful reload)
        assert!(
            events
                .iter()
                .any(|e| matches!(e, ReloadEvent::Applied { .. })),
            "Should receive Applied event on success"
        );
    }

    #[tokio::test]
    async fn test_graceful_coordinator_with_reload() {
        // Test that GracefulReloadCoordinator correctly tracks requests during reload
        let coordinator = GracefulReloadCoordinator::new(Duration::from_secs(5));

        // Simulate requests starting
        coordinator.inc_requests();
        coordinator.inc_requests();
        coordinator.inc_requests();
        assert_eq!(coordinator.active_count(), 3);

        // Simulate one request completing during reload prep
        coordinator.dec_requests();
        assert_eq!(coordinator.active_count(), 2);

        // Start drain in background
        let coord_clone = Arc::new(coordinator);
        let coord_for_drain = Arc::clone(&coord_clone);
        let drain_handle = tokio::spawn(async move { coord_for_drain.wait_for_drain().await });

        // Simulate remaining requests completing
        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();
        tokio::time::sleep(Duration::from_millis(50)).await;
        coord_clone.dec_requests();

        // Drain should complete successfully
        let drained = drain_handle.await.unwrap();
        assert!(drained, "All requests should drain successfully");
    }

    #[tokio::test]
    async fn test_graceful_coordinator_drain_timeout() {
        // Test that drain times out correctly when requests don't complete
        let coordinator = GracefulReloadCoordinator::new(Duration::from_millis(200));

        // Simulate stuck requests
        coordinator.inc_requests();
        coordinator.inc_requests();

        // Start drain - should timeout
        let drained = coordinator.wait_for_drain().await;
        assert!(!drained, "Drain should timeout with stuck requests");
        assert_eq!(
            coordinator.active_count(),
            2,
            "Requests should still be tracked"
        );
    }
}