sentinel_proxy/reload/
mod.rs

1//! Configuration hot reload module for Sentinel proxy.
2//!
3//! This module implements zero-downtime configuration reloading with validation,
4//! atomic swaps, and rollback support for production reliability.
5//!
6//! ## Submodules
7//!
8//! - [`coordinator`]: Graceful reload coordination and request draining
9//! - [`signals`]: OS signal handling (SIGHUP, SIGTERM)
10//! - [`validators`]: Runtime configuration validators
11
12mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
21use notify::{Event, EventKind, RecursiveMode, Watcher};
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, RwLock};
26use tracing::{debug, error, info, warn};
27
28use sentinel_common::errors::{SentinelError, SentinelResult};
29use sentinel_config::Config;
30
31// ============================================================================
32// Reload Events and Types
33// ============================================================================
34
/// Events broadcast to subscribers over the course of a configuration change.
///
/// [`ConfigManager::reload`] and [`ConfigManager::rollback`] publish these on
/// the channel handed out by [`ConfigManager::subscribe`].
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// A reload attempt has begun; sent before the configuration file is read.
    Started {
        timestamp: Instant,
        trigger: ReloadTrigger,
    },
    /// The freshly loaded configuration passed built-in and custom validation.
    Validated { timestamp: Instant },
    /// The new configuration was swapped in and the post-reload hooks have run.
    ///
    /// `version` currently holds the `Debug` rendering of an `Instant`; it is a
    /// placeholder, not a stable version identifier.
    Applied { timestamp: Instant, version: String },
    /// The reload was abandoned because loading or validation failed; `error`
    /// carries the failure message.
    Failed { timestamp: Instant, error: String },
    /// The active configuration was replaced by the previously saved one.
    RolledBack { timestamp: Instant, reason: String },
}
52
/// Identifies what initiated a configuration reload.
///
/// The value is carried in [`ReloadEvent::Started`] and written to the log when
/// a reload begins. The variants are plain tags, so the type derives equality
/// and hashing: event subscribers can compare or bucket triggers directly
/// instead of pattern matching on every use.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadTrigger {
    /// Manual reload via API
    Manual,
    /// File change detected
    FileChange,
    /// Signal received (SIGHUP)
    Signal,
    /// Scheduled reload
    Scheduled,
}
65
66// ============================================================================
67// Traits
68// ============================================================================
69
/// Pluggable check that a candidate configuration must pass before it is applied.
///
/// Validators registered through [`ConfigManager::add_validator`] run in
/// registration order after the configuration's own `validate()` call; the
/// first validator to return an error stops validation.
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Inspect `config` and return `Err` to reject it.
    async fn validate(&self, config: &Config) -> SentinelResult<()>;

    /// Short identifier used in log messages about this validator.
    fn name(&self) -> &str;
}
79
/// Callbacks that let other components react to configuration reloads.
///
/// Hooks registered through [`ConfigManager::add_hook`] are invoked in
/// registration order by [`ConfigManager::reload`].
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called after the new configuration has been validated and before it is
    /// swapped in.
    ///
    /// An `Err` returned here is logged as a warning by the reload path; it
    /// does not stop the reload.
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;

    /// Called after the new configuration has become the active one.
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Called on reload failure.
    ///
    /// NOTE(review): nothing in this module's visible reload path invokes this
    /// method — confirm whether a caller elsewhere does.
    async fn on_failure(&self, config: &Config, error: &SentinelError);

    /// Short identifier used in log messages about this hook.
    fn name(&self) -> &str;
}
95
96// ============================================================================
97// Reload Statistics
98// ============================================================================
99
/// Counters and timestamps describing the reload history of a [`ConfigManager`].
///
/// Counters are atomics and are updated with relaxed ordering by the manager;
/// the timestamp and average fields sit behind async `RwLock`s, so reading them
/// requires `.read().await`.
#[derive(Default)]
pub struct ReloadStats {
    /// Number of reload attempts started, whether or not they succeeded.
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Number of reloads that applied a new configuration.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Number of reloads rejected because loading or validation failed.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Number of completed rollbacks to the previous configuration.
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// When the most recent successful reload finished; `None` until one succeeds.
    pub last_success: RwLock<Option<Instant>>,
    /// When the most recent failed reload was recorded; `None` until one fails.
    pub last_failure: RwLock<Option<Instant>>,
    /// Running mean duration, in milliseconds, of successful reloads.
    pub avg_duration_ms: RwLock<f64>,
}
118
119// ============================================================================
120// Configuration Manager
121// ============================================================================
122
/// Owns the active configuration and coordinates reloads, validation and rollback.
///
/// Every field that holds shared state is wrapped in an `Arc`, which is what
/// allows the file-watcher task to work on a second handle to the same state.
pub struct ConfigManager {
    /// Currently active configuration; replaced atomically on reload/rollback.
    current_config: Arc<ArcSwap<Config>>,
    /// Configuration that was active before the last successful reload, kept
    /// for rollback. `None` until the first successful reload.
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// Path of the configuration file that reloads read from.
    config_path: PathBuf,
    /// File watcher for auto-reload; `None` until watching is started. The
    /// lock provides interior mutability so it can be installed through `&self`.
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Sender side of the reload event broadcast channel.
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload counters and timings.
    stats: Arc<ReloadStats>,
    /// Custom validators run in addition to the configuration's own validation.
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Hooks invoked around each reload.
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
}
142
143impl ConfigManager {
144    /// Create new configuration manager
145    pub async fn new(
146        config_path: impl AsRef<Path>,
147        initial_config: Config,
148    ) -> SentinelResult<Self> {
149        let config_path = config_path.as_ref().to_path_buf();
150        let (reload_tx, _) = broadcast::channel(100);
151
152        info!("Initializing configuration manager for: {:?}", config_path);
153
154        Ok(Self {
155            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
156            previous_config: Arc::new(RwLock::new(None)),
157            config_path,
158            watcher: Arc::new(RwLock::new(None)),
159            reload_tx,
160            stats: Arc::new(ReloadStats::default()),
161            validators: Arc::new(RwLock::new(Vec::new())),
162            reload_hooks: Arc::new(RwLock::new(Vec::new())),
163        })
164    }
165
166    /// Get current configuration
167    pub fn current(&self) -> Arc<Config> {
168        self.current_config.load_full()
169    }
170
171    /// Start watching configuration file for changes
172    ///
173    /// When enabled, the proxy will automatically reload configuration
174    /// when the config file is modified.
175    pub async fn start_watching(&self) -> SentinelResult<()> {
176        // Check if already watching
177        if self.watcher.read().await.is_some() {
178            warn!("File watcher already active, skipping");
179            return Ok(());
180        }
181
182        let config_path = self.config_path.clone();
183
184        // Create file watcher
185        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
186
187        let mut watcher =
188            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
189                if let Ok(event) = event {
190                    let _ = tx.blocking_send(event);
191                }
192            })
193            .map_err(|e| SentinelError::Config {
194                message: format!("Failed to create file watcher: {}", e),
195                source: None,
196            })?;
197
198        // Watch config file
199        watcher
200            .watch(&config_path, RecursiveMode::NonRecursive)
201            .map_err(|e| SentinelError::Config {
202                message: format!("Failed to watch config file: {}", e),
203                source: None,
204            })?;
205
206        // Store watcher using interior mutability
207        *self.watcher.write().await = Some(watcher);
208
209        // Spawn event handler task
210        let manager = Arc::new(self.clone_for_task());
211        tokio::spawn(async move {
212            while let Some(event) = rx.recv().await {
213                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
214                    info!("Configuration file changed, triggering reload");
215
216                    // Debounce rapid changes
217                    tokio::time::sleep(Duration::from_millis(100)).await;
218
219                    if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
220                        error!("Auto-reload failed: {}", e);
221                        error!("Continuing with current configuration");
222                    }
223                }
224            }
225        });
226
227        info!(
228            "Auto-reload enabled: watching configuration file {:?}",
229            self.config_path
230        );
231        Ok(())
232    }
233
234    /// Reload configuration
235    pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
236        let start = Instant::now();
237        self.stats
238            .total_reloads
239            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
240
241        info!("Starting configuration reload (trigger: {:?})", trigger);
242
243        // Notify reload started
244        let _ = self.reload_tx.send(ReloadEvent::Started {
245            timestamp: Instant::now(),
246            trigger: trigger.clone(),
247        });
248
249        // Load new configuration
250        let new_config = match Config::from_file(&self.config_path) {
251            Ok(config) => config,
252            Err(e) => {
253                let error_msg = format!("Failed to load configuration: {}", e);
254                error!("{}", error_msg);
255                self.stats
256                    .failed_reloads
257                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
258                *self.stats.last_failure.write().await = Some(Instant::now());
259
260                let _ = self.reload_tx.send(ReloadEvent::Failed {
261                    timestamp: Instant::now(),
262                    error: error_msg.clone(),
263                });
264
265                return Err(SentinelError::Config {
266                    message: error_msg,
267                    source: None,
268                });
269            }
270        };
271
272        // Validate new configuration BEFORE applying
273        // This is critical - invalid configs must never be loaded
274        if let Err(e) = self.validate_config(&new_config).await {
275            error!("Configuration validation failed: {}", e);
276            error!("REJECTED: New configuration will NOT be applied. Continuing with current configuration.");
277            self.stats
278                .failed_reloads
279                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
280            *self.stats.last_failure.write().await = Some(Instant::now());
281
282            let _ = self.reload_tx.send(ReloadEvent::Failed {
283                timestamp: Instant::now(),
284                error: e.to_string(),
285            });
286
287            return Err(e);
288        }
289
290        info!("Configuration validation passed, applying new configuration");
291
292        let _ = self.reload_tx.send(ReloadEvent::Validated {
293            timestamp: Instant::now(),
294        });
295
296        // Get current config for rollback
297        let old_config = self.current_config.load_full();
298
299        // Run pre-reload hooks
300        for hook in self.reload_hooks.read().await.iter() {
301            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
302                warn!("Pre-reload hook '{}' failed: {}", hook.name(), e);
303                // Continue with reload despite hook failure
304            }
305        }
306
307        // Save previous config for rollback
308        *self.previous_config.write().await = Some(old_config.clone());
309
310        // Apply new configuration atomically
311        self.current_config.store(Arc::new(new_config.clone()));
312
313        // Run post-reload hooks
314        for hook in self.reload_hooks.read().await.iter() {
315            hook.post_reload(&old_config, &new_config).await;
316        }
317
318        // Update statistics
319        let duration = start.elapsed();
320        self.stats
321            .successful_reloads
322            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
323        *self.stats.last_success.write().await = Some(Instant::now());
324
325        // Update average duration
326        {
327            let mut avg = self.stats.avg_duration_ms.write().await;
328            let total = self
329                .stats
330                .successful_reloads
331                .load(std::sync::atomic::Ordering::Relaxed) as f64;
332            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
333        }
334
335        let _ = self.reload_tx.send(ReloadEvent::Applied {
336            timestamp: Instant::now(),
337            version: format!("{:?}", Instant::now()), // TODO: Use proper versioning
338        });
339
340        info!(
341            "Configuration reload completed successfully in {:?}",
342            duration
343        );
344
345        Ok(())
346    }
347
348    /// Rollback to previous configuration
349    pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
350        let previous = self.previous_config.read().await.clone();
351
352        if let Some(prev_config) = previous {
353            info!("Rolling back configuration: {}", reason);
354
355            // Validate previous config (should always pass)
356            if let Err(e) = self.validate_config(&prev_config).await {
357                error!("Previous configuration validation failed: {}", e);
358                return Err(e);
359            }
360
361            // Apply previous configuration
362            self.current_config.store(prev_config);
363            self.stats
364                .rollbacks
365                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366
367            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
368                timestamp: Instant::now(),
369                reason: reason.clone(),
370            });
371
372            info!("Configuration rolled back successfully");
373            Ok(())
374        } else {
375            warn!("No previous configuration available for rollback");
376            Err(SentinelError::Config {
377                message: "No previous configuration available".to_string(),
378                source: None,
379            })
380        }
381    }
382
383    /// Validate configuration
384    async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
385        // Built-in validation
386        config.validate()?;
387
388        // Run custom validators
389        for validator in self.validators.read().await.iter() {
390            debug!("Running validator: {}", validator.name());
391            validator.validate(config).await.map_err(|e| {
392                error!("Validator '{}' failed: {}", validator.name(), e);
393                e
394            })?;
395        }
396
397        Ok(())
398    }
399
400    /// Add configuration validator
401    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
402        info!("Adding configuration validator: {}", validator.name());
403        self.validators.write().await.push(validator);
404    }
405
406    /// Add reload hook
407    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
408        info!("Adding reload hook: {}", hook.name());
409        self.reload_hooks.write().await.push(hook);
410    }
411
412    /// Subscribe to reload events
413    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
414        self.reload_tx.subscribe()
415    }
416
417    /// Get reload statistics
418    pub fn stats(&self) -> &ReloadStats {
419        &self.stats
420    }
421
422    /// Create a lightweight clone for async tasks
423    fn clone_for_task(&self) -> ConfigManager {
424        ConfigManager {
425            current_config: Arc::clone(&self.current_config),
426            previous_config: Arc::clone(&self.previous_config),
427            config_path: self.config_path.clone(),
428            watcher: self.watcher.clone(),
429            reload_tx: self.reload_tx.clone(),
430            stats: Arc::clone(&self.stats),
431            validators: Arc::clone(&self.validators),
432            reload_hooks: Arc::clone(&self.reload_hooks),
433        }
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use super::*;
440
441    #[tokio::test]
442    async fn test_config_reload_rejects_invalid_config() {
443        // Create valid initial config
444        let initial_config = Config::default_for_testing();
445        let initial_routes = initial_config.routes.len();
446
447        let temp_dir = tempfile::tempdir().unwrap();
448        let config_path = temp_dir.path().join("config.kdl");
449
450        // Write INVALID config (not valid KDL)
451        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
452
453        // Create config manager with valid initial config
454        let manager = ConfigManager::new(&config_path, initial_config)
455            .await
456            .unwrap();
457
458        // Verify initial config is loaded
459        assert_eq!(manager.current().routes.len(), initial_routes);
460
461        // Attempt reload with invalid config - should fail
462        let result = manager.reload(ReloadTrigger::Manual).await;
463        assert!(result.is_err(), "Reload should fail for invalid config");
464
465        // Verify original config is STILL loaded (not replaced)
466        assert_eq!(
467            manager.current().routes.len(),
468            initial_routes,
469            "Original config should be preserved after failed reload"
470        );
471
472        // Verify failure was recorded in stats
473        assert_eq!(
474            manager
475                .stats()
476                .failed_reloads
477                .load(std::sync::atomic::Ordering::Relaxed),
478            1,
479            "Failed reload should be recorded"
480        );
481    }
482
483    #[tokio::test]
484    async fn test_config_reload_accepts_valid_config() {
485        // Create valid initial config
486        let initial_config = Config::default_for_testing();
487        let temp_dir = tempfile::tempdir().unwrap();
488        let config_path = temp_dir.path().join("config.kdl");
489
490        // Create a static files directory for the test
491        let static_dir = temp_dir.path().join("static");
492        std::fs::create_dir_all(&static_dir).unwrap();
493
494        // Write a valid config with upstream
495        let valid_config = r#"
496server {
497    worker-threads 4
498}
499
500listeners {
501    listener "http" {
502        address "0.0.0.0:8080"
503        protocol "http"
504    }
505}
506
507upstreams {
508    upstream "backend" {
509        target "127.0.0.1:3000"
510    }
511}
512
513routes {
514    route "api" {
515        priority "high"
516        matches {
517            path-prefix "/api/"
518        }
519        upstream "backend"
520    }
521}
522"#;
523        std::fs::write(&config_path, valid_config).unwrap();
524
525        // Create config manager
526        let manager = ConfigManager::new(&config_path, initial_config)
527            .await
528            .unwrap();
529
530        // Reload should succeed with valid config
531        let result = manager.reload(ReloadTrigger::Manual).await;
532        assert!(
533            result.is_ok(),
534            "Reload should succeed for valid config: {:?}",
535            result.err()
536        );
537
538        // Verify success was recorded
539        assert_eq!(
540            manager
541                .stats()
542                .successful_reloads
543                .load(std::sync::atomic::Ordering::Relaxed),
544            1,
545            "Successful reload should be recorded"
546        );
547    }
548}