1use std::path::PathBuf;
75use std::sync::Arc;
76use std::time::{Duration, Instant, SystemTime};
77
78use tokio::task::JoinHandle;
79use tracing::{debug, error, info, warn};
80
81use super::shared::SharedConfig;
82
/// Boxed, thread-safe error type returned by the reload and validation callbacks.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
85
/// Settings that decide when a [`ConfigReloader`] attempts a reload.
///
/// Three trigger sources are available: polling a file's modification time,
/// a fixed periodic timer, and (on Unix targets) the `SIGHUP` signal.
#[derive(Debug, Clone)]
pub struct ReloaderConfig {
    /// File whose modification time is polled; `None` disables file watching.
    pub config_path: Option<PathBuf>,

    /// How often the modification time of `config_path` is checked.
    pub poll_interval: Duration,

    /// Interval between unconditional reloads; `Duration::ZERO` disables them.
    pub periodic_interval: Duration,

    /// Minimum time that must have passed since the previous reload (or since
    /// the reloader started) before another trigger is acted upon.
    pub debounce: Duration,

    /// Whether a `SIGHUP` handler is registered (only consulted on Unix).
    pub enable_sighup: bool,
}

impl Default for ReloaderConfig {
    /// No watched file, 5 s poll interval, periodic reloads off, 500 ms
    /// debounce, `SIGHUP` handling on.
    fn default() -> Self {
        Self {
            enable_sighup: true,
            debounce: Duration::from_millis(500),
            periodic_interval: Duration::ZERO,
            poll_interval: Duration::from_secs(5),
            config_path: None,
        }
    }
}
120
121type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync>;
132
133pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
134 config: ReloaderConfig,
135 shared: SharedConfig<T>,
136 reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
137 validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
138 post_reload_hooks: Vec<PostReloadHook<T>>,
139}
140
141impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
142 pub fn new(
149 config: ReloaderConfig,
150 shared: SharedConfig<T>,
151 reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
152 validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
153 ) -> Self {
154 Self {
155 config,
156 shared,
157 reload_fn: Arc::new(reload_fn),
158 validate_fn: Arc::new(validate_fn),
159 post_reload_hooks: Vec::new(),
160 }
161 }
162
163 #[must_use]
173 pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
174 self.post_reload_hooks.push(Arc::new(hook));
175 self
176 }
177
178 #[must_use]
184 pub fn with_registry_update(self, key: &str) -> Self
185 where
186 T: serde::Serialize + Default,
187 {
188 let key = key.to_string();
189 self.with_post_reload_hook(move |config| {
190 super::registry::update::<T>(&key, config);
191 })
192 }
193
194 pub fn start(self) -> JoinHandle<()> {
199 let has_file = self.config.config_path.is_some();
200 let has_periodic = self.config.periodic_interval > Duration::ZERO;
201 let has_sighup = self.config.enable_sighup;
202
203 info!(
204 file_watch = has_file,
205 periodic = has_periodic,
206 sighup = has_sighup,
207 "Config reloader started"
208 );
209
210 tokio::spawn(async move {
211 self.run_loop().await;
212 })
213 }
214
215 async fn run_loop(self) {
217 #[cfg(feature = "shutdown")]
218 let shutdown_token = crate::shutdown::token();
219
220 let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
222 Some(p) => file_mtime_async(p).await,
223 None => None,
224 };
225 let mut last_reload = Instant::now();
226
227 let mut poll_timer = self
229 .config
230 .config_path
231 .as_ref()
232 .map(|_| tokio::time::interval(self.config.poll_interval));
233
234 let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
236 Some(tokio::time::interval(self.config.periodic_interval))
237 } else {
238 None
239 };
240
241 #[cfg(unix)]
243 let mut sighup = if self.config.enable_sighup {
244 Some(
245 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
246 .expect("failed to register SIGHUP handler"),
247 )
248 } else {
249 None
250 };
251
252 loop {
253 #[cfg(feature = "shutdown")]
255 if shutdown_token.is_cancelled() {
256 info!("Config reloader stopping (shutdown)");
257 return;
258 }
259
260 let trigger_result = {
261 #[cfg(feature = "shutdown")]
262 {
263 tokio::select! {
264 trigger = self.wait_for_trigger(
265 &mut poll_timer,
266 &mut periodic_timer,
267 #[cfg(unix)]
268 &mut sighup,
269 &mut last_modified,
270 ) => Some(trigger),
271 () = shutdown_token.cancelled() => None,
272 }
273 }
274 #[cfg(not(feature = "shutdown"))]
275 {
276 Some(
277 self.wait_for_trigger(
278 &mut poll_timer,
279 &mut periodic_timer,
280 #[cfg(unix)]
281 &mut sighup,
282 &mut last_modified,
283 )
284 .await,
285 )
286 }
287 };
288
289 let Some(trigger) = trigger_result else {
290 info!("Config reloader stopping (shutdown)");
291 return;
292 };
293
294 if last_reload.elapsed() < self.config.debounce {
296 debug!("Debouncing config reload");
297 continue;
298 }
299
300 match trigger {
301 ReloadTrigger::FileChanged => {
302 info!(
303 path = ?self.config.config_path,
304 "Config file changed, reloading"
305 );
306 }
307 ReloadTrigger::Periodic => {
308 info!("Periodic config reload triggered");
309 }
310 ReloadTrigger::Sighup => {
311 info!("SIGHUP received, reloading configuration");
312 }
313 }
314
315 self.do_reload();
316 last_reload = Instant::now();
317 }
318 }
319
320 async fn wait_for_trigger(
324 &self,
325 poll_timer: &mut Option<tokio::time::Interval>,
326 periodic_timer: &mut Option<tokio::time::Interval>,
327 #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
328 last_modified: &mut Option<SystemTime>,
329 ) -> ReloadTrigger {
330 loop {
331 let trigger = self
332 .select_trigger(
333 poll_timer,
334 periodic_timer,
335 #[cfg(unix)]
336 sighup,
337 )
338 .await;
339
340 match trigger {
341 ReloadTrigger::FileChanged => {
342 if let Some(ref path) = self.config.config_path {
344 let current_mtime = file_mtime_async(path).await;
345 let changed = match (&*last_modified, ¤t_mtime) {
346 (Some(last), Some(current)) => current > last,
347 (None, Some(_)) => true,
348 _ => false,
349 };
350 if changed {
351 *last_modified = current_mtime;
352 return ReloadTrigger::FileChanged;
353 }
354 }
355 }
357 other => return other,
358 }
359 }
360 }
361
362 #[cfg(unix)]
364 async fn select_trigger(
365 &self,
366 poll_timer: &mut Option<tokio::time::Interval>,
367 periodic_timer: &mut Option<tokio::time::Interval>,
368 sighup: &mut Option<tokio::signal::unix::Signal>,
369 ) -> ReloadTrigger {
370 tokio::select! {
371 _ = async {
372 match poll_timer.as_mut() {
373 Some(timer) => timer.tick().await,
374 None => std::future::pending().await,
375 }
376 } => ReloadTrigger::FileChanged,
377
378 _ = async {
379 match periodic_timer.as_mut() {
380 Some(timer) => timer.tick().await,
381 None => std::future::pending().await,
382 }
383 } => ReloadTrigger::Periodic,
384
385 () = async {
386 match sighup.as_mut() {
387 Some(sig) => { sig.recv().await; },
388 None => std::future::pending::<()>().await,
389 }
390 } => ReloadTrigger::Sighup,
391 }
392 }
393
/// Resolves with the trigger kind of whichever enabled timer fires first
/// (non-Unix build, so there is no `SIGHUP` source).
///
/// A timer passed as `None` never fires.
#[cfg(not(unix))]
async fn select_trigger(
    &self,
    poll_timer: &mut Option<tokio::time::Interval>,
    periodic_timer: &mut Option<tokio::time::Interval>,
) -> ReloadTrigger {
    /// Completes on the timer's next tick; pends forever without a timer.
    async fn next_tick(timer: &mut Option<tokio::time::Interval>) {
        if let Some(interval) = timer {
            interval.tick().await;
        } else {
            std::future::pending::<()>().await;
        }
    }

    tokio::select! {
        () = next_tick(poll_timer) => ReloadTrigger::FileChanged,
        () = next_tick(periodic_timer) => ReloadTrigger::Periodic,
    }
}
417
418 fn do_reload(&self) {
425 match (self.reload_fn)() {
426 Ok(new_config) => {
427 if let Err(e) = (self.validate_fn)(&new_config) {
428 error!(error = %e, "Config reload validation failed, keeping current config");
429 #[cfg(feature = "metrics")]
430 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
431 return;
432 }
433
434 let old_version = self.shared.version();
435 self.shared.update(new_config.clone());
436 let new_version = self.shared.version();
437
438 for hook in &self.post_reload_hooks {
440 hook(&new_config);
441 }
442
443 #[cfg(feature = "metrics")]
444 metrics::counter!("config_reloads_total", "result" => "success").increment(1);
445
446 info!(
447 old_version = old_version,
448 new_version = new_version,
449 "Configuration reloaded successfully"
450 );
451 }
452 Err(e) => {
453 warn!(error = %e, "Config reload failed, keeping current config");
454 #[cfg(feature = "metrics")]
455 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
456 }
457 }
458 }
459}
460
/// The event that caused a reload attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReloadTrigger {
    /// The watched file's modification time changed.
    FileChanged,
    /// The periodic timer fired.
    Periodic,
    /// A `SIGHUP` signal was received.
    // Only the Unix `select_trigger` constructs this variant, so other
    // targets would otherwise warn about it being unused.
    #[allow(dead_code)]
    Sighup,
}
469
470async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
473 tokio::fs::metadata(path)
474 .await
475 .ok()
476 .and_then(|m| m.modified().ok())
477}
478
/// Blocking, test-only helper that returns the last-modification time of `path`.
///
/// Returns `None` when the metadata cannot be read (for example because the
/// file does not exist) or when no modification time is available.
///
/// Takes a `&Path` rather than `&PathBuf`; callers holding a `&PathBuf`
/// keep working through deref coercion.
#[cfg(test)]
fn file_mtime(path: &std::path::Path) -> Option<SystemTime> {
    std::fs::metadata(path).ok()?.modified().ok()
}
484
485#[cfg(test)]
486mod tests {
487 use super::*;
488 use std::fs;
489 use std::io::Write;
490 use std::sync::atomic::{AtomicBool, Ordering};
491 use tempfile::TempDir;
492
/// Minimal configuration payload used by the reloader tests.
#[derive(Clone, Debug, Default, PartialEq)]
struct TestConfig {
    /// Marker string the tests compare to see which configuration is active.
    pub value: String,
}
497
/// `ReloaderConfig::default()` yields the expected values for every field.
#[test]
fn test_reloader_config_defaults() {
    // Exhaustive destructuring: adding a field forces this test to be revisited.
    let ReloaderConfig {
        config_path,
        poll_interval,
        periodic_interval,
        debounce,
        enable_sighup,
    } = ReloaderConfig::default();

    assert!(config_path.is_none());
    assert_eq!(poll_interval, Duration::from_secs(5));
    assert_eq!(periodic_interval, Duration::ZERO);
    assert_eq!(debounce, Duration::from_millis(500));
    assert!(enable_sighup);
}
507
508 #[tokio::test]
509 async fn test_periodic_reload() {
510 let shared = SharedConfig::new(TestConfig {
511 value: "initial".into(),
512 });
513 let mut rx = shared.subscribe();
514
515 let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
516 let call_count_clone = call_count.clone();
517
518 let reloader = ConfigReloader::new(
519 ReloaderConfig {
520 periodic_interval: Duration::from_millis(50),
521 debounce: Duration::from_millis(10),
522 enable_sighup: false,
523 ..Default::default()
524 },
525 shared.clone(),
526 move || {
527 call_count_clone.fetch_add(1, Ordering::Relaxed);
528 Ok(TestConfig {
529 value: "reloaded".into(),
530 })
531 },
532 |_| Ok(()),
533 );
534
535 let handle = reloader.start();
536
537 let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
539 assert!(result.is_ok(), "Should receive reload notification");
540
541 assert_eq!(shared.read().value, "reloaded");
542 assert!(shared.version() >= 1);
543 assert!(call_count.load(Ordering::Relaxed) >= 1);
544
545 handle.abort();
546 }
547
548 #[tokio::test]
549 async fn test_file_change_triggers_reload() {
550 let dir = TempDir::new().unwrap();
551 let config_path = dir.path().join("config.yaml");
552 fs::write(&config_path, "initial content").unwrap();
553
554 let shared = SharedConfig::new(TestConfig {
555 value: "initial".into(),
556 });
557 let mut rx = shared.subscribe();
558
559 let path_for_reload = config_path.clone();
560 let reloader = ConfigReloader::new(
561 ReloaderConfig {
562 config_path: Some(config_path.clone()),
563 poll_interval: Duration::from_millis(50),
564 debounce: Duration::from_millis(10),
565 enable_sighup: false,
566 ..Default::default()
567 },
568 shared.clone(),
569 move || {
570 let content = fs::read_to_string(&path_for_reload).unwrap_or_default();
571 Ok(TestConfig { value: content })
572 },
573 |_| Ok(()),
574 );
575
576 let handle = reloader.start();
577
578 tokio::time::sleep(Duration::from_millis(150)).await;
580
581 {
583 let mut file = fs::OpenOptions::new()
584 .write(true)
585 .truncate(true)
586 .open(&config_path)
587 .unwrap();
588 file.write_all(b"updated content").unwrap();
589 file.sync_all().unwrap();
590 }
591
592 let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
594 if result.is_ok() {
595 assert_eq!(shared.read().value, "updated content");
596 }
597
598 handle.abort();
599 }
600
601 #[tokio::test]
602 async fn test_validation_failure_preserves_config() {
603 let shared = SharedConfig::new(TestConfig {
604 value: "good".into(),
605 });
606
607 let should_fail = Arc::new(AtomicBool::new(true));
608 let should_fail_clone = should_fail.clone();
609
610 let reloader = ConfigReloader::new(
611 ReloaderConfig {
612 periodic_interval: Duration::from_millis(50),
613 debounce: Duration::from_millis(10),
614 enable_sighup: false,
615 ..Default::default()
616 },
617 shared.clone(),
618 || {
619 Ok(TestConfig {
620 value: "bad".into(),
621 })
622 },
623 move |_cfg| {
624 if should_fail_clone.load(Ordering::Relaxed) {
625 Err("validation failed".into())
626 } else {
627 Ok(())
628 }
629 },
630 );
631
632 let handle = reloader.start();
633
634 tokio::time::sleep(Duration::from_millis(200)).await;
636
637 assert_eq!(shared.read().value, "good");
639 assert_eq!(shared.version(), 0);
640
641 handle.abort();
642 }
643
644 #[tokio::test]
645 async fn test_reload_fn_error_preserves_config() {
646 let shared = SharedConfig::new(TestConfig {
647 value: "good".into(),
648 });
649
650 let reloader = ConfigReloader::new(
651 ReloaderConfig {
652 periodic_interval: Duration::from_millis(50),
653 debounce: Duration::from_millis(10),
654 enable_sighup: false,
655 ..Default::default()
656 },
657 shared.clone(),
658 || Err("load failed".into()),
659 |_| Ok(()),
660 );
661
662 let handle = reloader.start();
663
664 tokio::time::sleep(Duration::from_millis(200)).await;
666
667 assert_eq!(shared.read().value, "good");
669 assert_eq!(shared.version(), 0);
670
671 handle.abort();
672 }
673
/// `file_mtime` yields a timestamp for an existing file and `None` for a missing one.
#[test]
fn test_file_mtime() {
    let dir = TempDir::new().unwrap();
    let existing = dir.path().join("test.txt");
    fs::write(&existing, "content").unwrap();
    assert!(file_mtime(&existing).is_some());

    let missing = PathBuf::from("/nonexistent/file.txt");
    assert!(file_mtime(&missing).is_none());
}
687}