Skip to main content

hyperi_rustlib/config/
reloader.rs

1// Project:   hyperi-rustlib
2// File:      src/config/reloader.rs
3// Purpose:   Universal config hot-reload with SIGHUP, periodic, and file polling
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Universal configuration reloader for DFE components.
10//!
11//! `ConfigReloader<T>` provides three reload triggers, any combination of
12//! which can be enabled simultaneously:
13//!
14//! 1. **SIGHUP** (Unix only) -- standard daemon reload signal
15//! 2. **Periodic timer** -- reload every N seconds
16//! 3. **File polling** -- detect config file changes via mtime comparison
17//!
18//! The reloader calls a user-supplied `reload_fn` to load config and a
19//! `validate_fn` to validate before applying. On success it updates the
20//! `SharedConfig<T>` which notifies all subscribers.
21//!
22//! ## Usage
23//!
24//! ```rust,no_run
25//! use std::path::PathBuf;
26//! use std::time::Duration;
27//! use hyperi_rustlib::config::reloader::{ConfigReloader, ReloaderConfig};
28//! use hyperi_rustlib::config::shared::SharedConfig;
29//!
30//! #[derive(Clone, Debug, Default)]
31//! struct AppConfig {
32//!     pub workers: usize,
33//! }
34//!
35//! #[tokio::main]
36//! async fn main() {
37//!     let config = AppConfig { workers: 4 };
38//!     let shared = SharedConfig::new(config);
39//!
40//!     let reloader_config = ReloaderConfig {
41//!         config_path: Some(PathBuf::from("config.yaml")),
42//!         poll_interval: Duration::from_secs(5),
43//!         periodic_interval: Duration::ZERO,  // disabled
44//!         debounce: Duration::from_millis(500),
45//!         enable_sighup: true,
46//!     };
47//!
48//!     let reloader = ConfigReloader::new(
49//!         reloader_config,
50//!         shared.clone(),
51//!         || {
52//!             // Your config loading logic here
53//!             Ok(AppConfig { workers: 8 })
54//!         },
55//!         |cfg| {
56//!             // Your validation logic here
57//!             if cfg.workers == 0 {
58//!                 return Err("workers must be > 0".into());
59//!             }
60//!             Ok(())
61//!         },
62//!     );
63//!
64//!     let _handle = reloader.start();
65//!     // ... run your application ...
66//! }
67//! ```
68//!
69//! ## Migration from Component-Specific Implementations
70//!
71//! ### From dfe-loader's `ConfigWatcher` (file polling)
72//!
73//! ```text
74//! // Before:
75//! let watcher = ConfigWatcher::new(WatcherConfig {
76//!     config_path, poll_interval, debounce, enabled: true,
77//! }, shared)?;
78//! let _handle = watcher.start();
79//!
80//! // After:
81//! let reloader = ConfigReloader::new(
82//!     ReloaderConfig {
83//!         config_path: Some(config_path),
84//!         poll_interval,
85//!         debounce,
86//!         enable_sighup: true,      // bonus: also reload on SIGHUP
87//!         periodic_interval: Duration::ZERO,
88//!     },
89//!     shared,
90//!     || Config::load(path),        // your reload function
91//!     |c| c.validate(),             // your validate function
92//! );
93//! let _handle = reloader.start();
94//! ```
95//!
96//! ### From dfe-receiver's `config_reload_task` (SIGHUP + periodic)
97//!
98//! ```text
99//! // Before (inline in main.rs):
100//! tokio::spawn(config_reload_task(state, reload_secs));
101//!
102//! // After:
103//! let reloader = ConfigReloader::new(
104//!     ReloaderConfig {
105//!         periodic_interval: Duration::from_secs(reload_secs),
106//!         enable_sighup: true,
107//!         config_path: None,         // no file watching
108//!         ..Default::default()
109//!     },
110//!     shared,
111//!     || Config::load(path),
112//!     |c| c.validate(),
113//! );
114//! let _handle = reloader.start();
115//! ```
116//!
117//! ### From dfe-archiver (not yet wired)
118//!
119//! The archiver has `SharedConfig` and `reload_config()` ready but not
120//! connected. Use `ConfigReloader` to complete the integration:
121//!
122//! ```text
123//! let reloader = ConfigReloader::new(
124//!     ReloaderConfig {
125//!         config_path: config.config_path.as_ref().map(PathBuf::from),
126//!         periodic_interval: Duration::from_secs(config.config_reload_secs),
127//!         enable_sighup: true,
128//!         ..Default::default()
129//!     },
130//!     shared,
131//!     || load_config(config_path),
132//!     |c| validate_config(c),
133//! );
134//! let _handle = reloader.start();
135//! ```
136
137use std::path::PathBuf;
138use std::sync::Arc;
139use std::time::{Duration, Instant, SystemTime};
140
141use tokio::task::JoinHandle;
142use tracing::{debug, error, info, warn};
143
144use super::shared::SharedConfig;
145
/// Boxed error type for reload/validate callbacks.
///
/// Both callbacks handed to `ConfigReloader::new` return this type on
/// failure; it is `Send + Sync`, matching the bounds on the callbacks.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
148
/// Settings that choose which reload triggers are active and how they are paced.
#[derive(Debug, Clone)]
pub struct ReloaderConfig {
    /// Config file whose modification time is polled for changes.
    /// `None` turns file watching off.
    pub config_path: Option<PathBuf>,

