sentinel_proxy/reload/
mod.rs

1//! Configuration hot reload module for Sentinel proxy.
2//!
3//! This module implements zero-downtime configuration reloading with validation,
4//! atomic swaps, and rollback support for production reliability.
5//!
6//! ## Submodules
7//!
8//! - [`coordinator`]: Graceful reload coordination and request draining
9//! - [`signals`]: OS signal handling (SIGHUP, SIGTERM)
10//! - [`validators`]: Runtime configuration validators
11
12mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
// The items above are re-exported for use by proxy initialization.
21
22use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use sentinel_common::errors::{SentinelError, SentinelResult};
31use sentinel_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
36// ============================================================================
37// Reload Events and Types
38// ============================================================================
39
40/// Reload event types
41#[derive(Debug, Clone)]
42pub enum ReloadEvent {
43    /// Configuration reload started
44    Started {
45        timestamp: Instant,
46        trigger: ReloadTrigger,
47    },
48    /// Configuration validated successfully
49    Validated { timestamp: Instant },
50    /// Configuration applied successfully
51    Applied { timestamp: Instant, version: String },
52    /// Configuration reload failed
53    Failed { timestamp: Instant, error: String },
54    /// Configuration rolled back
55    RolledBack { timestamp: Instant, reason: String },
56}
57
/// What caused a configuration reload to be requested.
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// An operator asked for a reload through the API.
    Manual,
    /// The configuration file changed on disk.
    FileChange,
    /// A signal (SIGHUP) was received.
    Signal,
    /// A scheduled reload fired.
    Scheduled,
}
70
71// ============================================================================
72// Traits
73// ============================================================================
74
75/// Configuration validator trait
76#[async_trait::async_trait]
77pub trait ConfigValidator: Send + Sync {
78    /// Validate configuration before applying
79    async fn validate(&self, config: &Config) -> SentinelResult<()>;
80
81    /// Validator name for logging
82    fn name(&self) -> &str;
83}
84
85/// Reload hook trait for custom actions
86#[async_trait::async_trait]
87pub trait ReloadHook: Send + Sync {
88    /// Called before reload starts
89    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;
90
91    /// Called after successful reload
92    async fn post_reload(&self, old_config: &Config, new_config: &Config);
93
94    /// Called on reload failure
95    async fn on_failure(&self, config: &Config, error: &SentinelError);
96
97    /// Hook name for logging
98    fn name(&self) -> &str;
99}
100
101// ============================================================================
102// Reload Statistics
103// ============================================================================
104
105/// Reload statistics
106#[derive(Default)]
107pub struct ReloadStats {
108    /// Total reload attempts
109    pub total_reloads: std::sync::atomic::AtomicU64,
110    /// Successful reloads
111    pub successful_reloads: std::sync::atomic::AtomicU64,
112    /// Failed reloads
113    pub failed_reloads: std::sync::atomic::AtomicU64,
114    /// Rollbacks performed
115    pub rollbacks: std::sync::atomic::AtomicU64,
116    /// Last successful reload time
117    pub last_success: RwLock<Option<Instant>>,
118    /// Last failure time
119    pub last_failure: RwLock<Option<Instant>>,
120    /// Average reload duration
121    pub avg_duration_ms: RwLock<f64>,
122}
123
124// ============================================================================
125// Configuration Manager
126// ============================================================================
127
128/// Configuration manager with hot reload support
129pub struct ConfigManager {
130    /// Current active configuration
131    current_config: Arc<ArcSwap<Config>>,
132    /// Previous configuration for rollback
133    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
134    /// Configuration file path
135    config_path: PathBuf,
136    /// File watcher for auto-reload (uses RwLock for interior mutability)
137    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
138    /// Reload event broadcaster
139    reload_tx: broadcast::Sender<ReloadEvent>,
140    /// Reload statistics
141    stats: Arc<ReloadStats>,
142    /// Validation hooks
143    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
144    /// Reload hooks
145    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
146    /// Certificate reloader for TLS hot-reload
147    cert_reloader: Arc<CertificateReloader>,
148}
149
150impl ConfigManager {
151    /// Create new configuration manager
152    pub async fn new(
153        config_path: impl AsRef<Path>,
154        initial_config: Config,
155    ) -> SentinelResult<Self> {
156        let config_path = config_path.as_ref().to_path_buf();
157        let (reload_tx, _) = broadcast::channel(100);
158
159        info!(
160            config_path = %config_path.display(),
161            route_count = initial_config.routes.len(),
162            upstream_count = initial_config.upstreams.len(),
163            listener_count = initial_config.listeners.len(),
164            "Initializing configuration manager"
165        );
166
167        trace!(
168            config_path = %config_path.display(),
169            "Creating ArcSwap for configuration"
170        );
171
172        Ok(Self {
173            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
174            previous_config: Arc::new(RwLock::new(None)),
175            config_path,
176            watcher: Arc::new(RwLock::new(None)),
177            reload_tx,
178            stats: Arc::new(ReloadStats::default()),
179            validators: Arc::new(RwLock::new(Vec::new())),
180            reload_hooks: Arc::new(RwLock::new(Vec::new())),
181            cert_reloader: Arc::new(CertificateReloader::new()),
182        })
183    }
184
185    /// Get the certificate reloader for registering TLS listeners
186    pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
187        Arc::clone(&self.cert_reloader)
188    }
189
190    /// Get current configuration
191    pub fn current(&self) -> Arc<Config> {
192        self.current_config.load_full()
193    }
194
195    /// Start watching configuration file for changes
196    ///
197    /// When enabled, the proxy will automatically reload configuration
198    /// when the config file is modified.
199    pub async fn start_watching(&self) -> SentinelResult<()> {
200        // Check if already watching
201        if self.watcher.read().await.is_some() {
202            warn!("File watcher already active, skipping");
203            return Ok(());
204        }
205
206        let config_path = self.config_path.clone();
207
208        // Create file watcher
209        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
210
211        let mut watcher =
212            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
213                if let Ok(event) = event {
214                    let _ = tx.blocking_send(event);
215                }
216            })
217            .map_err(|e| SentinelError::Config {
218                message: format!("Failed to create file watcher: {}", e),
219                source: None,
220            })?;
221
222        // Watch config file
223        watcher
224            .watch(&config_path, RecursiveMode::NonRecursive)
225            .map_err(|e| SentinelError::Config {
226                message: format!("Failed to watch config file: {}", e),
227                source: None,
228            })?;
229
230        // Store watcher using interior mutability
231        *self.watcher.write().await = Some(watcher);
232
233        // Spawn event handler task
234        let manager = Arc::new(self.clone_for_task());
235        tokio::spawn(async move {
236            while let Some(event) = rx.recv().await {
237                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
238                    info!("Configuration file changed, triggering reload");
239
240                    // Debounce rapid changes
241                    tokio::time::sleep(Duration::from_millis(100)).await;
242
243                    if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
244                        error!("Auto-reload failed: {}", e);
245                        error!("Continuing with current configuration");
246                    }
247                }
248            }
249        });
250
251        info!(
252            "Auto-reload enabled: watching configuration file {:?}",
253            self.config_path
254        );
255        Ok(())
256    }
257
258    /// Reload configuration
259    pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
260        let start = Instant::now();
261        let reload_num = self
262            .stats
263            .total_reloads
264            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
265            + 1;
266
267        info!(
268            trigger = ?trigger,
269            reload_num = reload_num,
270            config_path = %self.config_path.display(),
271            "Starting configuration reload"
272        );
273
274        // Notify reload started
275        let _ = self.reload_tx.send(ReloadEvent::Started {
276            timestamp: Instant::now(),
277            trigger: trigger.clone(),
278        });
279
280        trace!(
281            config_path = %self.config_path.display(),
282            "Reading configuration file"
283        );
284
285        // Load new configuration
286        let new_config = match Config::from_file(&self.config_path) {
287            Ok(config) => {
288                debug!(
289                    route_count = config.routes.len(),
290                    upstream_count = config.upstreams.len(),
291                    listener_count = config.listeners.len(),
292                    "Configuration file parsed successfully"
293                );
294                config
295            }
296            Err(e) => {
297                let error_msg = format!("Failed to load configuration: {}", e);
298                error!(
299                    config_path = %self.config_path.display(),
300                    error = %e,
301                    "Failed to load configuration file"
302                );
303                self.stats
304                    .failed_reloads
305                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
306                *self.stats.last_failure.write().await = Some(Instant::now());
307
308                let _ = self.reload_tx.send(ReloadEvent::Failed {
309                    timestamp: Instant::now(),
310                    error: error_msg.clone(),
311                });
312
313                return Err(SentinelError::Config {
314                    message: error_msg,
315                    source: None,
316                });
317            }
318        };
319
320        trace!("Starting configuration validation");
321
322        // Validate new configuration BEFORE applying
323        // This is critical - invalid configs must never be loaded
324        if let Err(e) = self.validate_config(&new_config).await {
325            error!(
326                error = %e,
327                "Configuration validation failed - new configuration REJECTED"
328            );
329            self.stats
330                .failed_reloads
331                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
332            *self.stats.last_failure.write().await = Some(Instant::now());
333
334            let _ = self.reload_tx.send(ReloadEvent::Failed {
335                timestamp: Instant::now(),
336                error: e.to_string(),
337            });
338
339            return Err(e);
340        }
341
342        info!(
343            route_count = new_config.routes.len(),
344            upstream_count = new_config.upstreams.len(),
345            "Configuration validation passed, applying new configuration"
346        );
347
348        let _ = self.reload_tx.send(ReloadEvent::Validated {
349            timestamp: Instant::now(),
350        });
351
352        // Get current config for rollback
353        let old_config = self.current_config.load_full();
354
355        trace!(
356            old_routes = old_config.routes.len(),
357            new_routes = new_config.routes.len(),
358            "Preparing configuration swap"
359        );
360
361        // Run pre-reload hooks
362        let hooks = self.reload_hooks.read().await;
363        for hook in hooks.iter() {
364            trace!(hook_name = %hook.name(), "Running pre-reload hook");
365            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
366                warn!(
367                    hook_name = %hook.name(),
368                    error = %e,
369                    "Pre-reload hook failed"
370                );
371                // Continue with reload despite hook failure
372            }
373        }
374        drop(hooks);
375
376        // Save previous config for rollback
377        trace!("Saving previous configuration for potential rollback");
378        *self.previous_config.write().await = Some(old_config.clone());
379
380        // Apply new configuration atomically
381        trace!("Applying new configuration atomically");
382        self.current_config.store(Arc::new(new_config.clone()));
383
384        // Run post-reload hooks
385        let hooks = self.reload_hooks.read().await;
386        for hook in hooks.iter() {
387            trace!(hook_name = %hook.name(), "Running post-reload hook");
388            hook.post_reload(&old_config, &new_config).await;
389        }
390        drop(hooks);
391
392        // Update statistics
393        let duration = start.elapsed();
394        let successful_count = self
395            .stats
396            .successful_reloads
397            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
398            + 1;
399        *self.stats.last_success.write().await = Some(Instant::now());
400
401        // Update average duration
402        {
403            let mut avg = self.stats.avg_duration_ms.write().await;
404            let total = successful_count as f64;
405            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
406        }
407
408        let _ = self.reload_tx.send(ReloadEvent::Applied {
409            timestamp: Instant::now(),
410            version: format!("{:?}", Instant::now()), // TODO: Use proper versioning
411        });
412
413        // Reload TLS certificates (hot-reload)
414        // This picks up any certificate file changes without restart
415        let (cert_success, cert_errors) = self.cert_reloader.reload_all();
416        if !cert_errors.is_empty() {
417            for (listener_id, error) in &cert_errors {
418                error!(
419                    listener_id = %listener_id,
420                    error = %error,
421                    "TLS certificate reload failed for listener"
422                );
423            }
424        }
425
426        info!(
427            duration_ms = duration.as_millis(),
428            successful_reloads = successful_count,
429            route_count = new_config.routes.len(),
430            upstream_count = new_config.upstreams.len(),
431            cert_reload_success = cert_success,
432            cert_reload_errors = cert_errors.len(),
433            "Configuration reload completed successfully"
434        );
435
436        Ok(())
437    }
438
439    /// Rollback to previous configuration
440    pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
441        info!(
442            reason = %reason,
443            "Starting configuration rollback"
444        );
445
446        let previous = self.previous_config.read().await.clone();
447
448        if let Some(prev_config) = previous {
449            trace!(
450                route_count = prev_config.routes.len(),
451                "Found previous configuration for rollback"
452            );
453
454            // Validate previous config (should always pass)
455            trace!("Validating previous configuration");
456            if let Err(e) = self.validate_config(&prev_config).await {
457                error!(
458                    error = %e,
459                    "Previous configuration validation failed during rollback"
460                );
461                return Err(e);
462            }
463
464            // Apply previous configuration
465            trace!("Applying previous configuration");
466            self.current_config.store(prev_config.clone());
467            let rollback_count = self
468                .stats
469                .rollbacks
470                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
471                + 1;
472
473            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
474                timestamp: Instant::now(),
475                reason: reason.clone(),
476            });
477
478            info!(
479                reason = %reason,
480                rollback_count = rollback_count,
481                route_count = prev_config.routes.len(),
482                "Configuration rolled back successfully"
483            );
484            Ok(())
485        } else {
486            warn!("No previous configuration available for rollback");
487            Err(SentinelError::Config {
488                message: "No previous configuration available".to_string(),
489                source: None,
490            })
491        }
492    }
493
494    /// Validate configuration
495    async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
496        trace!(
497            route_count = config.routes.len(),
498            upstream_count = config.upstreams.len(),
499            "Starting configuration validation"
500        );
501
502        // Built-in validation
503        trace!("Running built-in config validation");
504        config.validate()?;
505
506        // Run custom validators
507        let validators = self.validators.read().await;
508        trace!(
509            validator_count = validators.len(),
510            "Running custom validators"
511        );
512        for validator in validators.iter() {
513            trace!(validator_name = %validator.name(), "Running validator");
514            validator.validate(config).await.map_err(|e| {
515                error!(
516                    validator_name = %validator.name(),
517                    error = %e,
518                    "Validator failed"
519                );
520                e
521            })?;
522        }
523
524        debug!(
525            route_count = config.routes.len(),
526            upstream_count = config.upstreams.len(),
527            "Configuration validation passed"
528        );
529
530        Ok(())
531    }
532
533    /// Add configuration validator
534    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
535        info!("Adding configuration validator: {}", validator.name());
536        self.validators.write().await.push(validator);
537    }
538
539    /// Add reload hook
540    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
541        info!("Adding reload hook: {}", hook.name());
542        self.reload_hooks.write().await.push(hook);
543    }
544
545    /// Subscribe to reload events
546    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
547        self.reload_tx.subscribe()
548    }
549
550    /// Get reload statistics
551    pub fn stats(&self) -> &ReloadStats {
552        &self.stats
553    }
554
555    /// Create a lightweight clone for async tasks
556    fn clone_for_task(&self) -> ConfigManager {
557        ConfigManager {
558            current_config: Arc::clone(&self.current_config),
559            previous_config: Arc::clone(&self.previous_config),
560            config_path: self.config_path.clone(),
561            watcher: self.watcher.clone(),
562            reload_tx: self.reload_tx.clone(),
563            stats: Arc::clone(&self.stats),
564            validators: Arc::clone(&self.validators),
565            reload_hooks: Arc::clone(&self.reload_hooks),
566            cert_reloader: Arc::clone(&self.cert_reloader),
567        }
568    }
569}
570
571// ============================================================================
572// Audit Reload Hook
573// ============================================================================
574
575/// Reload hook that logs configuration changes to the audit log.
576pub struct AuditReloadHook {
577    log_manager: SharedLogManager,
578}
579
580impl AuditReloadHook {
581    /// Create a new audit reload hook with the given log manager.
582    pub fn new(log_manager: SharedLogManager) -> Self {
583        Self { log_manager }
584    }
585}
586
587#[async_trait::async_trait]
588impl ReloadHook for AuditReloadHook {
589    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()> {
590        // Log that reload is starting
591        let trace_id = uuid::Uuid::new_v4().to_string();
592        let audit_entry = AuditLogEntry::config_change(
593            &trace_id,
594            "reload_started",
595            format!(
596                "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
597                old_config.routes.len(),
598                new_config.routes.len(),
599                old_config.upstreams.len(),
600                new_config.upstreams.len()
601            ),
602        );
603        self.log_manager.log_audit(&audit_entry);
604        Ok(())
605    }
606
607    async fn post_reload(&self, old_config: &Config, new_config: &Config) {
608        // Log successful reload
609        let trace_id = uuid::Uuid::new_v4().to_string();
610        let audit_entry = AuditLogEntry::config_change(
611            &trace_id,
612            "reload_success",
613            format!(
614                "Configuration reload successful: {} routes, {} upstreams, {} listeners",
615                new_config.routes.len(),
616                new_config.upstreams.len(),
617                new_config.listeners.len()
618            ),
619        )
620        .with_metadata("old_routes", old_config.routes.len().to_string())
621        .with_metadata("new_routes", new_config.routes.len().to_string())
622        .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
623        .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
624        self.log_manager.log_audit(&audit_entry);
625    }
626
627    async fn on_failure(&self, config: &Config, error: &SentinelError) {
628        // Log failed reload
629        let trace_id = uuid::Uuid::new_v4().to_string();
630        let audit_entry = AuditLogEntry::config_change(
631            &trace_id,
632            "reload_failed",
633            format!("Configuration reload failed: {}", error),
634        )
635        .with_metadata("current_routes", config.routes.len().to_string())
636        .with_metadata("current_upstreams", config.upstreams.len().to_string());
637        self.log_manager.log_audit(&audit_entry);
638    }
639
640    fn name(&self) -> &str {
641        "audit_reload_hook"
642    }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Read a statistics counter the same way the manager updates it.
    fn read_counter(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// A reload from an unparsable file must fail, leave the active
    /// configuration in place, and be counted as a failure.
    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        // The file on disk is deliberately not valid KDL.
        let scratch = tempfile::tempdir().unwrap();
        let config_path = scratch.path().join("config.kdl");
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        // The manager starts from a valid in-memory configuration.
        let initial_config = Config::default_for_testing();
        let expected_routes = initial_config.routes.len();
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();
        assert_eq!(manager.current().routes.len(), expected_routes);

        // Reloading from the broken file must be rejected...
        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(outcome.is_err(), "Reload should fail for invalid config");

        // ...without touching the active configuration...
        assert_eq!(
            manager.current().routes.len(),
            expected_routes,
            "Original config should be preserved after failed reload"
        );

        // ...and the failure must show up in the statistics.
        assert_eq!(
            read_counter(&manager.stats().failed_reloads),
            1,
            "Failed reload should be recorded"
        );
    }

    /// A reload from a well-formed file must succeed and be counted.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let scratch = tempfile::tempdir().unwrap();
        let config_path = scratch.path().join("config.kdl");

        // Create a static files directory for the test
        let static_dir = scratch.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // A minimal configuration: one listener, one upstream, one route.
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        let initial_config = Config::default_for_testing();
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            outcome.is_ok(),
            "Reload should succeed for valid config: {:?}",
            outcome.err()
        );

        assert_eq!(
            read_counter(&manager.stats().successful_reloads),
            1,
            "Successful reload should be recorded"
        );
    }
}