1use std::path::PathBuf;
75use std::sync::Arc;
76use std::time::{Duration, Instant, SystemTime};
77
78use tokio::task::JoinHandle;
79use tracing::{debug, error, info, warn};
80
81use super::shared::SharedConfig;
82
83type BoxError = Box<dyn std::error::Error + Send + Sync>;
85
86#[derive(Debug, Clone)]
88pub struct ReloaderConfig {
89 pub config_path: Option<PathBuf>,
91
92 pub poll_interval: Duration,
95
96 pub periodic_interval: Duration,
99
100 pub debounce: Duration,
103
104 pub enable_sighup: bool,
107}
108
109impl Default for ReloaderConfig {
110 fn default() -> Self {
111 Self {
112 config_path: None,
113 poll_interval: Duration::from_secs(5),
114 periodic_interval: Duration::ZERO,
115 debounce: Duration::from_millis(500),
116 enable_sighup: true,
117 }
118 }
119}
120
121type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync>;
132
133pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
134 config: ReloaderConfig,
135 shared: SharedConfig<T>,
136 reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
137 validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
138 post_reload_hooks: Vec<PostReloadHook<T>>,
139}
140
141impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
142 pub fn new(
149 config: ReloaderConfig,
150 shared: SharedConfig<T>,
151 reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
152 validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
153 ) -> Self {
154 Self {
155 config,
156 shared,
157 reload_fn: Arc::new(reload_fn),
158 validate_fn: Arc::new(validate_fn),
159 post_reload_hooks: Vec::new(),
160 }
161 }
162
163 #[must_use]
173 pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
174 self.post_reload_hooks.push(Arc::new(hook));
175 self
176 }
177
178 #[must_use]
184 pub fn with_registry_update(self, key: &str) -> Self
185 where
186 T: serde::Serialize + Default,
187 {
188 let key = key.to_string();
189 self.with_post_reload_hook(move |config| {
190 super::registry::update::<T>(&key, config);
191 })
192 }
193
194 pub fn start(self) -> JoinHandle<()> {
199 let has_file = self.config.config_path.is_some();
200 let has_periodic = self.config.periodic_interval > Duration::ZERO;
201 let has_sighup = self.config.enable_sighup;
202
203 info!(
204 file_watch = has_file,
205 periodic = has_periodic,
206 sighup = has_sighup,
207 "Config reloader started"
208 );
209
210 tokio::spawn(async move {
211 self.run_loop().await;
212 })
213 }
214
215 async fn run_loop(self) {
217 #[cfg(feature = "shutdown")]
218 let shutdown_token = crate::shutdown::token();
219
220 let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
222 Some(p) => file_mtime_async(p).await,
223 None => None,
224 };
225 let mut last_reload = Instant::now();
226
227 let mut poll_timer = self
229 .config
230 .config_path
231 .as_ref()
232 .map(|_| tokio::time::interval(self.config.poll_interval));
233
234 let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
236 Some(tokio::time::interval(self.config.periodic_interval))
237 } else {
238 None
239 };
240
241 #[cfg(unix)]
243 let mut sighup = if self.config.enable_sighup {
244 Some(
245 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
246 .expect("failed to register SIGHUP handler"),
247 )
248 } else {
249 None
250 };
251
252 loop {
253 #[cfg(feature = "shutdown")]
255 if shutdown_token.is_cancelled() {
256 info!("Config reloader stopping (shutdown)");
257 return;
258 }
259
260 let trigger_result = {
261 #[cfg(feature = "shutdown")]
262 {
263 tokio::select! {
264 trigger = self.wait_for_trigger(
265 &mut poll_timer,
266 &mut periodic_timer,
267 #[cfg(unix)]
268 &mut sighup,
269 &mut last_modified,
270 ) => Some(trigger),
271 () = shutdown_token.cancelled() => None,
272 }
273 }
274 #[cfg(not(feature = "shutdown"))]
275 {
276 Some(
277 self.wait_for_trigger(
278 &mut poll_timer,
279 &mut periodic_timer,
280 #[cfg(unix)]
281 &mut sighup,
282 &mut last_modified,
283 )
284 .await,
285 )
286 }
287 };
288
289 let Some(trigger) = trigger_result else {
290 info!("Config reloader stopping (shutdown)");
291 return;
292 };
293
294 if last_reload.elapsed() < self.config.debounce {
296 debug!("Debouncing config reload");
297 continue;
298 }
299
300 match trigger {
301 ReloadTrigger::FileChanged => {
302 info!(
303 path = ?self.config.config_path,
304 "Config file changed, reloading"
305 );
306 }
307 ReloadTrigger::Periodic => {
308 info!("Periodic config reload triggered");
309 }
310 ReloadTrigger::Sighup => {
311 info!("SIGHUP received, reloading configuration");
312 }
313 }
314
315 self.do_reload();
316 last_reload = Instant::now();
317 }
318 }
319
320 async fn wait_for_trigger(
324 &self,
325 poll_timer: &mut Option<tokio::time::Interval>,
326 periodic_timer: &mut Option<tokio::time::Interval>,
327 #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
328 last_modified: &mut Option<SystemTime>,
329 ) -> ReloadTrigger {
330 loop {
331 let trigger = self
332 .select_trigger(
333 poll_timer,
334 periodic_timer,
335 #[cfg(unix)]
336 sighup,
337 )
338 .await;
339
340 match trigger {
341 ReloadTrigger::FileChanged => {
342 if let Some(ref path) = self.config.config_path {
344 let current_mtime = file_mtime_async(path).await;
345 let changed = match (&*last_modified, ¤t_mtime) {
346 (Some(last), Some(current)) => current > last,
347 (None, Some(_)) => true,
348 _ => false,
349 };
350 if changed {
351 *last_modified = current_mtime;
352 return ReloadTrigger::FileChanged;
353 }
354 }
355 }
357 other => return other,
358 }
359 }
360 }
361
362 #[cfg(unix)]
364 async fn select_trigger(
365 &self,
366 poll_timer: &mut Option<tokio::time::Interval>,
367 periodic_timer: &mut Option<tokio::time::Interval>,
368 sighup: &mut Option<tokio::signal::unix::Signal>,
369 ) -> ReloadTrigger {
370 tokio::select! {
371 _ = async {
372 match poll_timer.as_mut() {
373 Some(timer) => timer.tick().await,
374 None => std::future::pending().await,
375 }
376 } => ReloadTrigger::FileChanged,
377
378 _ = async {
379 match periodic_timer.as_mut() {
380 Some(timer) => timer.tick().await,
381 None => std::future::pending().await,
382 }
383 } => ReloadTrigger::Periodic,
384
385 () = async {
386 match sighup.as_mut() {
387 Some(sig) => { sig.recv().await; },
388 None => std::future::pending::<()>().await,
389 }
390 } => ReloadTrigger::Sighup,
391 }
392 }
393
394 #[cfg(not(unix))]
396 async fn select_trigger(
397 &self,
398 poll_timer: &mut Option<tokio::time::Interval>,
399 periodic_timer: &mut Option<tokio::time::Interval>,
400 ) -> ReloadTrigger {
401 tokio::select! {
402 _ = async {
403 match poll_timer.as_mut() {
404 Some(timer) => timer.tick().await,
405 None => std::future::pending().await,
406 }
407 } => ReloadTrigger::FileChanged,
408
409 _ = async {
410 match periodic_timer.as_mut() {
411 Some(timer) => timer.tick().await,
412 None => std::future::pending().await,
413 }
414 } => ReloadTrigger::Periodic,
415 }
416 }
417
418 fn do_reload(&self) {
420 match (self.reload_fn)() {
421 Ok(new_config) => {
422 if let Err(e) = (self.validate_fn)(&new_config) {
423 error!(error = %e, "Config reload validation failed, keeping current config");
424 #[cfg(feature = "metrics")]
425 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
426 return;
427 }
428
429 let old_version = self.shared.version();
430 self.shared.update(new_config.clone());
431 let new_version = self.shared.version();
432
433 for hook in &self.post_reload_hooks {
435 hook(&new_config);
436 }
437
438 #[cfg(feature = "metrics")]
439 metrics::counter!("config_reloads_total", "result" => "success").increment(1);
440
441 info!(
442 old_version = old_version,
443 new_version = new_version,
444 "Configuration reloaded successfully"
445 );
446 }
447 Err(e) => {
448 warn!(error = %e, "Config reload failed, keeping current config");
449 #[cfg(feature = "metrics")]
450 metrics::counter!("config_reloads_total", "result" => "error").increment(1);
451 }
452 }
453 }
454}
455
456#[derive(Debug, Clone, Copy, PartialEq, Eq)]
458enum ReloadTrigger {
459 FileChanged,
460 Periodic,
461 #[allow(dead_code)]
462 Sighup,
463}
464
465async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
468 tokio::fs::metadata(path)
469 .await
470 .ok()
471 .and_then(|m| m.modified().ok())
472}
473
474#[cfg(test)]
476fn file_mtime(path: &PathBuf) -> Option<SystemTime> {
477 std::fs::metadata(path).ok().and_then(|m| m.modified().ok())
478}
479
480#[cfg(test)]
481mod tests {
482 use super::*;
483 use std::fs;
484 use std::io::Write;
485 use std::sync::atomic::{AtomicBool, Ordering};
486 use tempfile::TempDir;
487
488 #[derive(Clone, Debug, Default, PartialEq)]
489 struct TestConfig {
490 pub value: String,
491 }
492
493 #[test]
494 fn test_reloader_config_defaults() {
495 let config = ReloaderConfig::default();
496 assert!(config.config_path.is_none());
497 assert_eq!(config.poll_interval, Duration::from_secs(5));
498 assert_eq!(config.periodic_interval, Duration::ZERO);
499 assert_eq!(config.debounce, Duration::from_millis(500));
500 assert!(config.enable_sighup);
501 }
502
503 #[tokio::test]
504 async fn test_periodic_reload() {
505 let shared = SharedConfig::new(TestConfig {
506 value: "initial".into(),
507 });
508 let mut rx = shared.subscribe();
509
510 let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
511 let call_count_clone = call_count.clone();
512
513 let reloader = ConfigReloader::new(
514 ReloaderConfig {
515 periodic_interval: Duration::from_millis(50),
516 debounce: Duration::from_millis(10),
517 enable_sighup: false,
518 ..Default::default()
519 },
520 shared.clone(),
521 move || {
522 call_count_clone.fetch_add(1, Ordering::Relaxed);
523 Ok(TestConfig {
524 value: "reloaded".into(),
525 })
526 },
527 |_| Ok(()),
528 );
529
530 let handle = reloader.start();
531
532 let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
534 assert!(result.is_ok(), "Should receive reload notification");
535
536 assert_eq!(shared.read().value, "reloaded");
537 assert!(shared.version() >= 1);
538 assert!(call_count.load(Ordering::Relaxed) >= 1);
539
540 handle.abort();
541 }
542
543 #[tokio::test]
544 async fn test_file_change_triggers_reload() {
545 let dir = TempDir::new().unwrap();
546 let config_path = dir.path().join("config.yaml");
547 fs::write(&config_path, "initial content").unwrap();
548
549 let shared = SharedConfig::new(TestConfig {
550 value: "initial".into(),
551 });
552 let mut rx = shared.subscribe();
553
554 let path_for_reload = config_path.clone();
555 let reloader = ConfigReloader::new(
556 ReloaderConfig {
557 config_path: Some(config_path.clone()),
558 poll_interval: Duration::from_millis(50),
559 debounce: Duration::from_millis(10),
560 enable_sighup: false,
561 ..Default::default()
562 },
563 shared.clone(),
564 move || {
565 let content = fs::read_to_string(&path_for_reload).unwrap_or_default();
566 Ok(TestConfig { value: content })
567 },
568 |_| Ok(()),
569 );
570
571 let handle = reloader.start();
572
573 tokio::time::sleep(Duration::from_millis(150)).await;
575
576 {
578 let mut file = fs::OpenOptions::new()
579 .write(true)
580 .truncate(true)
581 .open(&config_path)
582 .unwrap();
583 file.write_all(b"updated content").unwrap();
584 file.sync_all().unwrap();
585 }
586
587 let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
589 if result.is_ok() {
590 assert_eq!(shared.read().value, "updated content");
591 }
592
593 handle.abort();
594 }
595
596 #[tokio::test]
597 async fn test_validation_failure_preserves_config() {
598 let shared = SharedConfig::new(TestConfig {
599 value: "good".into(),
600 });
601
602 let should_fail = Arc::new(AtomicBool::new(true));
603 let should_fail_clone = should_fail.clone();
604
605 let reloader = ConfigReloader::new(
606 ReloaderConfig {
607 periodic_interval: Duration::from_millis(50),
608 debounce: Duration::from_millis(10),
609 enable_sighup: false,
610 ..Default::default()
611 },
612 shared.clone(),
613 || {
614 Ok(TestConfig {
615 value: "bad".into(),
616 })
617 },
618 move |_cfg| {
619 if should_fail_clone.load(Ordering::Relaxed) {
620 Err("validation failed".into())
621 } else {
622 Ok(())
623 }
624 },
625 );
626
627 let handle = reloader.start();
628
629 tokio::time::sleep(Duration::from_millis(200)).await;
631
632 assert_eq!(shared.read().value, "good");
634 assert_eq!(shared.version(), 0);
635
636 handle.abort();
637 }
638
639 #[tokio::test]
640 async fn test_reload_fn_error_preserves_config() {
641 let shared = SharedConfig::new(TestConfig {
642 value: "good".into(),
643 });
644
645 let reloader = ConfigReloader::new(
646 ReloaderConfig {
647 periodic_interval: Duration::from_millis(50),
648 debounce: Duration::from_millis(10),
649 enable_sighup: false,
650 ..Default::default()
651 },
652 shared.clone(),
653 || Err("load failed".into()),
654 |_| Ok(()),
655 );
656
657 let handle = reloader.start();
658
659 tokio::time::sleep(Duration::from_millis(200)).await;
661
662 assert_eq!(shared.read().value, "good");
664 assert_eq!(shared.version(), 0);
665
666 handle.abort();
667 }
668
669 #[test]
670 fn test_file_mtime() {
671 let dir = TempDir::new().unwrap();
672 let path = dir.path().join("test.txt");
673 fs::write(&path, "content").unwrap();
674
675 let mtime = file_mtime(&path);
676 assert!(mtime.is_some());
677
678 let mtime = file_mtime(&PathBuf::from("/nonexistent/file.txt"));
680 assert!(mtime.is_none());
681 }
682}