sentinel_proxy/reload/
mod.rs

1//! Configuration hot reload module for Sentinel proxy.
2//!
3//! This module implements zero-downtime configuration reloading with validation,
4//! atomic swaps, and rollback support for production reliability.
5//!
6//! ## Submodules
7//!
8//! - [`coordinator`]: Graceful reload coordination and request draining
9//! - [`signals`]: OS signal handling (SIGHUP, SIGTERM)
10//! - [`validators`]: Runtime configuration validators
11
12mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
21use notify::{Event, EventKind, RecursiveMode, Watcher};
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, RwLock};
26use tracing::{debug, error, info, trace, warn};
27
28use sentinel_common::errors::{SentinelError, SentinelResult};
29use sentinel_config::Config;
30
31// ============================================================================
32// Reload Events and Types
33// ============================================================================
34
35/// Reload event types
36#[derive(Debug, Clone)]
37pub enum ReloadEvent {
38    /// Configuration reload started
39    Started {
40        timestamp: Instant,
41        trigger: ReloadTrigger,
42    },
43    /// Configuration validated successfully
44    Validated { timestamp: Instant },
45    /// Configuration applied successfully
46    Applied { timestamp: Instant, version: String },
47    /// Configuration reload failed
48    Failed { timestamp: Instant, error: String },
49    /// Configuration rolled back
50    RolledBack { timestamp: Instant, reason: String },
51}
52
/// What initiated a configuration reload.
///
/// Carried by [`ReloadEvent::Started`] so subscribers can tell reload sources
/// apart. Implements `PartialEq`/`Eq`/`Hash` so triggers can be compared
/// directly or used as map keys (e.g. per-trigger metrics).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadTrigger {
    /// Manual reload via API
    Manual,
    /// File change detected
    FileChange,
    /// Signal received (SIGHUP)
    Signal,
    /// Scheduled reload
    Scheduled,
}
65
66// ============================================================================
67// Traits
68// ============================================================================
69
70/// Configuration validator trait
71#[async_trait::async_trait]
72pub trait ConfigValidator: Send + Sync {
73    /// Validate configuration before applying
74    async fn validate(&self, config: &Config) -> SentinelResult<()>;
75
76    /// Validator name for logging
77    fn name(&self) -> &str;
78}
79
80/// Reload hook trait for custom actions
81#[async_trait::async_trait]
82pub trait ReloadHook: Send + Sync {
83    /// Called before reload starts
84    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;
85
86    /// Called after successful reload
87    async fn post_reload(&self, old_config: &Config, new_config: &Config);
88
89    /// Called on reload failure
90    async fn on_failure(&self, config: &Config, error: &SentinelError);
91
92    /// Hook name for logging
93    fn name(&self) -> &str;
94}
95
96// ============================================================================
97// Reload Statistics
98// ============================================================================
99
100/// Reload statistics
101#[derive(Default)]
102pub struct ReloadStats {
103    /// Total reload attempts
104    pub total_reloads: std::sync::atomic::AtomicU64,
105    /// Successful reloads
106    pub successful_reloads: std::sync::atomic::AtomicU64,
107    /// Failed reloads
108    pub failed_reloads: std::sync::atomic::AtomicU64,
109    /// Rollbacks performed
110    pub rollbacks: std::sync::atomic::AtomicU64,
111    /// Last successful reload time
112    pub last_success: RwLock<Option<Instant>>,
113    /// Last failure time
114    pub last_failure: RwLock<Option<Instant>>,
115    /// Average reload duration
116    pub avg_duration_ms: RwLock<f64>,
117}
118
119// ============================================================================
120// Configuration Manager
121// ============================================================================
122
123/// Configuration manager with hot reload support
124pub struct ConfigManager {
125    /// Current active configuration
126    current_config: Arc<ArcSwap<Config>>,
127    /// Previous configuration for rollback
128    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
129    /// Configuration file path
130    config_path: PathBuf,
131    /// File watcher for auto-reload (uses RwLock for interior mutability)
132    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
133    /// Reload event broadcaster
134    reload_tx: broadcast::Sender<ReloadEvent>,
135    /// Reload statistics
136    stats: Arc<ReloadStats>,
137    /// Validation hooks
138    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
139    /// Reload hooks
140    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
141}
142
143impl ConfigManager {
144    /// Create new configuration manager
145    pub async fn new(
146        config_path: impl AsRef<Path>,
147        initial_config: Config,
148    ) -> SentinelResult<Self> {
149        let config_path = config_path.as_ref().to_path_buf();
150        let (reload_tx, _) = broadcast::channel(100);
151
152        info!(
153            config_path = %config_path.display(),
154            route_count = initial_config.routes.len(),
155            upstream_count = initial_config.upstreams.len(),
156            listener_count = initial_config.listeners.len(),
157            "Initializing configuration manager"
158        );
159
160        trace!(
161            config_path = %config_path.display(),
162            "Creating ArcSwap for configuration"
163        );
164
165        Ok(Self {
166            current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
167            previous_config: Arc::new(RwLock::new(None)),
168            config_path,
169            watcher: Arc::new(RwLock::new(None)),
170            reload_tx,
171            stats: Arc::new(ReloadStats::default()),
172            validators: Arc::new(RwLock::new(Vec::new())),
173            reload_hooks: Arc::new(RwLock::new(Vec::new())),
174        })
175    }
176
177    /// Get current configuration
178    pub fn current(&self) -> Arc<Config> {
179        self.current_config.load_full()
180    }
181
182    /// Start watching configuration file for changes
183    ///
184    /// When enabled, the proxy will automatically reload configuration
185    /// when the config file is modified.
186    pub async fn start_watching(&self) -> SentinelResult<()> {
187        // Check if already watching
188        if self.watcher.read().await.is_some() {
189            warn!("File watcher already active, skipping");
190            return Ok(());
191        }
192
193        let config_path = self.config_path.clone();
194
195        // Create file watcher
196        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
197
198        let mut watcher =
199            notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
200                if let Ok(event) = event {
201                    let _ = tx.blocking_send(event);
202                }
203            })
204            .map_err(|e| SentinelError::Config {
205                message: format!("Failed to create file watcher: {}", e),
206                source: None,
207            })?;
208
209        // Watch config file
210        watcher
211            .watch(&config_path, RecursiveMode::NonRecursive)
212            .map_err(|e| SentinelError::Config {
213                message: format!("Failed to watch config file: {}", e),
214                source: None,
215            })?;
216
217        // Store watcher using interior mutability
218        *self.watcher.write().await = Some(watcher);
219
220        // Spawn event handler task
221        let manager = Arc::new(self.clone_for_task());
222        tokio::spawn(async move {
223            while let Some(event) = rx.recv().await {
224                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
225                    info!("Configuration file changed, triggering reload");
226
227                    // Debounce rapid changes
228                    tokio::time::sleep(Duration::from_millis(100)).await;
229
230                    if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
231                        error!("Auto-reload failed: {}", e);
232                        error!("Continuing with current configuration");
233                    }
234                }
235            }
236        });
237
238        info!(
239            "Auto-reload enabled: watching configuration file {:?}",
240            self.config_path
241        );
242        Ok(())
243    }
244
245    /// Reload configuration
246    pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
247        let start = Instant::now();
248        let reload_num = self.stats
249            .total_reloads
250            .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
251
252        info!(
253            trigger = ?trigger,
254            reload_num = reload_num,
255            config_path = %self.config_path.display(),
256            "Starting configuration reload"
257        );
258
259        // Notify reload started
260        let _ = self.reload_tx.send(ReloadEvent::Started {
261            timestamp: Instant::now(),
262            trigger: trigger.clone(),
263        });
264
265        trace!(
266            config_path = %self.config_path.display(),
267            "Reading configuration file"
268        );
269
270        // Load new configuration
271        let new_config = match Config::from_file(&self.config_path) {
272            Ok(config) => {
273                debug!(
274                    route_count = config.routes.len(),
275                    upstream_count = config.upstreams.len(),
276                    listener_count = config.listeners.len(),
277                    "Configuration file parsed successfully"
278                );
279                config
280            }
281            Err(e) => {
282                let error_msg = format!("Failed to load configuration: {}", e);
283                error!(
284                    config_path = %self.config_path.display(),
285                    error = %e,
286                    "Failed to load configuration file"
287                );
288                self.stats
289                    .failed_reloads
290                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
291                *self.stats.last_failure.write().await = Some(Instant::now());
292
293                let _ = self.reload_tx.send(ReloadEvent::Failed {
294                    timestamp: Instant::now(),
295                    error: error_msg.clone(),
296                });
297
298                return Err(SentinelError::Config {
299                    message: error_msg,
300                    source: None,
301                });
302            }
303        };
304
305        trace!("Starting configuration validation");
306
307        // Validate new configuration BEFORE applying
308        // This is critical - invalid configs must never be loaded
309        if let Err(e) = self.validate_config(&new_config).await {
310            error!(
311                error = %e,
312                "Configuration validation failed - new configuration REJECTED"
313            );
314            self.stats
315                .failed_reloads
316                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
317            *self.stats.last_failure.write().await = Some(Instant::now());
318
319            let _ = self.reload_tx.send(ReloadEvent::Failed {
320                timestamp: Instant::now(),
321                error: e.to_string(),
322            });
323
324            return Err(e);
325        }
326
327        info!(
328            route_count = new_config.routes.len(),
329            upstream_count = new_config.upstreams.len(),
330            "Configuration validation passed, applying new configuration"
331        );
332
333        let _ = self.reload_tx.send(ReloadEvent::Validated {
334            timestamp: Instant::now(),
335        });
336
337        // Get current config for rollback
338        let old_config = self.current_config.load_full();
339
340        trace!(
341            old_routes = old_config.routes.len(),
342            new_routes = new_config.routes.len(),
343            "Preparing configuration swap"
344        );
345
346        // Run pre-reload hooks
347        let hooks = self.reload_hooks.read().await;
348        for hook in hooks.iter() {
349            trace!(hook_name = %hook.name(), "Running pre-reload hook");
350            if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
351                warn!(
352                    hook_name = %hook.name(),
353                    error = %e,
354                    "Pre-reload hook failed"
355                );
356                // Continue with reload despite hook failure
357            }
358        }
359        drop(hooks);
360
361        // Save previous config for rollback
362        trace!("Saving previous configuration for potential rollback");
363        *self.previous_config.write().await = Some(old_config.clone());
364
365        // Apply new configuration atomically
366        trace!("Applying new configuration atomically");
367        self.current_config.store(Arc::new(new_config.clone()));
368
369        // Run post-reload hooks
370        let hooks = self.reload_hooks.read().await;
371        for hook in hooks.iter() {
372            trace!(hook_name = %hook.name(), "Running post-reload hook");
373            hook.post_reload(&old_config, &new_config).await;
374        }
375        drop(hooks);
376
377        // Update statistics
378        let duration = start.elapsed();
379        let successful_count = self.stats
380            .successful_reloads
381            .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
382        *self.stats.last_success.write().await = Some(Instant::now());
383
384        // Update average duration
385        {
386            let mut avg = self.stats.avg_duration_ms.write().await;
387            let total = successful_count as f64;
388            *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
389        }
390
391        let _ = self.reload_tx.send(ReloadEvent::Applied {
392            timestamp: Instant::now(),
393            version: format!("{:?}", Instant::now()), // TODO: Use proper versioning
394        });
395
396        info!(
397            duration_ms = duration.as_millis(),
398            successful_reloads = successful_count,
399            route_count = new_config.routes.len(),
400            upstream_count = new_config.upstreams.len(),
401            "Configuration reload completed successfully"
402        );
403
404        Ok(())
405    }
406
407    /// Rollback to previous configuration
408    pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
409        info!(
410            reason = %reason,
411            "Starting configuration rollback"
412        );
413
414        let previous = self.previous_config.read().await.clone();
415
416        if let Some(prev_config) = previous {
417            trace!(
418                route_count = prev_config.routes.len(),
419                "Found previous configuration for rollback"
420            );
421
422            // Validate previous config (should always pass)
423            trace!("Validating previous configuration");
424            if let Err(e) = self.validate_config(&prev_config).await {
425                error!(
426                    error = %e,
427                    "Previous configuration validation failed during rollback"
428                );
429                return Err(e);
430            }
431
432            // Apply previous configuration
433            trace!("Applying previous configuration");
434            self.current_config.store(prev_config.clone());
435            let rollback_count = self.stats
436                .rollbacks
437                .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
438
439            let _ = self.reload_tx.send(ReloadEvent::RolledBack {
440                timestamp: Instant::now(),
441                reason: reason.clone(),
442            });
443
444            info!(
445                reason = %reason,
446                rollback_count = rollback_count,
447                route_count = prev_config.routes.len(),
448                "Configuration rolled back successfully"
449            );
450            Ok(())
451        } else {
452            warn!("No previous configuration available for rollback");
453            Err(SentinelError::Config {
454                message: "No previous configuration available".to_string(),
455                source: None,
456            })
457        }
458    }
459
460    /// Validate configuration
461    async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
462        trace!(
463            route_count = config.routes.len(),
464            upstream_count = config.upstreams.len(),
465            "Starting configuration validation"
466        );
467
468        // Built-in validation
469        trace!("Running built-in config validation");
470        config.validate()?;
471
472        // Run custom validators
473        let validators = self.validators.read().await;
474        trace!(
475            validator_count = validators.len(),
476            "Running custom validators"
477        );
478        for validator in validators.iter() {
479            trace!(validator_name = %validator.name(), "Running validator");
480            validator.validate(config).await.map_err(|e| {
481                error!(
482                    validator_name = %validator.name(),
483                    error = %e,
484                    "Validator failed"
485                );
486                e
487            })?;
488        }
489
490        debug!(
491            route_count = config.routes.len(),
492            upstream_count = config.upstreams.len(),
493            "Configuration validation passed"
494        );
495
496        Ok(())
497    }
498
499    /// Add configuration validator
500    pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
501        info!("Adding configuration validator: {}", validator.name());
502        self.validators.write().await.push(validator);
503    }
504
505    /// Add reload hook
506    pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
507        info!("Adding reload hook: {}", hook.name());
508        self.reload_hooks.write().await.push(hook);
509    }
510
511    /// Subscribe to reload events
512    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
513        self.reload_tx.subscribe()
514    }
515
516    /// Get reload statistics
517    pub fn stats(&self) -> &ReloadStats {
518        &self.stats
519    }
520
521    /// Create a lightweight clone for async tasks
522    fn clone_for_task(&self) -> ConfigManager {
523        ConfigManager {
524            current_config: Arc::clone(&self.current_config),
525            previous_config: Arc::clone(&self.previous_config),
526            config_path: self.config_path.clone(),
527            watcher: self.watcher.clone(),
528            reload_tx: self.reload_tx.clone(),
529            stats: Arc::clone(&self.stats),
530            validators: Arc::clone(&self.validators),
531            reload_hooks: Arc::clone(&self.reload_hooks),
532        }
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let initial_routes = initial_config.routes.len();

        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Write INVALID config (not valid KDL)
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        // Create config manager with valid initial config
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Verify initial config is loaded
        assert_eq!(manager.current().routes.len(), initial_routes);

        // Attempt reload with invalid config - should fail
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(result.is_err(), "Reload should fail for invalid config");

        // Verify original config is STILL loaded (not replaced)
        assert_eq!(
            manager.current().routes.len(),
            initial_routes,
            "Original config should be preserved after failed reload"
        );

        // Verify failure was recorded in stats
        assert_eq!(
            manager
                .stats()
                .failed_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Failed reload should be recorded"
        );
    }

    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        // Create valid initial config
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Create a static files directory for the test
        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // Write a valid config with upstream
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        // Create config manager
        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        // Reload should succeed with valid config
        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        // Verify success was recorded
        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }

    #[tokio::test]
    async fn test_rollback_without_previous_config_fails() {
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        let manager = ConfigManager::new(&config_path, Config::default_for_testing())
            .await
            .unwrap();

        // No reload has happened yet, so there is nothing to roll back to.
        let result = manager.rollback("test rollback".to_string()).await;
        assert!(
            result.is_err(),
            "Rollback should fail when no previous configuration exists"
        );
        assert_eq!(
            manager
                .stats()
                .rollbacks
                .load(std::sync::atomic::Ordering::Relaxed),
            0,
            "Failed rollback must not be counted"
        );
    }

    #[tokio::test]
    async fn test_failed_reload_broadcasts_started_then_failed() {
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        let manager = ConfigManager::new(&config_path, Config::default_for_testing())
            .await
            .unwrap();
        let mut events = manager.subscribe();

        assert!(manager.reload(ReloadTrigger::Manual).await.is_err());

        assert!(
            matches!(
                events.try_recv(),
                Ok(ReloadEvent::Started {
                    trigger: ReloadTrigger::Manual,
                    ..
                })
            ),
            "First event should be Started with the Manual trigger"
        );
        assert!(
            matches!(events.try_recv(), Ok(ReloadEvent::Failed { .. })),
            "Second event should be Failed"
        );
    }
}