1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
21use notify::{Event, EventKind, RecursiveMode, Watcher};
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, RwLock};
26use tracing::{debug, error, info, warn};
27
28use sentinel_common::errors::{SentinelError, SentinelResult};
29use sentinel_config::Config;
30
31#[derive(Debug, Clone)]
37pub enum ReloadEvent {
38 Started {
40 timestamp: Instant,
41 trigger: ReloadTrigger,
42 },
43 Validated { timestamp: Instant },
45 Applied { timestamp: Instant, version: String },
47 Failed { timestamp: Instant, error: String },
49 RolledBack { timestamp: Instant, reason: String },
51}
52
/// The cause of a configuration reload attempt.
///
/// The value is carried in [`ReloadEvent::Started`] and appears in log
/// output. It is a plain fieldless enum, so it is `Copy` and comparable:
/// callers can pass it by value and test it with `==` without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReloadTrigger {
    /// Reload requested explicitly by a caller.
    Manual,
    /// Reload started because the watched configuration file changed.
    FileChange,
    /// Reload requested through a signal.
    Signal,
    /// Reload started on a schedule.
    Scheduled,
}
65
66#[async_trait::async_trait]
72pub trait ConfigValidator: Send + Sync {
73 async fn validate(&self, config: &Config) -> SentinelResult<()>;
75
76 fn name(&self) -> &str;
78}
79
80#[async_trait::async_trait]
82pub trait ReloadHook: Send + Sync {
83 async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;
85
86 async fn post_reload(&self, old_config: &Config, new_config: &Config);
88
89 async fn on_failure(&self, config: &Config, error: &SentinelError);
91
92 fn name(&self) -> &str;
94}
95
96#[derive(Default)]
102pub struct ReloadStats {
103 pub total_reloads: std::sync::atomic::AtomicU64,
105 pub successful_reloads: std::sync::atomic::AtomicU64,
107 pub failed_reloads: std::sync::atomic::AtomicU64,
109 pub rollbacks: std::sync::atomic::AtomicU64,
111 pub last_success: RwLock<Option<Instant>>,
113 pub last_failure: RwLock<Option<Instant>>,
115 pub avg_duration_ms: RwLock<f64>,
117}
118
119pub struct ConfigManager {
125 current_config: Arc<ArcSwap<Config>>,
127 previous_config: Arc<RwLock<Option<Arc<Config>>>>,
129 config_path: PathBuf,
131 watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
133 reload_tx: broadcast::Sender<ReloadEvent>,
135 stats: Arc<ReloadStats>,
137 validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
139 reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
141}
142
143impl ConfigManager {
144 pub async fn new(
146 config_path: impl AsRef<Path>,
147 initial_config: Config,
148 ) -> SentinelResult<Self> {
149 let config_path = config_path.as_ref().to_path_buf();
150 let (reload_tx, _) = broadcast::channel(100);
151
152 info!("Initializing configuration manager for: {:?}", config_path);
153
154 Ok(Self {
155 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
156 previous_config: Arc::new(RwLock::new(None)),
157 config_path,
158 watcher: Arc::new(RwLock::new(None)),
159 reload_tx,
160 stats: Arc::new(ReloadStats::default()),
161 validators: Arc::new(RwLock::new(Vec::new())),
162 reload_hooks: Arc::new(RwLock::new(Vec::new())),
163 })
164 }
165
166 pub fn current(&self) -> Arc<Config> {
168 self.current_config.load_full()
169 }
170
171 pub async fn start_watching(&self) -> SentinelResult<()> {
176 if self.watcher.read().await.is_some() {
178 warn!("File watcher already active, skipping");
179 return Ok(());
180 }
181
182 let config_path = self.config_path.clone();
183
184 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
186
187 let mut watcher =
188 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
189 if let Ok(event) = event {
190 let _ = tx.blocking_send(event);
191 }
192 })
193 .map_err(|e| SentinelError::Config {
194 message: format!("Failed to create file watcher: {}", e),
195 source: None,
196 })?;
197
198 watcher
200 .watch(&config_path, RecursiveMode::NonRecursive)
201 .map_err(|e| SentinelError::Config {
202 message: format!("Failed to watch config file: {}", e),
203 source: None,
204 })?;
205
206 *self.watcher.write().await = Some(watcher);
208
209 let manager = Arc::new(self.clone_for_task());
211 tokio::spawn(async move {
212 while let Some(event) = rx.recv().await {
213 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
214 info!("Configuration file changed, triggering reload");
215
216 tokio::time::sleep(Duration::from_millis(100)).await;
218
219 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
220 error!("Auto-reload failed: {}", e);
221 error!("Continuing with current configuration");
222 }
223 }
224 }
225 });
226
227 info!(
228 "Auto-reload enabled: watching configuration file {:?}",
229 self.config_path
230 );
231 Ok(())
232 }
233
234 pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
236 let start = Instant::now();
237 self.stats
238 .total_reloads
239 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
240
241 info!("Starting configuration reload (trigger: {:?})", trigger);
242
243 let _ = self.reload_tx.send(ReloadEvent::Started {
245 timestamp: Instant::now(),
246 trigger: trigger.clone(),
247 });
248
249 let new_config = match Config::from_file(&self.config_path) {
251 Ok(config) => config,
252 Err(e) => {
253 let error_msg = format!("Failed to load configuration: {}", e);
254 error!("{}", error_msg);
255 self.stats
256 .failed_reloads
257 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
258 *self.stats.last_failure.write().await = Some(Instant::now());
259
260 let _ = self.reload_tx.send(ReloadEvent::Failed {
261 timestamp: Instant::now(),
262 error: error_msg.clone(),
263 });
264
265 return Err(SentinelError::Config {
266 message: error_msg,
267 source: None,
268 });
269 }
270 };
271
272 if let Err(e) = self.validate_config(&new_config).await {
275 error!("Configuration validation failed: {}", e);
276 error!("REJECTED: New configuration will NOT be applied. Continuing with current configuration.");
277 self.stats
278 .failed_reloads
279 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
280 *self.stats.last_failure.write().await = Some(Instant::now());
281
282 let _ = self.reload_tx.send(ReloadEvent::Failed {
283 timestamp: Instant::now(),
284 error: e.to_string(),
285 });
286
287 return Err(e);
288 }
289
290 info!("Configuration validation passed, applying new configuration");
291
292 let _ = self.reload_tx.send(ReloadEvent::Validated {
293 timestamp: Instant::now(),
294 });
295
296 let old_config = self.current_config.load_full();
298
299 for hook in self.reload_hooks.read().await.iter() {
301 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
302 warn!("Pre-reload hook '{}' failed: {}", hook.name(), e);
303 }
305 }
306
307 *self.previous_config.write().await = Some(old_config.clone());
309
310 self.current_config.store(Arc::new(new_config.clone()));
312
313 for hook in self.reload_hooks.read().await.iter() {
315 hook.post_reload(&old_config, &new_config).await;
316 }
317
318 let duration = start.elapsed();
320 self.stats
321 .successful_reloads
322 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
323 *self.stats.last_success.write().await = Some(Instant::now());
324
325 {
327 let mut avg = self.stats.avg_duration_ms.write().await;
328 let total = self
329 .stats
330 .successful_reloads
331 .load(std::sync::atomic::Ordering::Relaxed) as f64;
332 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
333 }
334
335 let _ = self.reload_tx.send(ReloadEvent::Applied {
336 timestamp: Instant::now(),
337 version: format!("{:?}", Instant::now()), });
339
340 info!(
341 "Configuration reload completed successfully in {:?}",
342 duration
343 );
344
345 Ok(())
346 }
347
348 pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
350 let previous = self.previous_config.read().await.clone();
351
352 if let Some(prev_config) = previous {
353 info!("Rolling back configuration: {}", reason);
354
355 if let Err(e) = self.validate_config(&prev_config).await {
357 error!("Previous configuration validation failed: {}", e);
358 return Err(e);
359 }
360
361 self.current_config.store(prev_config);
363 self.stats
364 .rollbacks
365 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366
367 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
368 timestamp: Instant::now(),
369 reason: reason.clone(),
370 });
371
372 info!("Configuration rolled back successfully");
373 Ok(())
374 } else {
375 warn!("No previous configuration available for rollback");
376 Err(SentinelError::Config {
377 message: "No previous configuration available".to_string(),
378 source: None,
379 })
380 }
381 }
382
383 async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
385 config.validate()?;
387
388 for validator in self.validators.read().await.iter() {
390 debug!("Running validator: {}", validator.name());
391 validator.validate(config).await.map_err(|e| {
392 error!("Validator '{}' failed: {}", validator.name(), e);
393 e
394 })?;
395 }
396
397 Ok(())
398 }
399
400 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
402 info!("Adding configuration validator: {}", validator.name());
403 self.validators.write().await.push(validator);
404 }
405
406 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
408 info!("Adding reload hook: {}", hook.name());
409 self.reload_hooks.write().await.push(hook);
410 }
411
412 pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
414 self.reload_tx.subscribe()
415 }
416
417 pub fn stats(&self) -> &ReloadStats {
419 &self.stats
420 }
421
422 fn clone_for_task(&self) -> ConfigManager {
424 ConfigManager {
425 current_config: Arc::clone(&self.current_config),
426 previous_config: Arc::clone(&self.previous_config),
427 config_path: self.config_path.clone(),
428 watcher: self.watcher.clone(),
429 reload_tx: self.reload_tx.clone(),
430 stats: Arc::clone(&self.stats),
431 validators: Arc::clone(&self.validators),
432 reload_hooks: Arc::clone(&self.reload_hooks),
433 }
434 }
435}
436
#[cfg(test)]
mod tests {
    use super::*;

