1mod coordinator;
13mod signals;
14mod validators;
15
16pub use coordinator::GracefulReloadCoordinator;
17pub use signals::{SignalManager, SignalType};
18pub use validators::{RouteValidator, UpstreamValidator};
19
20use arc_swap::ArcSwap;
21use notify::{Event, EventKind, RecursiveMode, Watcher};
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{broadcast, RwLock};
26use tracing::{debug, error, info, trace, warn};
27
28use sentinel_common::errors::{SentinelError, SentinelResult};
29use sentinel_config::Config;
30
/// Lifecycle events broadcast while a configuration reload or rollback runs.
///
/// Receivers are obtained through `ConfigManager::subscribe`. Every variant
/// carries the `Instant` at which the event was emitted.
#[derive(Debug, Clone)]
pub enum ReloadEvent {
    /// A reload attempt has begun; sent before the configuration file is read.
    Started {
        timestamp: Instant,
        /// What caused this reload attempt.
        trigger: ReloadTrigger,
    },
    /// The newly loaded configuration passed validation.
    Validated { timestamp: Instant },
    /// The new configuration was swapped in and post-reload hooks have run.
    ///
    /// `version` is an opaque label; `ConfigManager::reload` currently fills
    /// it with a debug-formatted `Instant`.
    Applied { timestamp: Instant, version: String },
    /// Loading or validating the new configuration failed; the active
    /// configuration was left untouched. `error` is the rendered error text.
    Failed { timestamp: Instant, error: String },
    /// The previously active configuration was restored by
    /// `ConfigManager::rollback`; `reason` is the caller-supplied explanation.
    RolledBack { timestamp: Instant, reason: String },
}
52
/// Identifies what initiated a configuration reload.
///
/// The value is logged by `ConfigManager::reload` and forwarded in
/// `ReloadEvent::Started`.
#[derive(Debug, Clone)]
pub enum ReloadTrigger {
    /// Reload requested explicitly by a caller.
    Manual,
    /// Reload started by the file watcher after a modify/create event.
    FileChange,
    /// Reload requested via a signal.
    /// NOTE(review): not constructed in this file — presumably raised by the
    /// `signals` module; confirm.
    Signal,
    /// Reload started on a schedule.
    /// NOTE(review): not constructed in this file — confirm where it is used.
    Scheduled,
}
65
/// A pluggable check that runs against a candidate configuration.
///
/// Validators registered through `ConfigManager::add_validator` are run in
/// registration order after the configuration's own `validate()` succeeds.
/// The first validator that returns an error aborts validation.
#[async_trait::async_trait]
pub trait ConfigValidator: Send + Sync {
    /// Checks `config`, returning an error to reject it.
    async fn validate(&self, config: &Config) -> SentinelResult<()>;

    /// Name used to identify this validator in log output.
    fn name(&self) -> &str;
}
79
/// Callbacks invoked around a configuration swap.
///
/// Hooks are registered through `ConfigManager::add_hook` and run in
/// registration order.
#[async_trait::async_trait]
pub trait ReloadHook: Send + Sync {
    /// Called after validation and before the new configuration is stored.
    ///
    /// An error returned here is logged as a warning by
    /// `ConfigManager::reload`; the reload still proceeds.
    async fn pre_reload(&self, old_config: &Config, new_config: &Config) -> SentinelResult<()>;

    /// Called after the new configuration has been stored.
    async fn post_reload(&self, old_config: &Config, new_config: &Config);

    /// Intended to report a failed reload.
    /// NOTE(review): nothing in this file calls `on_failure`; confirm whether
    /// another module invokes it or whether the failure paths of `reload`
    /// should.
    async fn on_failure(&self, config: &Config, error: &SentinelError);

