1use std::path::PathBuf;
138use std::sync::Arc;
139use std::time::{Duration, Instant, SystemTime};
140
141use tokio::task::JoinHandle;
142use tracing::{debug, error, info, warn};
143
144use super::shared::SharedConfig;
145
/// Boxed, thread-safe error type returned by the reload and validation callbacks.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
148
/// Settings that decide which triggers a [`ConfigReloader`] listens to and how often.
#[derive(Clone, Debug)]
pub struct ReloaderConfig {
    /// Config file whose modification time is polled; `None` disables file watching.
    pub config_path: Option<PathBuf>,

    /// How often the modification time of `config_path` is checked.
    pub poll_interval: Duration,

    /// Interval of unconditional periodic reloads; `Duration::ZERO` disables them.
    pub periodic_interval: Duration,

    /// Minimum time between two reloads; triggers arriving sooner are skipped.
    pub debounce: Duration,

    /// Whether a SIGHUP handler is registered (only consulted on Unix targets).
    pub enable_sighup: bool,
}

impl Default for ReloaderConfig {
    /// No file watching, no periodic reloads, 5 s poll interval, 500 ms debounce, SIGHUP on.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(5);
        let debounce = Duration::from_millis(500);

        Self {
            config_path: None,
            poll_interval,
            periodic_interval: Duration::ZERO,
            debounce,
            enable_sighup: true,
        }
    }
}
183
/// Shared callback that receives the new configuration after a successful reload.
type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync + 'static>;
195
196pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
197 config: ReloaderConfig,
198 shared: SharedConfig<T>,
199 reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
200 validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
201 post_reload_hooks: Vec<PostReloadHook<T>>,
202}
203
204impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
205 pub fn new(
212 config: ReloaderConfig,
213 shared: SharedConfig<T>,
214 reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
215 validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
216 ) -> Self {
217 Self {
218 config,
219 shared,
220 reload_fn: Arc::new(reload_fn),
221 validate_fn: Arc::new(validate_fn),
222 post_reload_hooks: Vec::new(),
223 }
224 }
225
226 #[must_use]
236 pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
237 self.post_reload_hooks.push(Arc::new(hook));
238 self
239 }
240
241 #[must_use]
247 pub fn with_registry_update(self, key: &str) -> Self
248 where
249 T: serde::Serialize + Default,
250 {
251 let key = key.to_string();
252 self.with_post_reload_hook(move |config| {
253 super::registry::update::<T>(&key, config);
254 })
255 }
256
257 pub fn start(self) -> JoinHandle<()> {
262 let has_file = self.config.config_path.is_some();
263 let has_periodic = self.config.periodic_interval > Duration::ZERO;
264 let has_sighup = self.config.enable_sighup;
265
266 info!(
267 file_watch = has_file,
268 periodic = has_periodic,
269 sighup = has_sighup,
270 "Config reloader started"
271 );
272
273 tokio::spawn(async move {
274 self.run_loop().await;
275 })
276 }
277
278 async fn run_loop(self) {
280 #[cfg(feature = "shutdown")]
281 let shutdown_token = crate::shutdown::token();
282
283 let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
285 Some(p) => file_mtime_async(p).await,
286 None => None,
287 };
288 let mut last_reload = Instant::now();
289
290 let mut poll_timer = self
292 .config
293 .config_path
294 .as_ref()
295 .map(|_| tokio::time::interval(self.config.poll_interval));
296
297 let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
299 Some(tokio::time::interval(self.config.periodic_interval))
300 } else {
301 None
302 };
303
304 #[cfg(unix)]
306 let mut sighup = if self.config.enable_sighup {
307 Some(
308 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
309 .expect("failed to register SIGHUP handler"),
310 )
311 } else {
312 None
313 };
314
315 loop {
316 #[cfg(feature = "shutdown")]
318 if shutdown_token.is_cancelled() {
319 info!("Config reloader stopping (shutdown)");
320 return;
321 }
322
323 let trigger_result = {
324 #[cfg(feature = "shutdown")]
325 {
326 tokio::select! {
327 trigger = self.wait_for_trigger(
328 &mut poll_timer,
329 &mut periodic_timer,
330 #[cfg(unix)]
331 &mut sighup,
332 &mut last_modified,
333 ) => Some(trigger),
334 () = shutdown_token.cancelled() => None,
335 }
336 }
337 #[cfg(not(feature = "shutdown"))]
338 {
339 Some(
340 self.wait_for_trigger(
341 &mut poll_timer,
342 &mut periodic_timer,
343 #[cfg(unix)]
344 &mut sighup,
345 &mut last_modified,
346 )
347 .await,
348 )
349 }
350 };
351
352 let Some(trigger) = trigger_result else {
353 info!("Config reloader stopping (shutdown)");
354 return;
355 };
356
357 if last_reload.elapsed() < self.config.debounce {
359 debug!("Debouncing config reload");
360 continue;
361 }
362
363 match trigger {
364 ReloadTrigger::FileChanged => {
365 info!(
366 path = ?self.config.config_path,
367 "Config file changed, reloading"
368 );
369 }
370 ReloadTrigger::Periodic => {
371 info!("Periodic config reload triggered");
372 }
373 ReloadTrigger::Sighup => {
374 info!("SIGHUP received, reloading configuration");
375 }
376 }
377
378 self.do_reload();
379 last_reload = Instant::now();
380 }
381 }
382
383 async fn wait_for_trigger(
387 &self,
388 poll_timer: &mut Option<tokio::time::Interval>,
389 periodic_timer: &mut Option<tokio::time::Interval>,
390 #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
391 last_modified: &mut Option<SystemTime>,
392 ) -> ReloadTrigger {
393 loop {
394 let trigger = self
395 .select_trigger(
396 poll_timer,
397 periodic_timer,
398 #[cfg(unix)]
399 sighup,
400 )
401 .await;
402
403 match trigger {
404 ReloadTrigger::FileChanged => {
405 if let Some(ref path) = self.config.config_path {
407 let current_mtime = file_mtime_async(path).await;
408 let changed = match (&*last_modified, ¤t_mtime) {
409 (Some(last), Some(current)) => current > last,
410 (None, Some(_)) => true,
411 _ => false,
412 };
413 if changed {
414 *last_modified = current_mtime;
415 return ReloadTrigger::FileChanged;
416 }
417 }
418 }
420 other => return other,
421 }
422 }
423 }
424
425 #[cfg(unix)]
427 async fn select_trigger(
428 &self,
429 poll_timer: &mut Option<tokio::time::Interval>,
430 periodic_timer: &mut Option<tokio::time::Interval>,
431 sighup: &mut Option<tokio::signal::unix::Signal>,
432 ) -> ReloadTrigger {
433 tokio::select! {
434 _ = async {
435 match poll_timer.as_mut() {
436 Some(timer) => timer.tick().await,
437 None => std::future::pending().await,
438 }
439 } => ReloadTrigger::FileChanged,
440
441 _ = async {
442 match periodic_timer.as_mut() {
443 Some(timer) => timer.tick().await,
444 None => std::future::pending().await,
445 }
446 } => ReloadTrigger::Periodic,
447
448 () = async {
449 match sighup.as_mut() {
450 Some(sig) => { sig.recv().await; },
451 None => std::future::pending::<()>().await,
452 }
453 } => ReloadTrigger::Sighup,
454 }
455 }
456
457 #[cfg(not(unix))]
459 async fn select_trigger(
460 &self,
461 poll_timer: &mut Option<tokio::time::Interval>,
462 periodic_timer: &mut Option<tokio::time::Interval>,
463 ) -> ReloadTrigger {
464 tokio::select! {
465 _ = async {
466 match poll_timer.as_mut() {
467 Some(timer) => timer.tick().await,
468 None => std::future::pending().await,
469 }
470 } => ReloadTrigger::FileChanged,
471
472 _ = async {
473 match periodic_timer.as_mut() {
474 Some(timer) => timer.tick().await,
475 None => std::future::pending().await,
476 }
477 } => ReloadTrigger::Periodic,
478 }
479 }
480
481 fn do_reload(&self) {
483 match (self.reload_fn)() {
484 Ok(new_config) => {
485 if let Err(e) = (self.validate_fn)(&new_config) {
486 error!(error = %e, "Config reload validation failed, keeping current config");
487 #[cfg(feature = "metrics")]
488 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
489 return;
490 }
491
492 let old_version = self.shared.version();
493 self.shared.update(new_config.clone());
494 let new_version = self.shared.version();
495
496 for hook in &self.post_reload_hooks {
498 hook(&new_config);
499 }
500
501 #[cfg(feature = "metrics")]
502 metrics::counter!("config_reloads_total", "result" => "success").increment(1);
503
504 info!(
505 old_version = old_version,
506 new_version = new_version,
507 "Configuration reloaded successfully"
508 );
509 }
510 Err(e) => {
511 warn!(error = %e, "Config reload failed, keeping current config");
512 #[cfg(feature = "metrics")]
513 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
514 }
515 }
516 }
517}
518
/// Reason a reload attempt was started.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ReloadTrigger {
    /// The file poll timer fired (and, once confirmed, the config file's mtime changed).
    FileChanged,
    /// The periodic reload timer fired.
    Periodic,
    /// A SIGHUP signal arrived.
    // Only the Unix `select_trigger` constructs this variant, so other targets never do.
    #[allow(dead_code)]
    Sighup,
}
527
528async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
531 tokio::fs::metadata(path)
532 .await
533 .ok()
534 .and_then(|m| m.modified().ok())
535}
536
/// Blocking, test-only counterpart of `file_mtime_async`.
///
/// Takes `&Path` rather than `&PathBuf`; existing `&PathBuf` arguments coerce
/// automatically. Yields `None` when the metadata cannot be read or the modification time
/// is unavailable.
#[cfg(test)]
fn file_mtime(path: &std::path::Path) -> Option<SystemTime> {
    std::fs::metadata(path).ok()?.modified().ok()
}
542
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tempfile::TempDir;

    /// Minimal configuration payload used by the reloader tests.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        pub value: String,
    }

    /// Builds a `TestConfig` holding `value`.
    fn test_config(value: &str) -> TestConfig {
        TestConfig {
            value: value.to_owned(),
        }
    }

    /// Reloader settings with only a fast periodic trigger enabled.
    fn periodic_only() -> ReloaderConfig {
        ReloaderConfig {
            periodic_interval: Duration::from_millis(50),
            debounce: Duration::from_millis(10),
            enable_sighup: false,
            ..ReloaderConfig::default()
        }
    }

    #[test]
    fn test_reloader_config_defaults() {
        let ReloaderConfig {
            config_path,
            poll_interval,
            periodic_interval,
            debounce,
            enable_sighup,
        } = ReloaderConfig::default();

        assert!(config_path.is_none());
        assert_eq!(poll_interval, Duration::from_secs(5));
        assert_eq!(periodic_interval, Duration::ZERO);
        assert_eq!(debounce, Duration::from_millis(500));
        assert!(enable_sighup);
    }

    #[tokio::test]
    async fn test_periodic_reload() {
        let shared = SharedConfig::new(test_config("initial"));
        let mut rx = shared.subscribe();

        let reload_calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter = reload_calls.clone();

        let handle = ConfigReloader::new(
            periodic_only(),
            shared.clone(),
            move || {
                counter.fetch_add(1, Ordering::Relaxed);
                Ok(test_config("reloaded"))
            },
            |_| Ok(()),
        )
        .start();

        // The periodic timer should publish a new config well within the timeout.
        let notified = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
        assert!(notified.is_ok(), "Should receive reload notification");

        assert_eq!(shared.read().value, "reloaded");
        assert!(shared.version() >= 1);
        assert!(reload_calls.load(Ordering::Relaxed) >= 1);

        handle.abort();
    }

    #[tokio::test]
    async fn test_file_change_triggers_reload() {
        let dir = TempDir::new().unwrap();
        let config_path = dir.path().join("config.yaml");
        fs::write(&config_path, "initial content").unwrap();

        let shared = SharedConfig::new(test_config("initial"));
        let mut rx = shared.subscribe();

        let watched = ReloaderConfig {
            config_path: Some(config_path.clone()),
            poll_interval: Duration::from_millis(50),
            debounce: Duration::from_millis(10),
            enable_sighup: false,
            ..ReloaderConfig::default()
        };

        let reload_source = config_path.clone();
        let handle = ConfigReloader::new(
            watched,
            shared.clone(),
            move || {
                let content = fs::read_to_string(&reload_source).unwrap_or_default();
                Ok(TestConfig { value: content })
            },
            |_| Ok(()),
        )
        .start();

        // Give the reloader time to record the file's initial mtime.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let mut file = fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(&config_path)
            .unwrap();
        file.write_all(b"updated content").unwrap();
        file.sync_all().unwrap();
        drop(file);

        // The content is only checked when a reload notification arrives in time.
        let notified = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
        if notified.is_ok() {
            assert_eq!(shared.read().value, "updated content");
        }

        handle.abort();
    }

    #[tokio::test]
    async fn test_validation_failure_preserves_config() {
        let shared = SharedConfig::new(test_config("good"));

        let should_fail = Arc::new(AtomicBool::new(true));
        let reject = should_fail.clone();

        let handle = ConfigReloader::new(
            periodic_only(),
            shared.clone(),
            || Ok(test_config("bad")),
            move |_cfg| match reject.load(Ordering::Relaxed) {
                true => Err("validation failed".into()),
                false => Ok(()),
            },
        )
        .start();

        // Let several periodic ticks pass; every one of them must be rejected.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(shared.read().value, "good");
        assert_eq!(shared.version(), 0);

        handle.abort();
    }

    #[tokio::test]
    async fn test_reload_fn_error_preserves_config() {
        let shared = SharedConfig::new(test_config("good"));

        let handle = ConfigReloader::new(
            periodic_only(),
            shared.clone(),
            || Err("load failed".into()),
            |_| Ok(()),
        )
        .start();

        // Let several periodic ticks pass; every load attempt fails.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(shared.read().value, "good");
        assert_eq!(shared.version(), 0);

        handle.abort();
    }

    #[test]
    fn test_file_mtime() {
        let dir = TempDir::new().unwrap();
        let existing = dir.path().join("test.txt");
        fs::write(&existing, "content").unwrap();
        assert!(file_mtime(&existing).is_some());

        let missing = PathBuf::from("/nonexistent/file.txt");
        assert!(file_mtime(&missing).is_none());
    }
}