    /// How often the file's modification time is checked. Only relevant
    /// when `config_path` is `Some`.
    /// Default: 5 seconds.
    pub poll_interval: Duration,

    /// Interval between timer-driven reloads. `Duration::ZERO` disables
    /// the periodic trigger.
    /// Default: disabled.
    pub periodic_interval: Duration,

    /// Debounce window: the minimum time between two reloads.
    /// Default: 500ms.
    pub debounce: Duration,

    /// Whether SIGHUP triggers a reload (Unix only, ignored on other platforms).
    /// Default: true.
    pub enable_sighup: bool,
}

impl Default for ReloaderConfig {
    /// Defaults: no file watching, no periodic reload, SIGHUP enabled,
    /// 5 second poll interval and 500ms debounce.
    fn default() -> Self {
        const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
        const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(500);

        Self {
            enable_sighup: true,
            debounce: DEFAULT_DEBOUNCE,
            periodic_interval: Duration::ZERO,
            poll_interval: DEFAULT_POLL_INTERVAL,
            config_path: None,
        }
    }
}
183
184/// Universal configuration reloader.
185///
186/// Supports three reload triggers (any combination):
187/// - **SIGHUP** (Unix) -- `enable_sighup: true`
188/// - **Periodic timer** -- `periodic_interval > 0`
189/// - **File polling** -- `config_path: Some(path)`
190///
191/// On each trigger, calls `reload_fn` to load new config, `validate_fn` to
192/// validate, then updates the `SharedConfig<T>` if valid.
193/// Callback invoked after a successful reload with the new config value.
194type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync>;
195
196pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
197    config: ReloaderConfig,
198    shared: SharedConfig<T>,
199    reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
200    validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
201    post_reload_hooks: Vec<PostReloadHook<T>>,
202}
203
204impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
205    /// Create a new config reloader.
206    ///
207    /// - `config`: Reload trigger configuration
208    /// - `shared`: Shared config to update on successful reload
209    /// - `reload_fn`: Called to load a fresh config (re-reads file + env)
210    /// - `validate_fn`: Called to validate before applying (return Err to reject)
211    pub fn new(
212        config: ReloaderConfig,
213        shared: SharedConfig<T>,
214        reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
215        validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
216    ) -> Self {
217        Self {
218            config,
219            shared,
220            reload_fn: Arc::new(reload_fn),
221            validate_fn: Arc::new(validate_fn),
222            post_reload_hooks: Vec::new(),
223        }
224    }
225
226    /// Add a hook that runs after each successful reload.
227    ///
228    /// Use this to connect to the config registry:
229    ///
230    /// ```rust,no_run
231    /// # use hyperi_rustlib::config::reloader::ConfigReloader;
232    /// # use hyperi_rustlib::config::registry;
233    /// // reloader.with_registry_update("my_app");
234    /// ```
235    #[must_use]
236    pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
237        self.post_reload_hooks.push(Arc::new(hook));
238        self
239    }
240
241    /// Connect to the config registry: after each successful reload,
242    /// call `registry::update()` so listeners are notified and the
243    /// registry reflects the new effective config.
244    ///
245    /// Requires `T: Serialize + Default`.
246    #[must_use]
247    pub fn with_registry_update(self, key: &str) -> Self
248    where
249        T: serde::Serialize + Default,
250    {
251        let key = key.to_string();
252        self.with_post_reload_hook(move |config| {
253            super::registry::update::<T>(&key, config);
254        })
255    }
256
257    /// Start the reload loop in a background task.
258    ///
259    /// Returns a `JoinHandle` that can be used to abort the reloader.
260    /// The task runs until cancelled or the process exits.
261    pub fn start(self) -> JoinHandle<()> {
262        let has_file = self.config.config_path.is_some();
263        let has_periodic = self.config.periodic_interval > Duration::ZERO;
264        let has_sighup = self.config.enable_sighup;
265
266        info!(
267            file_watch = has_file,
268            periodic = has_periodic,
269            sighup = has_sighup,
270            "Config reloader started"
271        );
272
273        tokio::spawn(async move {
274            self.run_loop().await;
275        })
276    }
277
278    /// Main reload loop -- waits for any trigger, then attempts reload.
279    async fn run_loop(self) {
280        #[cfg(feature = "shutdown")]
281        let shutdown_token = crate::shutdown::token();
282
283        // File polling state
284        let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
285            Some(p) => file_mtime_async(p).await,
286            None => None,
287        };
288        let mut last_reload = Instant::now();
289
290        // Set up poll timer (for file watching)
291        let mut poll_timer = self
292            .config
293            .config_path
294            .as_ref()
295            .map(|_| tokio::time::interval(self.config.poll_interval));
296
297        // Set up periodic timer
298        let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
299            Some(tokio::time::interval(self.config.periodic_interval))
300        } else {
301            None
302        };
303
304        // Set up SIGHUP handler
305        #[cfg(unix)]
306        let mut sighup = if self.config.enable_sighup {
307            Some(
308                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
309                    .expect("failed to register SIGHUP handler"),
310            )
311        } else {
312            None
313        };
314
315        loop {
316            // Check for global shutdown before waiting for next trigger
317            #[cfg(feature = "shutdown")]
318            if shutdown_token.is_cancelled() {
319                info!("Config reloader stopping (shutdown)");
320                return;
321            }
322
323            let trigger_result = {
324                #[cfg(feature = "shutdown")]
325                {
326                    tokio::select! {
327                        trigger = self.wait_for_trigger(
328                            &mut poll_timer,
329                            &mut periodic_timer,
330                            #[cfg(unix)]
331                            &mut sighup,
332                            &mut last_modified,
333                        ) => Some(trigger),
334                        () = shutdown_token.cancelled() => None,
335                    }
336                }
337                #[cfg(not(feature = "shutdown"))]
338                {
339                    Some(
340                        self.wait_for_trigger(
341                            &mut poll_timer,
342                            &mut periodic_timer,
343                            #[cfg(unix)]
344                            &mut sighup,
345                            &mut last_modified,
346                        )
347                        .await,
348                    )
349                }
350            };
351
352            let Some(trigger) = trigger_result else {
353                info!("Config reloader stopping (shutdown)");
354                return;
355            };
356
357            // Debounce check
358            if last_reload.elapsed() < self.config.debounce {
359                debug!("Debouncing config reload");
360                continue;
361            }
362
363            match trigger {
364                ReloadTrigger::FileChanged => {
365                    info!(
366                        path = ?self.config.config_path,
367                        "Config file changed, reloading"
368                    );
369                }
370                ReloadTrigger::Periodic => {
371                    info!("Periodic config reload triggered");
372                }
373                ReloadTrigger::Sighup => {
374                    info!("SIGHUP received, reloading configuration");
375                }
376            }
377
378            self.do_reload();
379            last_reload = Instant::now();
380        }
381    }
382
383    /// Wait for the next reload trigger.
384    ///
385    /// Returns which trigger fired. For file polling, also updates last_modified.
386    async fn wait_for_trigger(
387        &self,
388        poll_timer: &mut Option<tokio::time::Interval>,
389        periodic_timer: &mut Option<tokio::time::Interval>,
390        #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
391        last_modified: &mut Option<SystemTime>,
392    ) -> ReloadTrigger {
393        loop {
394            let trigger = self
395                .select_trigger(
396                    poll_timer,
397                    periodic_timer,
398                    #[cfg(unix)]
399                    sighup,
400                )
401                .await;
402
403            match trigger {
404                ReloadTrigger::FileChanged => {
405                    // Check if file actually changed (mtime comparison)
406                    if let Some(ref path) = self.config.config_path {
407                        let current_mtime = file_mtime_async(path).await;
408                        let changed = match (&*last_modified, &current_mtime) {
409                            (Some(last), Some(current)) => current > last,
410                            (None, Some(_)) => true,
411                            _ => false,
412                        };
413                        if changed {
414                            *last_modified = current_mtime;
415                            return ReloadTrigger::FileChanged;
416                        }
417                    }
418                    // No actual change, loop back
419                }
420                other => return other,
421            }
422        }
423    }
424
425    /// Select on all enabled triggers, returning which one fired first.
426    #[cfg(unix)]
427    async fn select_trigger(
428        &self,
429        poll_timer: &mut Option<tokio::time::Interval>,
430        periodic_timer: &mut Option<tokio::time::Interval>,
431        sighup: &mut Option<tokio::signal::unix::Signal>,
432    ) -> ReloadTrigger {
433        tokio::select! {
434            _ = async {
435                match poll_timer.as_mut() {
436                    Some(timer) => timer.tick().await,
437                    None => std::future::pending().await,
438                }
439            } => ReloadTrigger::FileChanged,
440
441            _ = async {
442                match periodic_timer.as_mut() {
443                    Some(timer) => timer.tick().await,
444                    None => std::future::pending().await,
445                }
446            } => ReloadTrigger::Periodic,
447
448            () = async {
449                match sighup.as_mut() {
450                    Some(sig) => { sig.recv().await; },
451                    None => std::future::pending::<()>().await,
452                }
453            } => ReloadTrigger::Sighup,
454        }
455    }
456
457    /// Select on all enabled triggers (non-Unix: no SIGHUP).
458    #[cfg(not(unix))]
459    async fn select_trigger(
460        &self,
461        poll_timer: &mut Option<tokio::time::Interval>,
462        periodic_timer: &mut Option<tokio::time::Interval>,
463    ) -> ReloadTrigger {
464        tokio::select! {
465            _ = async {
466                match poll_timer.as_mut() {
467                    Some(timer) => timer.tick().await,
468                    None => std::future::pending().await,
469                }
470            } => ReloadTrigger::FileChanged,
471
472            _ = async {
473                match periodic_timer.as_mut() {
474                    Some(timer) => timer.tick().await,
475                    None => std::future::pending().await,
476                }
477            } => ReloadTrigger::Periodic,
478        }
479    }
480
481    /// Attempt to reload config: load → validate → update shared.
482    fn do_reload(&self) {
483        match (self.reload_fn)() {
484            Ok(new_config) => {
485                if let Err(e) = (self.validate_fn)(&new_config) {
486                    error!(error = %e, "Config reload validation failed, keeping current config");
487                    #[cfg(feature = "metrics")]
488                    metrics::counter!("config_reloads_total", "result" => "error").increment(1);
489                    return;
490                }
491
492                let old_version = self.shared.version();
493                self.shared.update(new_config.clone());
494                let new_version = self.shared.version();
495
496                // Run post-reload hooks (registry update, etc.)
497                for hook in &self.post_reload_hooks {
498                    hook(&new_config);
499                }
500
501                #[cfg(feature = "metrics")]
502                metrics::counter!("config_reloads_total", "result" => "success").increment(1);
503
504                info!(
505                    old_version = old_version,
506                    new_version = new_version,
507                    "Configuration reloaded successfully"
508                );
509            }
510            Err(e) => {
511                warn!(error = %e, "Config reload failed, keeping current config");
512                #[cfg(feature = "metrics")]
513                metrics::counter!("config_reloads_total", "result" => "error").increment(1);
514            }
515        }
516    }
517}
518
/// Which trigger caused a reload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReloadTrigger {
    /// Produced when the file-poll timer ticks; `wait_for_trigger` only
    /// passes it on after the config file's mtime check reports a change.
    FileChanged,
    /// The periodic reload timer ticked.
    Periodic,
    /// SIGHUP was received.
    // Only the Unix `select_trigger` constructs this variant; the allow keeps
    // other platforms free of an unused-variant warning.
    #[allow(dead_code)]
    Sighup,
}
527
528/// Get the modification time of a file. Used inside `run_loop` so the
529/// periodic poll doesn't block the runtime thread.
530async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
531    tokio::fs::metadata(path)
532        .await
533        .ok()
534        .and_then(|m| m.modified().ok())
535}
536
/// Sync mtime helper -- used only by the sync-context test below.
///
/// Returns `None` when the metadata cannot be read (for example the file does
/// not exist) or the platform cannot report a modification time.
///
/// Takes `&Path` rather than `&PathBuf`; `&PathBuf` arguments keep working
/// through deref coercion.
#[cfg(test)]
fn file_mtime(path: &std::path::Path) -> Option<SystemTime> {
    let metadata = std::fs::metadata(path).ok()?;
    metadata.modified().ok()
}
542
543#[cfg(test)]
544mod tests {
545    use super::*;
546    use std::fs;
547    use std::io::Write;
548    use std::sync::atomic::{AtomicBool, Ordering};
549    use tempfile::TempDir;
550
    /// Minimal config payload used by the reloader tests.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        /// Marker string the tests read back to see which config is live.
        pub value: String,
    }