    /// A file that cannot be parsed must be rejected, leave the active
    /// configuration untouched and be counted as a failed reload.
    #[tokio::test]
    async fn test_config_reload_rejects_invalid_config() {
        let baseline = Config::default_for_testing();
        let expected_routes = baseline.routes.len();

        // Put deliberately malformed content where the manager will look.
        let workspace = tempfile::tempdir().unwrap();
        let config_path = workspace.path().join("config.kdl");
        std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();

        let manager = ConfigManager::new(&config_path, baseline).await.unwrap();
        assert_eq!(manager.current().routes.len(), expected_routes);

        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(outcome.is_err(), "Reload should fail for invalid config");

        let routes_after = manager.current().routes.len();
        assert_eq!(
            routes_after, expected_routes,
            "Original config should be preserved after failed reload"
        );

        let failures = manager
            .stats()
            .failed_reloads
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(failures, 1, "Failed reload should be recorded");
    }

    /// A well-formed file must reload successfully and be counted as such.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let baseline = Config::default_for_testing();
        let workspace = tempfile::tempdir().unwrap();
        let config_path = workspace.path().join("config.kdl");

        let static_dir = workspace.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        let manager = ConfigManager::new(&config_path, baseline).await.unwrap();

        let outcome = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            outcome.is_ok(),
            "Reload should succeed for valid config: {:?}",
            outcome.err()
        );

        let successes = manager
            .stats()
            .successful_reloads
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(successes, 1, "Successful reload should be recorded");
    }
}