    /// Name used to identify this hook in log output.
    fn name(&self) -> &str;
}
95
/// Counters and timestamps describing reload activity.
///
/// All fields use interior mutability, so the struct can be updated through a
/// shared reference. `Default` yields zeroed counters, `None` timestamps and a
/// `0.0` average.
#[derive(Default)]
pub struct ReloadStats {
    /// Number of reload attempts started, whether or not they succeeded.
    pub total_reloads: std::sync::atomic::AtomicU64,
    /// Number of reloads that applied a new configuration.
    pub successful_reloads: std::sync::atomic::AtomicU64,
    /// Number of reloads rejected because loading or validation failed.
    pub failed_reloads: std::sync::atomic::AtomicU64,
    /// Number of completed rollbacks.
    pub rollbacks: std::sync::atomic::AtomicU64,
    /// When the most recent successful reload finished, if any.
    pub last_success: RwLock<Option<Instant>>,
    /// When the most recent failed reload was recorded, if any.
    pub last_failure: RwLock<Option<Instant>>,
    /// Running mean duration of successful reloads, in milliseconds.
    pub avg_duration_ms: RwLock<f64>,
}
118
/// Owns the active configuration and coordinates reloads and rollbacks.
///
/// Every field except `config_path` and `reload_tx` is behind an `Arc`, which
/// lets `clone_for_task` hand a background task a manager that shares the same
/// state.
pub struct ConfigManager {
    /// The active configuration; replaced atomically on reload and rollback.
    current_config: Arc<ArcSwap<Config>>,
    /// The configuration that was active before the last successful reload.
    previous_config: Arc<RwLock<Option<Arc<Config>>>>,
    /// Path of the configuration file that `reload` reads.
    config_path: PathBuf,
    /// The file watcher, once `start_watching` has installed one.
    watcher: Arc<RwLock<Option<notify::RecommendedWatcher>>>,
    /// Sending half of the broadcast channel carrying `ReloadEvent`s.
    reload_tx: broadcast::Sender<ReloadEvent>,
    /// Reload counters and timings.
    stats: Arc<ReloadStats>,
    /// Custom validators run against every candidate configuration.
    validators: Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
    /// Hooks run before and after a configuration swap.
    reload_hooks: Arc<RwLock<Vec<Box<dyn ReloadHook>>>>,
}
142
143impl ConfigManager {
144 pub async fn new(
146 config_path: impl AsRef<Path>,
147 initial_config: Config,
148 ) -> SentinelResult<Self> {
149 let config_path = config_path.as_ref().to_path_buf();
150 let (reload_tx, _) = broadcast::channel(100);
151
152 info!(
153 config_path = %config_path.display(),
154 route_count = initial_config.routes.len(),
155 upstream_count = initial_config.upstreams.len(),
156 listener_count = initial_config.listeners.len(),
157 "Initializing configuration manager"
158 );
159
160 trace!(
161 config_path = %config_path.display(),
162 "Creating ArcSwap for configuration"
163 );
164
165 Ok(Self {
166 current_config: Arc::new(ArcSwap::from_pointee(initial_config)),
167 previous_config: Arc::new(RwLock::new(None)),
168 config_path,
169 watcher: Arc::new(RwLock::new(None)),
170 reload_tx,
171 stats: Arc::new(ReloadStats::default()),
172 validators: Arc::new(RwLock::new(Vec::new())),
173 reload_hooks: Arc::new(RwLock::new(Vec::new())),
174 })
175 }
176
    /// Returns a shared handle to the configuration that is active right now.
    ///
    /// The returned `Arc` is a snapshot: a later reload or rollback replaces
    /// the manager's configuration without affecting handles already given out.
    pub fn current(&self) -> Arc<Config> {
        self.current_config.load_full()
    }
181
182 pub async fn start_watching(&self) -> SentinelResult<()> {
187 if self.watcher.read().await.is_some() {
189 warn!("File watcher already active, skipping");
190 return Ok(());
191 }
192
193 let config_path = self.config_path.clone();
194
195 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
197
198 let mut watcher =
199 notify::recommended_watcher(move |event: Result<Event, notify::Error>| {
200 if let Ok(event) = event {
201 let _ = tx.blocking_send(event);
202 }
203 })
204 .map_err(|e| SentinelError::Config {
205 message: format!("Failed to create file watcher: {}", e),
206 source: None,
207 })?;
208
209 watcher
211 .watch(&config_path, RecursiveMode::NonRecursive)
212 .map_err(|e| SentinelError::Config {
213 message: format!("Failed to watch config file: {}", e),
214 source: None,
215 })?;
216
217 *self.watcher.write().await = Some(watcher);
219
220 let manager = Arc::new(self.clone_for_task());
222 tokio::spawn(async move {
223 while let Some(event) = rx.recv().await {
224 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
225 info!("Configuration file changed, triggering reload");
226
227 tokio::time::sleep(Duration::from_millis(100)).await;
229
230 if let Err(e) = manager.reload(ReloadTrigger::FileChange).await {
231 error!("Auto-reload failed: {}", e);
232 error!("Continuing with current configuration");
233 }
234 }
235 }
236 });
237
238 info!(
239 "Auto-reload enabled: watching configuration file {:?}",
240 self.config_path
241 );
242 Ok(())
243 }
244
245 pub async fn reload(&self, trigger: ReloadTrigger) -> SentinelResult<()> {
247 let start = Instant::now();
248 let reload_num = self.stats
249 .total_reloads
250 .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
251
252 info!(
253 trigger = ?trigger,
254 reload_num = reload_num,
255 config_path = %self.config_path.display(),
256 "Starting configuration reload"
257 );
258
259 let _ = self.reload_tx.send(ReloadEvent::Started {
261 timestamp: Instant::now(),
262 trigger: trigger.clone(),
263 });
264
265 trace!(
266 config_path = %self.config_path.display(),
267 "Reading configuration file"
268 );
269
270 let new_config = match Config::from_file(&self.config_path) {
272 Ok(config) => {
273 debug!(
274 route_count = config.routes.len(),
275 upstream_count = config.upstreams.len(),
276 listener_count = config.listeners.len(),
277 "Configuration file parsed successfully"
278 );
279 config
280 }
281 Err(e) => {
282 let error_msg = format!("Failed to load configuration: {}", e);
283 error!(
284 config_path = %self.config_path.display(),
285 error = %e,
286 "Failed to load configuration file"
287 );
288 self.stats
289 .failed_reloads
290 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
291 *self.stats.last_failure.write().await = Some(Instant::now());
292
293 let _ = self.reload_tx.send(ReloadEvent::Failed {
294 timestamp: Instant::now(),
295 error: error_msg.clone(),
296 });
297
298 return Err(SentinelError::Config {
299 message: error_msg,
300 source: None,
301 });
302 }
303 };
304
305 trace!("Starting configuration validation");
306
307 if let Err(e) = self.validate_config(&new_config).await {
310 error!(
311 error = %e,
312 "Configuration validation failed - new configuration REJECTED"
313 );
314 self.stats
315 .failed_reloads
316 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
317 *self.stats.last_failure.write().await = Some(Instant::now());
318
319 let _ = self.reload_tx.send(ReloadEvent::Failed {
320 timestamp: Instant::now(),
321 error: e.to_string(),
322 });
323
324 return Err(e);
325 }
326
327 info!(
328 route_count = new_config.routes.len(),
329 upstream_count = new_config.upstreams.len(),
330 "Configuration validation passed, applying new configuration"
331 );
332
333 let _ = self.reload_tx.send(ReloadEvent::Validated {
334 timestamp: Instant::now(),
335 });
336
337 let old_config = self.current_config.load_full();
339
340 trace!(
341 old_routes = old_config.routes.len(),
342 new_routes = new_config.routes.len(),
343 "Preparing configuration swap"
344 );
345
346 let hooks = self.reload_hooks.read().await;
348 for hook in hooks.iter() {
349 trace!(hook_name = %hook.name(), "Running pre-reload hook");
350 if let Err(e) = hook.pre_reload(&old_config, &new_config).await {
351 warn!(
352 hook_name = %hook.name(),
353 error = %e,
354 "Pre-reload hook failed"
355 );
356 }
358 }
359 drop(hooks);
360
361 trace!("Saving previous configuration for potential rollback");
363 *self.previous_config.write().await = Some(old_config.clone());
364
365 trace!("Applying new configuration atomically");
367 self.current_config.store(Arc::new(new_config.clone()));
368
369 let hooks = self.reload_hooks.read().await;
371 for hook in hooks.iter() {
372 trace!(hook_name = %hook.name(), "Running post-reload hook");
373 hook.post_reload(&old_config, &new_config).await;
374 }
375 drop(hooks);
376
377 let duration = start.elapsed();
379 let successful_count = self.stats
380 .successful_reloads
381 .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
382 *self.stats.last_success.write().await = Some(Instant::now());
383
384 {
386 let mut avg = self.stats.avg_duration_ms.write().await;
387 let total = successful_count as f64;
388 *avg = (*avg * (total - 1.0) + duration.as_millis() as f64) / total;
389 }
390
391 let _ = self.reload_tx.send(ReloadEvent::Applied {
392 timestamp: Instant::now(),
393 version: format!("{:?}", Instant::now()), });
395
396 info!(
397 duration_ms = duration.as_millis(),
398 successful_reloads = successful_count,
399 route_count = new_config.routes.len(),
400 upstream_count = new_config.upstreams.len(),
401 "Configuration reload completed successfully"
402 );
403
404 Ok(())
405 }
406
407 pub async fn rollback(&self, reason: String) -> SentinelResult<()> {
409 info!(
410 reason = %reason,
411 "Starting configuration rollback"
412 );
413
414 let previous = self.previous_config.read().await.clone();
415
416 if let Some(prev_config) = previous {
417 trace!(
418 route_count = prev_config.routes.len(),
419 "Found previous configuration for rollback"
420 );
421
422 trace!("Validating previous configuration");
424 if let Err(e) = self.validate_config(&prev_config).await {
425 error!(
426 error = %e,
427 "Previous configuration validation failed during rollback"
428 );
429 return Err(e);
430 }
431
432 trace!("Applying previous configuration");
434 self.current_config.store(prev_config.clone());
435 let rollback_count = self.stats
436 .rollbacks
437 .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
438
439 let _ = self.reload_tx.send(ReloadEvent::RolledBack {
440 timestamp: Instant::now(),
441 reason: reason.clone(),
442 });
443
444 info!(
445 reason = %reason,
446 rollback_count = rollback_count,
447 route_count = prev_config.routes.len(),
448 "Configuration rolled back successfully"
449 );
450 Ok(())
451 } else {
452 warn!("No previous configuration available for rollback");
453 Err(SentinelError::Config {
454 message: "No previous configuration available".to_string(),
455 source: None,
456 })
457 }
458 }
459
460 async fn validate_config(&self, config: &Config) -> SentinelResult<()> {
462 trace!(
463 route_count = config.routes.len(),
464 upstream_count = config.upstreams.len(),
465 "Starting configuration validation"
466 );
467
468 trace!("Running built-in config validation");
470 config.validate()?;
471
472 let validators = self.validators.read().await;
474 trace!(
475 validator_count = validators.len(),
476 "Running custom validators"
477 );
478 for validator in validators.iter() {
479 trace!(validator_name = %validator.name(), "Running validator");
480 validator.validate(config).await.map_err(|e| {
481 error!(
482 validator_name = %validator.name(),
483 error = %e,
484 "Validator failed"
485 );
486 e
487 })?;
488 }
489
490 debug!(
491 route_count = config.routes.len(),
492 upstream_count = config.upstreams.len(),
493 "Configuration validation passed"
494 );
495
496 Ok(())
497 }
498
499 pub async fn add_validator(&self, validator: Box<dyn ConfigValidator>) {
501 info!("Adding configuration validator: {}", validator.name());
502 self.validators.write().await.push(validator);
503 }
504
505 pub async fn add_hook(&self, hook: Box<dyn ReloadHook>) {
507 info!("Adding reload hook: {}", hook.name());
508 self.reload_hooks.write().await.push(hook);
509 }
510
    /// Returns a new receiver for `ReloadEvent`s broadcast by this manager.
    pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
        self.reload_tx.subscribe()
    }