555
556    #[test]
557    fn test_reloader_config_defaults() {
558        let config = ReloaderConfig::default();
559        assert!(config.config_path.is_none());
560        assert_eq!(config.poll_interval, Duration::from_secs(5));
561        assert_eq!(config.periodic_interval, Duration::ZERO);
562        assert_eq!(config.debounce, Duration::from_millis(500));
563        assert!(config.enable_sighup);
564    }
565
566    #[tokio::test]
567    async fn test_periodic_reload() {
568        let shared = SharedConfig::new(TestConfig {
569            value: "initial".into(),
570        });
571        let mut rx = shared.subscribe();
572
573        let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
574        let call_count_clone = call_count.clone();
575
576        let reloader = ConfigReloader::new(
577            ReloaderConfig {
578                periodic_interval: Duration::from_millis(50),
579                debounce: Duration::from_millis(10),
580                enable_sighup: false,
581                ..Default::default()
582            },
583            shared.clone(),
584            move || {
585                call_count_clone.fetch_add(1, Ordering::Relaxed);
586                Ok(TestConfig {
587                    value: "reloaded".into(),
588                })
589            },
590            |_| Ok(()),
591        );
592
593        let handle = reloader.start();
594
595        // Wait for at least one reload
596        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
597        assert!(result.is_ok(), "Should receive reload notification");
598
599        assert_eq!(shared.read().value, "reloaded");
600        assert!(shared.version() >= 1);
601        assert!(call_count.load(Ordering::Relaxed) >= 1);
602
603        handle.abort();
604    }
605
606    #[tokio::test]
607    async fn test_file_change_triggers_reload() {
608        let dir = TempDir::new().unwrap();
609        let config_path = dir.path().join("config.yaml");
610        fs::write(&config_path, "initial content").unwrap();
611
612        let shared = SharedConfig::new(TestConfig {
613            value: "initial".into(),
614        });
615        let mut rx = shared.subscribe();
616
617        let path_for_reload = config_path.clone();
618        let reloader = ConfigReloader::new(
619            ReloaderConfig {
620                config_path: Some(config_path.clone()),
621                poll_interval: Duration::from_millis(50),
622                debounce: Duration::from_millis(10),
623                enable_sighup: false,
624                ..Default::default()
625            },
626            shared.clone(),
627            move || {
628                let content = fs::read_to_string(&path_for_reload).unwrap_or_default();
629                Ok(TestConfig { value: content })
630            },
631            |_| Ok(()),
632        );
633
634        let handle = reloader.start();
635
636        // Let the watcher start and record initial mtime
637        tokio::time::sleep(Duration::from_millis(150)).await;
638
639        // Modify the file
640        {
641            let mut file = fs::OpenOptions::new()
642                .write(true)
643                .truncate(true)
644                .open(&config_path)
645                .unwrap();
646            file.write_all(b"updated content").unwrap();
647            file.sync_all().unwrap();
648        }
649
650        // Wait for the reload
651        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
652        if result.is_ok() {
653            assert_eq!(shared.read().value, "updated content");
654        }
655
656        handle.abort();
657    }
658
659    #[tokio::test]
660    async fn test_validation_failure_preserves_config() {
661        let shared = SharedConfig::new(TestConfig {
662            value: "good".into(),
663        });
664
665        let should_fail = Arc::new(AtomicBool::new(true));
666        let should_fail_clone = should_fail.clone();
667
668        let reloader = ConfigReloader::new(
669            ReloaderConfig {
670                periodic_interval: Duration::from_millis(50),
671                debounce: Duration::from_millis(10),
672                enable_sighup: false,
673                ..Default::default()
674            },
675            shared.clone(),
676            || {
677                Ok(TestConfig {
678                    value: "bad".into(),
679                })
680            },
681            move |_cfg| {
682                if should_fail_clone.load(Ordering::Relaxed) {
683                    Err("validation failed".into())
684                } else {
685                    Ok(())
686                }
687            },
688        );
689
690        let handle = reloader.start();
691
692        // Let a few reload attempts happen (all should fail validation)
693        tokio::time::sleep(Duration::from_millis(200)).await;
694
695        // Config should still be the original
696        assert_eq!(shared.read().value, "good");
697        assert_eq!(shared.version(), 0);
698
699        handle.abort();
700    }
701
702    #[tokio::test]
703    async fn test_reload_fn_error_preserves_config() {
704        let shared = SharedConfig::new(TestConfig {
705            value: "good".into(),
706        });
707
708        let reloader = ConfigReloader::new(
709            ReloaderConfig {
710                periodic_interval: Duration::from_millis(50),
711                debounce: Duration::from_millis(10),
712                enable_sighup: false,
713                ..Default::default()
714            },
715            shared.clone(),
716            || Err("load failed".into()),
717            |_| Ok(()),
718        );
719
720        let handle = reloader.start();
721
722        // Let a few reload attempts happen
723        tokio::time::sleep(Duration::from_millis(200)).await;
724
725        // Config should still be the original
726        assert_eq!(shared.read().value, "good");
727        assert_eq!(shared.version(), 0);
728
729        handle.abort();
730    }
731
732    #[test]
733    fn test_file_mtime() {
734        let dir = TempDir::new().unwrap();
735        let path = dir.path().join("test.txt");
736        fs::write(&path, "content").unwrap();
737
738        let mtime = file_mtime(&path);
739        assert!(mtime.is_some());
740
741        // Non-existent file
742        let mtime = file_mtime(&PathBuf::from("/nonexistent/file.txt"));
743        assert!(mtime.is_none());
744    }
745}