1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
23use notify::{Event, EventKind, RecursiveMode, Watcher};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, RwLock};
28use tracing::{debug, error, info, trace, warn};
29
30use sentinel_common::errors::{SentinelError, SentinelResult};
31use sentinel_config::Config;
32
33use crate::logging::{AuditLogEntry, SharedLogManager};
34use crate::tls::CertificateReloader;
35
36#[derive(Debug, Clone)]
42pub enum ReloadEvent {
43 Started {
45 timestamp: Instant,
46 trigger: ReloadTrigger,
47 },
48 Validated { timestamp: Instant },
50 Applied { timestamp: Instant, version: String },
52 Failed { timestamp: Instant, error: String },
54 RolledBack { timestamp: Instant, reason: String },
56}
57
/// The cause of a configuration reload attempt.
#[derive(Clone, Debug)]
pub enum ReloadTrigger {
    /// Explicitly requested by a caller.
    Manual,
    /// The watched configuration file changed on disk.
    FileChange,
    /// Requested through a process signal.
    Signal,
    /// Requested by a schedule.
    Scheduled,
}
70
71#[async_trait::async_trait]
77pub trait ConfigValidator: Send + Sync {
78 async fn validate(&self, config: &Config) -> SentinelResult<()>;
80
81 fn name(&self) -> &str;
83}
84
85#[async_trait::async_trait]
87pub trait ReloadHook: Send + Sync {
88 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;
90
91 async fn post_reload(&self, old_config: &Config, new_config: &Config);
93
94 async fn on_failure(&self, config: &Config, error: &SentinelError);
96
97 fn name(&self) -> &str;
99}
100
101#[derive(Default)]
107pub struct ReloadStats {
108 pub total_reloads: std::sync::atomic::AtomicU64,
110 pub successful_reloads: std::sync::atomic::AtomicU64,
112 pub failed_reloads: std::sync::atomic::AtomicU64,
114 pub rollbacks: std::sync::atomic::AtomicU64,
116 pub last_success: RwLock<Option<Instant>>,
118 pub last_failure: RwLock<Option<Instant>>,
120 pub avg_duration_ms: RwLock<f64>,
122}
123
124pub struct ConfigManager {
130 current_config: Arc<ArcSwap<Config>>,
132 previous_config: Arc<RwLock<Option<Arc<Config>>>>,
134 config_path: PathBuf,
136 watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
138 reload_tx: broadcast::Sender<ReloadEvent>,
140 stats: Arc<ReloadStats>,
142 validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
144 reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
146 cert_reloader: Arc<CertificateReloader>,
148}
149
150impl ConfigManager {
151 pub async fn new(
153 config_path: impl AsRef<Path>,
154 initial_config: Config,
155 ) -> SentinelResult<Self> {
156 let config_path = config_path.as_ref().to_path_buf();
157 let (reload_tx, _) = broadcast::channel(100);
158
159 info!(
160 config_path = %config_path.display(),
161 route_count = initial_config.routes.len(),
162 upstream_count = initial_config.upstreams.len(),
163 listener_count = initial_config.listeners.len(),
164 "Initializing configuration manager"
165 );
166
167 trace!(
168 config_path = %config_path.display(),
169 "Creating ArcSwap for configuration"
170 );
171
172 Ok(Self {
173 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
174 previous_config: Arc::new(RwLock::new(None)),
175 config_path,
176 watcher: Arc::new(RwLock::new(None)),
177 reload_tx,
178 stats: Arc::new(ReloadStats::default()),
179 validators: Arc::new(RwLock::new(Vec::new())),
180 reload_hooks: Arc::new(RwLock::new(Vec::new())),
181 cert_reloader: Arc::new(CertificateReloader::new()),
182 })
183 }
184
185 pub fn cert_reloader(&self) -> Arc<CertificateReloader> {
187 Arc::clone(&self.cert_reloader)
188 }
189
190 pub fn current(&self) -> Arc<Config> {
192 self.current_config.load_full()
193 }
194
195 pub async fn start_watching(&self) -> SentinelResult<()> {
200 if self.watcher.read().await.is_some() {
202 warn!("File watcher already active, skipping");
203 return Ok(());
204 }
205
206 let config_path = self.config_path.clone();
207
208 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
210
211 let mut watcher =
212 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
213 if let Ok(event) = event {
214 let _ = tx.blocking_send(event);
215 }
216 })
217 .map_err(|e| SentinelError::Config {
218 message: format!("Failed to create file watcher: {}", e),
219 source: None,
220 })?;
221
222 watcher
224 .watch(&config_path, RecursiveMode::NonRecursive)
225 .map_err(|e| SentinelError::Config {
226 message: format!("Failed to watch config file: {}", e),
227 source: None,
228 })?;
229
230 *self.watcher.write().await = Some(watcher);
232
233 let manager = Arc::new(self.clone_for_task());
235 tokio::spawn(async move {
236 while let Some(event) = rx.recv().await {
237 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
238 info!("Configuration file changed, triggering reload");
239
240 tokio::time::sleep(Duration::from_millis(100)).await;
242
243 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
244 error!("Auto-reload failed: {}", e);
245 error!("Continuing with current configuration");
246 }
247 }
248 }
249 });
250
251 info!(
252 "Auto-reload enabled: watching configuration file {:?}",
253 self.config_path
254 );
255 Ok(())
256 }
257
258 pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
260 let start = Instant::now();
261 let reload_num = self
262 .stats
263 .total_reloads
264 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
265 + 1;
266
267 info!(
268 trigger = ?trigger,
269 reload_num = reload_num,
270 config_path = %self.config_path.display(),
271 "Starting configuration reload"
272 );
273
274 let _ = self.reload_tx.send(ReloadEvent::Started {
276 timestamp: Instant::now(),
277 trigger: trigger.clone(),
278 });
279
280 trace!(
281 config_path = %self.config_path.display(),
282 "Reading configuration file"
283 );
284
285 let new_config = match Config::from_file(&self.config_path) {
287 Ok(config) => {
288 debug!(
289 route_count = config.routes.len(),
290 upstream_count = config.upstreams.len(),
291 listener_count = config.listeners.len(),
292 "Configuration file parsed successfully"
293 );
294 config
295 }
296 Err(e) => {
297 let error_msg = format!("Failed to load configuration: {}", e);
298 error!(
299 config_path = %self.config_path.display(),
300 error = %e,
301 "Failed to load configuration file"
302 );
303 self.stats
304 .failed_reloads
305 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
306 *self.stats.last_failure.write().await = Some(Instant::now());
307
308 let _ = self.reload_tx.send(ReloadEvent::Failed {
309 timestamp: Instant::now(),
310 error: error_msg.clone(),
311 });
312
313 return Err(SentinelError::Config {
314 message: error_msg,
315 source: None,
316 });
317 }
318 };
319
320 trace!("Starting configuration validation");
321
322 if let Err(e) = self.validate_config(&new_config).await {
325 error!(
326 error = %e,
327 "Configuration validation failed - new configuration REJECTED"
328 );
329 self.stats
330 .failed_reloads
331 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
332 *self.stats.last_failure.write().await = Some(Instant::now());
333
334 let _ = self.reload_tx.send(ReloadEvent::Failed {
335 timestamp: Instant::now(),
336 error: e.to_string(),
337 });
338
339 return Err(e);
340 }
341
342 info!(
343 route_count = new_config.routes.len(),
344 upstream_count = new_config.upstreams.len(),
345 "Configuration validation passed, applying new configuration"
346 );
347
348 let _ = self.reload_tx.send(ReloadEvent::Validated {
349 timestamp: Instant::now(),
350 });
351
352 let old_config = self.current_config.load_full();
354
355 trace!(
356 old_routes = old_config.routes.len(),
357 new_routes = new_config.routes.len(),
358 "Preparing configuration swap"
359 );
360
361 let hooks = self.reload_hooks.read().await;
363 for hook in hooks.iter() {
364 trace!(hook_name = %hook.name(), "Running pre-reload hook");
365 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
366 warn!(
367 hook_name = %hook.name(),
368 error = %e,
369 "Pre-reload hook failed"
370 );
371 }
373 }
374 drop(hooks);
375
376 trace!("Saving previous configuration for potential rollback");
378 *self.previous_config.write().await = Some(old_config.clone());
379
380 trace!("Applying new configuration atomically");
382 self.current_config.store(Arc::new(new_config.clone()));
383
384 let hooks = self.reload_hooks.read().await;
386 for hook in hooks.iter() {
387 trace!(hook_name = %hook.name(), "Running post-reload hook");
388 hook.post_reload(&old_config, &new_config).await;
389 }
390 drop(hooks);
391
392 let duration = start.elapsed();
394 let successful_count = self
395 .stats
396 .successful_reloads
397 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
398 + 1;
399 *self.stats.last_success.write().await = Some(Instant::now());
400
401 {
403 let mut avg = self.stats.avg_duration_ms.write().await;
404 let total = successful_count as f64;
405 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
406 }
407
408 let _ = self.reload_tx.send(ReloadEvent::Applied {
409 timestamp: Instant::now(),
410 version: format!("{:?}", Instant::now()), });
412
413 let (cert_success, cert_errors) = self.cert_reloader.reload_all();
416 if !cert_errors.is_empty() {
417 for (listener_id, error) in &cert_errors {
418 error!(
419 listener_id = %listener_id,
420 error = %error,
421 "TLS certificate reload failed for listener"
422 );
423 }
424 }
425
426 info!(
427 duration_ms = duration.as_millis(),
428 successful_reloads = successful_count,
429 route_count = new_config.routes.len(),
430 upstream_count = new_config.upstreams.len(),
431 cert_reload_success = cert_success,
432 cert_reload_errors = cert_errors.len(),
433 "Configuration reload completed successfully"
434 );
435
436 Ok(())
437 }
438
439 pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
441 info!(
442 reason = %reason,
443 "Starting configuration rollback"
444 );
445
446 let previous = self.previous_config.read().await.clone();
447
448 if let Some(prev_config) = previous {
449 trace!(
450 route_count = prev_config.routes.len(),
451 "Found previous configuration for rollback"
452 );
453
454 trace!("Validating previous configuration");
456 if let Err(e) = self.validate_config(&prev_config).await {
457 error!(
458 error = %e,
459 "Previous configuration validation failed during rollback"
460 );
461 return Err(e);
462 }
463
464 trace!("Applying previous configuration");
466 self.current_config.store(prev_config.clone());
467 let rollback_count = self
468 .stats
469 .rollbacks
470 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
471 + 1;
472
473 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
474 timestamp: Instant::now(),
475 reason: reason.clone(),
476 });
477
478 info!(
479 reason = %reason,
480 rollback_count = rollback_count,
481 route_count = prev_config.routes.len(),
482 "Configuration rolled back successfully"
483 );
484 Ok(())
485 } else {
486 warn!("No previous configuration available for rollback");
487 Err(SentinelError::Config {
488 message: "No previous configuration available".to_string(),
489 source: None,
490 })
491 }
492 }
493
494 async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
496 trace!(
497 route_count = config.routes.len(),
498 upstream_count = config.upstreams.len(),
499 "Starting configuration validation"
500 );
501
502 trace!("Running built-in config validation");
504 config.validate()?;
505
506 let validators = self.validators.read().await;
508 trace!(
509 validator_count = validators.len(),
510 "Running custom validators"
511 );
512 for validator in validators.iter() {
513 trace!(validator_name = %validator.name(), "Running validator");
514 validator.validate(config).await.map_err(|e| {
515 error!(
516 validator_name = %validator.name(),
517 error = %e,
518 "Validator failed"
519 );
520 e
521 })?;
522 }
523
524 debug!(
525 route_count = config.routes.len(),
526 upstream_count = config.upstreams.len(),
527 "Configuration validation passed"
528 );
529
530 Ok(())
531 }
532
533 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
535 info!("Adding configuration validator: {}", validator.name());
536 self.validators.write().await.push(validator);
537 }
538
539 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
541 info!("Adding reload hook: {}", hook.name());
542 self.reload_hooks.write().await.push(hook);
543 }
544
545 pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
547 self.reload_tx.subscribe()
548 }
549
550 pub fn stats(&self) -> &ReloadStats {
552 &self.stats
553 }
554
555 fn clone_for_task(&self) -> ConfigManager {
557 ConfigManager {
558 current_config: Arc::clone(&self.current_config),
559 previous_config: Arc::clone(&self.previous_config),
560 config_path: self.config_path.clone(),
561 watcher: self.watcher.clone(),
562 reload_tx: self.reload_tx.clone(),
563 stats: Arc::clone(&self.stats),
564 validators: Arc::clone(&self.validators),
565 reload_hooks: Arc::clone(&self.reload_hooks),
566 cert_reloader: Arc::clone(&self.cert_reloader),
567 }
568 }
569}
570
571pub struct AuditReloadHook {
577 log_manager: SharedLogManager,
578}
579
580impl AuditReloadHook {
581 pub fn new(log_manager: SharedLogManager) -> Self {
583 Self { log_manager }
584 }
585}
586
587#[async_trait::async_trait]
588impl ReloadHook for AuditReloadHook {
589 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()> {
590 let trace_id = uuid::Uuid::new_v4().to_string();
592 let audit_entry = AuditLogEntry::config_change(
593 &trace_id,
594 "reload_started",
595 format!(
596 "Configuration reload starting: {} routes -> {} routes, {} upstreams -> {} upstreams",
597 old_config.routes.len(),
598 new_config.routes.len(),
599 old_config.upstreams.len(),
600 new_config.upstreams.len()
601 ),
602 );
603 self.log_manager.log_audit(&audit_entry);
604 Ok(())
605 }
606
607 async fn post_reload(&self, old_config: &Config, new_config: &Config) {
608 let trace_id = uuid::Uuid::new_v4().to_string();
610 let audit_entry = AuditLogEntry::config_change(
611 &trace_id,
612 "reload_success",
613 format!(
614 "Configuration reload successful: {} routes, {} upstreams, {} listeners",
615 new_config.routes.len(),
616 new_config.upstreams.len(),
617 new_config.listeners.len()
618 ),
619 )
620 .with_metadata("old_routes", old_config.routes.len().to_string())
621 .with_metadata("new_routes", new_config.routes.len().to_string())
622 .with_metadata("old_upstreams", old_config.upstreams.len().to_string())
623 .with_metadata("new_upstreams", new_config.upstreams.len().to_string());
624 self.log_manager.log_audit(&audit_entry);
625 }
626
627 async fn on_failure(&self, config: &Config, error: &SentinelError) {
628 let trace_id = uuid::Uuid::new_v4().to_string();
630 let audit_entry = AuditLogEntry::config_change(
631 &trace_id,
632 "reload_failed",
633 format!("Configuration reload failed: {}", error),
634 )
635 .with_metadata("current_routes", config.routes.len().to_string())
636 .with_metadata("current_upstreams", config.upstreams.len().to_string());
637 self.log_manager.log_audit(&audit_entry);
638 }
639
640 fn name(&self) -> &str {
641 "audit_reload_hook"
642 }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// A small configuration with one listener, one upstream and one route.
    const VALID_CONFIG_KDL: &str = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;

    /// A reload from an unparsable file must fail, keep the active
    /// configuration, and be counted as a failed reload.
    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        let starting_config = Config::default_for_testing();
        let expected_routes = starting_config.routes.len();

        let workdir = tempfile::tempdir().unwrap();
        let config_path = workdir.path().join("config.kdl");
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        let manager = ConfigManager::new(&config_path, starting_config)
            .await
            .unwrap();
        assert_eq!(manager.current().routes.len(), expected_routes);

        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(outcome.is_err(), "Reload should fail for invalid config");

        assert_eq!(
            manager.current().routes.len(),
            expected_routes,
            "Original config should be preserved after failed reload"
        );

        let failed = manager.stats().failed_reloads.load(Ordering::Relaxed);
        assert_eq!(failed, 1, "Failed reload should be recorded");
    }

    /// A reload from a valid file must succeed and be counted as a
    /// successful reload.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let starting_config = Config::default_for_testing();

        let workdir = tempfile::tempdir().unwrap();
        let config_path = workdir.path().join("config.kdl");

        let static_dir = workdir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        std::fs::write(&config_path, VALID_CONFIG_KDL).unwrap();

        let manager = ConfigManager::new(&config_path, starting_config)
            .await
            .unwrap();

        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            outcome.is_ok(),
            "Reload should succeed for valid config: {:?}",
            outcome.err()
        );

        let succeeded = manager.stats().successful_reloads.load(Ordering::Relaxed);
        assert_eq!(succeeded, 1, "Successful reload should be recorded");
    }
}