515
    /// Returns the reload statistics collected by this manager.
    pub fn stats(&self) -> &ReloadStats {
        &self.stats
    }
520
521 fn clone_for_task(&self) -> ConfigManager {
523 ConfigManager {
524 current_config: Arc::clone(&self.current_config),
525 previous_config: Arc::clone(&self.previous_config),
526 config_path: self.config_path.clone(),
527 watcher: self.watcher.clone(),
528 reload_tx: self.reload_tx.clone(),
529 stats: Arc::clone(&self.stats),
530 validators: Arc::clone(&self.validators),
531 reload_hooks: Arc::clone(&self.reload_hooks),
532 }
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539
540 #[tokio::test]
541 async fn test_config_reload_rejects_invalid_config() {
542 let initial_config = Config::default_for_testing();
544 let initial_routes = initial_config.routes.len();
545
546 let temp_dir = tempfile::tempdir().unwrap();
547 let config_path = temp_dir.path().join("config.kdl");
548
549 std::fs::write(&config_path, "this is not valid KDL { {{{{ broken").unwrap();
551
552 let manager = ConfigManager::new(&config_path, initial_config)
554 .await
555 .unwrap();
556
557 assert_eq!(manager.current().routes.len(), initial_routes);
559
560 let result = manager.reload(ReloadTrigger::Manual).await;
562 assert!(result.is_err(), "Reload should fail for invalid config");
563
564 assert_eq!(
566 manager.current().routes.len(),
567 initial_routes,
568 "Original config should be preserved after failed reload"
569 );
570
571 assert_eq!(
573 manager
574 .stats()
575 .failed_reloads
576 .load(std::sync::atomic::Ordering::Relaxed),
577 1,
578 "Failed reload should be recorded"
579 );
580 }
581
    /// A reload from a well-formed configuration file must succeed and be
    /// counted as a successful reload.
    #[tokio::test]
    async fn test_config_reload_accepts_valid_config() {
        let initial_config = Config::default_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join("config.kdl");

        // Creates a `static` directory next to the config file; the fixture
        // below does not reference it.
        let static_dir = temp_dir.path().join("static");
        std::fs::create_dir_all(&static_dir).unwrap();

        // Minimal configuration: one listener, one upstream, one route.
        let valid_config = r#"
server {
    worker-threads 4
}

listeners {
    listener "http" {
        address "0.0.0.0:8080"
        protocol "http"
    }
}

upstreams {
    upstream "backend" {
        target "127.0.0.1:3000"
    }
}

routes {
    route "api" {
        priority "high"
        matches {
            path-prefix "/api/"
        }
        upstream "backend"
    }
}
"#;
        std::fs::write(&config_path, valid_config).unwrap();

        let manager = ConfigManager::new(&config_path, initial_config)
            .await
            .unwrap();

        let result = manager.reload(ReloadTrigger::Manual).await;
        assert!(
            result.is_ok(),
            "Reload should succeed for valid config: {:?}",
            result.err()
        );

        // Exactly one successful reload must have been recorded.
        assert_eq!(
            manager
                .stats()
                .successful_reloads
                .load(std::sync::atomic::Ordering::Relaxed),
            1,
            "Successful reload should be recorded"
        );
    }
647}