Skip to main content

hyperi_rustlib/config/
reloader.rs

1// Project:   hyperi-rustlib
2// File:      src/config/reloader.rs
3// Purpose:   Universal config hot-reload with SIGHUP, periodic, and file polling
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Universal configuration reloader for DFE components.
10//!
11//! `ConfigReloader<T>` provides three reload triggers, any combination of
12//! which can be enabled simultaneously:
13//!
14//! 1. **SIGHUP** (Unix only) -- standard daemon reload signal
15//! 2. **Periodic timer** -- reload every N seconds
16//! 3. **File polling** -- detect config file changes via mtime comparison
17//!
18//! The reloader calls a user-supplied `reload_fn` to load config and a
19//! `validate_fn` to validate before applying. On success it updates the
20//! `SharedConfig<T>` which notifies all subscribers.
21//!
22//! ## Usage
23//!
24//! ```rust,no_run
25//! use std::path::PathBuf;
26//! use std::time::Duration;
27//! use hyperi_rustlib::config::reloader::{ConfigReloader, ReloaderConfig};
28//! use hyperi_rustlib::config::shared::SharedConfig;
29//!
30//! #[derive(Clone, Debug, Default)]
31//! struct AppConfig {
32//!     pub workers: usize,
33//! }
34//!
35//! #[tokio::main]
36//! async fn main() {
37//!     let config = AppConfig { workers: 4 };
38//!     let shared = SharedConfig::new(config);
39//!
40//!     let reloader_config = ReloaderConfig {
41//!         config_path: Some(PathBuf::from("config.yaml")),
42//!         poll_interval: Duration::from_secs(5),
43//!         periodic_interval: Duration::ZERO,  // disabled
44//!         debounce: Duration::from_millis(500),
45//!         enable_sighup: true,
46//!     };
47//!
48//!     let reloader = ConfigReloader::new(
49//!         reloader_config,
50//!         shared.clone(),
51//!         || {
52//!             // Your config loading logic here
53//!             Ok(AppConfig { workers: 8 })
54//!         },
55//!         |cfg| {
56//!             // Your validation logic here
57//!             if cfg.workers == 0 {
58//!                 return Err("workers must be > 0".into());
59//!             }
60//!             Ok(())
61//!         },
62//!     );
63//!
64//!     let _handle = reloader.start();
65//!     // ... run your application ...
66//! }
67//! ```
68//!
69//! Replaces the per-component reload implementations in dfe-loader
70//! (`ConfigWatcher`, file polling), dfe-receiver (`config_reload_task`,
71//! SIGHUP + periodic), and dfe-archiver: set the matching `ReloaderConfig`
72//! fields and pass the component's existing load/validate functions.
73
74use std::path::PathBuf;
75use std::sync::Arc;
76use std::time::{Duration, Instant, SystemTime};
77
78use tokio::task::JoinHandle;
79use tracing::{debug, error, info, warn};
80
81use super::shared::SharedConfig;
82
/// Type-erased error returned by the `reload_fn` and `validate_fn` callbacks.
///
/// Callbacks can build one from a string or from their own error type with
/// `.into()` (or `?`).
type BoxError = Box<dyn std::error::Error + Send + Sync>;
85
/// Settings that decide when a `ConfigReloader` attempts a reload.
#[derive(Debug, Clone)]
pub struct ReloaderConfig {
    /// Config file whose modification time is polled for changes
    /// (`None` = no file watching).
    pub config_path: Option<PathBuf>,

    /// How often the file's mtime is checked. Only used when `config_path`
    /// is `Some`; use a non-zero value.
    /// Default: 5 seconds.
    pub poll_interval: Duration,

    /// Interval between unconditional reloads. `Duration::ZERO` disables
    /// the periodic trigger.
    /// Default: disabled.
    pub periodic_interval: Duration,

    /// Minimum spacing between two reloads (debounce).
    /// Default: 500ms.
    pub debounce: Duration,

    /// Whether SIGHUP triggers a reload (Unix only, ignored on other platforms).
    /// Default: true.
    pub enable_sighup: bool,
}

impl Default for ReloaderConfig {
    /// Defaults: no file watching, 5 s poll interval, periodic reload
    /// disabled, 500 ms debounce, SIGHUP enabled.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(5);
        let debounce = Duration::from_millis(500);

        Self {
            enable_sighup: true,
            debounce,
            periodic_interval: Duration::ZERO,
            poll_interval,
            config_path: None,
        }
    }
}
120
/// Callback invoked after a successful reload with the new config value.
type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync>;

/// Universal configuration reloader.
///
/// Supports three reload triggers (any combination):
/// - **SIGHUP** (Unix) -- `enable_sighup: true`
/// - **Periodic timer** -- `periodic_interval > 0`
/// - **File polling** -- `config_path: Some(path)`
///
/// On each trigger, calls `reload_fn` to load new config, `validate_fn` to
/// validate, then updates the `SharedConfig<T>` if valid.
pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
    /// Which triggers are enabled and their timing.
    config: ReloaderConfig,
    /// Shared config that is updated on every successful reload.
    shared: SharedConfig<T>,
    /// Loads a fresh config value; an `Err` leaves the current config in place.
    reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
    /// Validates a freshly loaded config; an `Err` rejects it.
    validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
    /// Hooks run, in registration order, after each successful reload.
    post_reload_hooks: Vec<PostReloadHook<T>>,
}
140
141impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
142    /// Create a new config reloader.
143    ///
144    /// - `config`: Reload trigger configuration
145    /// - `shared`: Shared config to update on successful reload
146    /// - `reload_fn`: Called to load a fresh config (re-reads file + env)
147    /// - `validate_fn`: Called to validate before applying (return Err to reject)
148    pub fn new(
149        config: ReloaderConfig,
150        shared: SharedConfig<T>,
151        reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
152        validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
153    ) -> Self {
154        Self {
155            config,
156            shared,
157            reload_fn: Arc::new(reload_fn),
158            validate_fn: Arc::new(validate_fn),
159            post_reload_hooks: Vec::new(),
160        }
161    }
162
    /// Add a hook that runs after each successful reload.
    ///
    /// The hook receives a reference to the newly applied config. Hooks are
    /// called in the order they were added, after the shared config has been
    /// updated. Failed loads and rejected configs do not invoke hooks.
    ///
    /// ```rust,no_run
    /// # use hyperi_rustlib::config::reloader::{ConfigReloader, ReloaderConfig};
    /// # use hyperi_rustlib::config::shared::SharedConfig;
    /// # #[derive(Clone, Debug, Default)]
    /// # struct AppConfig { workers: usize }
    /// # let shared = SharedConfig::new(AppConfig::default());
    /// let _reloader = ConfigReloader::new(
    ///     ReloaderConfig::default(),
    ///     shared,
    ///     || Ok(AppConfig { workers: 8 }),
    ///     |_| Ok(()),
    /// )
    /// .with_post_reload_hook(|cfg| println!("now running {} workers", cfg.workers));
    /// ```
    #[must_use]
    pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
        self.post_reload_hooks.push(Arc::new(hook));
        self
    }
177
178    /// Connect to the config registry: after each successful reload,
179    /// call `registry::update()` so listeners are notified and the
180    /// registry reflects the new effective config.
181    ///
182    /// Requires `T: Serialize + Default`.
183    #[must_use]
184    pub fn with_registry_update(self, key: &str) -> Self
185    where
186        T: serde::Serialize + Default,
187    {
188        let key = key.to_string();
189        self.with_post_reload_hook(move |config| {
190            super::registry::update::<T>(&key, config);
191        })
192    }
193
194    /// Start the reload loop in a background task.
195    ///
196    /// Returns a `JoinHandle` that can be used to abort the reloader.
197    /// The task runs until cancelled or the process exits.
198    pub fn start(self) -> JoinHandle<()> {
199        let has_file = self.config.config_path.is_some();
200        let has_periodic = self.config.periodic_interval > Duration::ZERO;
201        let has_sighup = self.config.enable_sighup;
202
203        info!(
204            file_watch = has_file,
205            periodic = has_periodic,
206            sighup = has_sighup,
207            "Config reloader started"
208        );
209
210        tokio::spawn(async move {
211            self.run_loop().await;
212        })
213    }
214
215    /// Main reload loop -- waits for any trigger, then attempts reload.
216    async fn run_loop(self) {
217        #[cfg(feature = "shutdown")]
218        let shutdown_token = crate::shutdown::token();
219
220        // File polling state
221        let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
222            Some(p) => file_mtime_async(p).await,
223            None => None,
224        };
225        let mut last_reload = Instant::now();
226
227        // Set up poll timer (for file watching)
228        let mut poll_timer = self
229            .config
230            .config_path
231            .as_ref()
232            .map(|_| tokio::time::interval(self.config.poll_interval));
233
234        // Set up periodic timer
235        let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
236            Some(tokio::time::interval(self.config.periodic_interval))
237        } else {
238            None
239        };
240
241        // Set up SIGHUP handler
242        #[cfg(unix)]
243        let mut sighup = if self.config.enable_sighup {
244            Some(
245                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
246                    .expect("failed to register SIGHUP handler"),
247            )
248        } else {
249            None
250        };
251
252        loop {
253            // Check for global shutdown before waiting for next trigger
254            #[cfg(feature = "shutdown")]
255            if shutdown_token.is_cancelled() {
256                info!("Config reloader stopping (shutdown)");
257                return;
258            }
259
260            let trigger_result = {
261                #[cfg(feature = "shutdown")]
262                {
263                    tokio::select! {
264                        trigger = self.wait_for_trigger(
265                            &mut poll_timer,
266                            &mut periodic_timer,
267                            #[cfg(unix)]
268                            &mut sighup,
269                            &mut last_modified,
270                        ) => Some(trigger),
271                        () = shutdown_token.cancelled() => None,
272                    }
273                }
274                #[cfg(not(feature = "shutdown"))]
275                {
276                    Some(
277                        self.wait_for_trigger(
278                            &mut poll_timer,
279                            &mut periodic_timer,
280                            #[cfg(unix)]
281                            &mut sighup,
282                            &mut last_modified,
283                        )
284                        .await,
285                    )
286                }
287            };
288
289            let Some(trigger) = trigger_result else {
290                info!("Config reloader stopping (shutdown)");
291                return;
292            };
293
294            // Debounce check
295            if last_reload.elapsed() < self.config.debounce {
296                debug!("Debouncing config reload");
297                continue;
298            }
299
300            match trigger {
301                ReloadTrigger::FileChanged => {
302                    info!(
303                        path = ?self.config.config_path,
304                        "Config file changed, reloading"
305                    );
306                }
307                ReloadTrigger::Periodic => {
308                    info!("Periodic config reload triggered");
309                }
310                ReloadTrigger::Sighup => {
311                    info!("SIGHUP received, reloading configuration");
312                }
313            }
314
315            self.do_reload();
316            last_reload = Instant::now();
317        }
318    }
319
320    /// Wait for the next reload trigger.
321    ///
322    /// Returns which trigger fired. For file polling, also updates last_modified.
323    async fn wait_for_trigger(
324        &self,
325        poll_timer: &mut Option<tokio::time::Interval>,
326        periodic_timer: &mut Option<tokio::time::Interval>,
327        #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
328        last_modified: &mut Option<SystemTime>,
329    ) -> ReloadTrigger {
330        loop {
331            let trigger = self
332                .select_trigger(
333                    poll_timer,
334                    periodic_timer,
335                    #[cfg(unix)]
336                    sighup,
337                )
338                .await;
339
340            match trigger {
341                ReloadTrigger::FileChanged => {
342                    // Check if file actually changed (mtime comparison)
343                    if let Some(ref path) = self.config.config_path {
344                        let current_mtime = file_mtime_async(path).await;
345                        let changed = match (&*last_modified, &current_mtime) {
346                            (Some(last), Some(current)) => current > last,
347                            (None, Some(_)) => true,
348                            _ => false,
349                        };
350                        if changed {
351                            *last_modified = current_mtime;
352                            return ReloadTrigger::FileChanged;
353                        }
354                    }
355                    // No actual change, loop back
356                }
357                other => return other,
358            }
359        }
360    }
361
362    /// Select on all enabled triggers, returning which one fired first.
363    #[cfg(unix)]
364    async fn select_trigger(
365        &self,
366        poll_timer: &mut Option<tokio::time::Interval>,
367        periodic_timer: &mut Option<tokio::time::Interval>,
368        sighup: &mut Option<tokio::signal::unix::Signal>,
369    ) -> ReloadTrigger {
370        tokio::select! {
371            _ = async {
372                match poll_timer.as_mut() {
373                    Some(timer) => timer.tick().await,
374                    None => std::future::pending().await,
375                }
376            } => ReloadTrigger::FileChanged,
377
378            _ = async {
379                match periodic_timer.as_mut() {
380                    Some(timer) => timer.tick().await,
381                    None => std::future::pending().await,
382                }
383            } => ReloadTrigger::Periodic,
384
385            () = async {
386                match sighup.as_mut() {
387                    Some(sig) => { sig.recv().await; },
388                    None => std::future::pending::<()>().await,
389                }
390            } => ReloadTrigger::Sighup,
391        }
392    }
393
394    /// Select on all enabled triggers (non-Unix: no SIGHUP).
395    #[cfg(not(unix))]
396    async fn select_trigger(
397        &self,
398        poll_timer: &mut Option<tokio::time::Interval>,
399        periodic_timer: &mut Option<tokio::time::Interval>,
400    ) -> ReloadTrigger {
401        tokio::select! {
402            _ = async {
403                match poll_timer.as_mut() {
404                    Some(timer) => timer.tick().await,
405                    None => std::future::pending().await,
406                }
407            } => ReloadTrigger::FileChanged,
408
409            _ = async {
410                match periodic_timer.as_mut() {
411                    Some(timer) => timer.tick().await,
412                    None => std::future::pending().await,
413                }
414            } => ReloadTrigger::Periodic,
415        }
416    }
417
418    /// Attempt to reload config: load -> validate -> update shared.
419    //
420    // NOTE (metrics audit): `config_reloads_total` carries no `{ns}` prefix and
421    // can collide with an app-level `AppMetrics` series of the same name on the
422    // shared registry. Left unchanged (low priority -- the `result` label keeps
423    // it usable); a future pass should reconcile to one canonical series.
424    fn do_reload(&self) {
425        match (self.reload_fn)() {
426            Ok(new_config) => {
427                if let Err(e) = (self.validate_fn)(&new_config) {
428                    error!(error = %e, "Config reload validation failed, keeping current config");
429                    #[cfg(feature = "metrics")]
430                    metrics::counter!("config_reloads_total", "result" => "error").increment(1);
431                    return;
432                }
433
434                let old_version = self.shared.version();
435                self.shared.update(new_config.clone());
436                let new_version = self.shared.version();
437
438                // Run post-reload hooks (registry update, etc.)
439                for hook in &self.post_reload_hooks {
440                    hook(&new_config);
441                }
442
443                #[cfg(feature = "metrics")]
444                metrics::counter!("config_reloads_total", "result" => "success").increment(1);
445
446                info!(
447                    old_version = old_version,
448                    new_version = new_version,
449                    "Configuration reloaded successfully"
450                );
451            }
452            Err(e) => {
453                warn!(error = %e, "Config reload failed, keeping current config");
454                #[cfg(feature = "metrics")]
455                metrics::counter!("config_reloads_total", "result" => "error").increment(1);
456            }
457        }
458    }
459}
460
/// Which trigger caused a reload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReloadTrigger {
    /// Produced on a file-poll tick; `wait_for_trigger` only forwards it
    /// once the file's mtime check reports a change.
    FileChanged,
    /// The periodic timer ticked.
    Periodic,
    /// SIGHUP was received.
    // Only constructed by the `#[cfg(unix)]` `select_trigger`; on other
    // platforms nothing builds this variant, hence the lint allowance.
    #[allow(dead_code)]
    Sighup,
}
469
470/// Get the modification time of a file. Used inside `run_loop` so the
471/// periodic poll doesn't block the runtime thread.
472async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
473    tokio::fs::metadata(path)
474        .await
475        .ok()
476        .and_then(|m| m.modified().ok())
477}
478
/// Sync mtime helper -- used only by the sync-context test below.
///
/// Returns `None` when the metadata cannot be read (e.g. the file does not
/// exist) or the modification time is unavailable. Takes `&Path` rather than
/// `&PathBuf`; callers holding a `&PathBuf` can still pass it unchanged.
#[cfg(test)]
fn file_mtime(path: &std::path::Path) -> Option<SystemTime> {
    std::fs::metadata(path).and_then(|m| m.modified()).ok()
}
484
485#[cfg(test)]
486mod tests {
487    use super::*;
488    use std::fs;
489    use std::io::Write;
490    use std::sync::atomic::{AtomicBool, Ordering};
491    use tempfile::TempDir;
492
    /// Minimal config payload used by the reloader tests.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        /// Marker string the tests inspect to tell which config is active.
        pub value: String,
    }
497
498    #[test]
499    fn test_reloader_config_defaults() {
500        let config = ReloaderConfig::default();
501        assert!(config.config_path.is_none());
502        assert_eq!(config.poll_interval, Duration::from_secs(5));
503        assert_eq!(config.periodic_interval, Duration::ZERO);
504        assert_eq!(config.debounce, Duration::from_millis(500));
505        assert!(config.enable_sighup);
506    }
507
508    #[tokio::test]
509    async fn test_periodic_reload() {
510        let shared = SharedConfig::new(TestConfig {
511            value: "initial".into(),
512        });
513        let mut rx = shared.subscribe();
514
515        let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
516        let call_count_clone = call_count.clone();
517
518        let reloader = ConfigReloader::new(
519            ReloaderConfig {
520                periodic_interval: Duration::from_millis(50),
521                debounce: Duration::from_millis(10),
522                enable_sighup: false,
523                ..Default::default()
524            },
525            shared.clone(),
526            move || {
527                call_count_clone.fetch_add(1, Ordering::Relaxed);
528                Ok(TestConfig {
529                    value: "reloaded".into(),
530                })
531            },
532            |_| Ok(()),
533        );
534
535        let handle = reloader.start();
536
537        // Wait for at least one reload
538        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
539        assert!(result.is_ok(), "Should receive reload notification");
540
541        assert_eq!(shared.read().value, "reloaded");
542        assert!(shared.version() >= 1);
543        assert!(call_count.load(Ordering::Relaxed) >= 1);
544
545        handle.abort();
546    }
547
548    #[tokio::test]
549    async fn test_file_change_triggers_reload() {
550        let dir = TempDir::new().unwrap();
551        let config_path = dir.path().join("config.yaml");
552        fs::write(&config_path, "initial content").unwrap();
553
554        let shared = SharedConfig::new(TestConfig {
555            value: "initial".into(),
556        });
557        let mut rx = shared.subscribe();
558
559        let path_for_reload = config_path.clone();
560        let reloader = ConfigReloader::new(
561            ReloaderConfig {
562                config_path: Some(config_path.clone()),
563                poll_interval: Duration::from_millis(50),
564                debounce: Duration::from_millis(10),
565                enable_sighup: false,
566                ..Default::default()
567            },
568            shared.clone(),
569            move || {
570                let content = fs::read_to_string(&path_for_reload).unwrap_or_default();
571                Ok(TestConfig { value: content })
572            },
573            |_| Ok(()),
574        );
575
576        let handle = reloader.start();
577
578        // Let the watcher start and record initial mtime
579        tokio::time::sleep(Duration::from_millis(150)).await;
580
581        // Modify the file
582        {
583            let mut file = fs::OpenOptions::new()
584                .write(true)
585                .truncate(true)
586                .open(&config_path)
587                .unwrap();
588            file.write_all(b"updated content").unwrap();
589            file.sync_all().unwrap();
590        }
591
592        // Wait for the reload
593        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
594        if result.is_ok() {
595            assert_eq!(shared.read().value, "updated content");
596        }
597
598        handle.abort();
599    }
600
601    #[tokio::test]
602    async fn test_validation_failure_preserves_config() {
603        let shared = SharedConfig::new(TestConfig {
604            value: "good".into(),
605        });
606
607        let should_fail = Arc::new(AtomicBool::new(true));
608        let should_fail_clone = should_fail.clone();
609
610        let reloader = ConfigReloader::new(
611            ReloaderConfig {
612                periodic_interval: Duration::from_millis(50),
613                debounce: Duration::from_millis(10),
614                enable_sighup: false,
615                ..Default::default()
616            },
617            shared.clone(),
618            || {
619                Ok(TestConfig {
620                    value: "bad".into(),
621                })
622            },
623            move |_cfg| {
624                if should_fail_clone.load(Ordering::Relaxed) {
625                    Err("validation failed".into())
626                } else {
627                    Ok(())
628                }
629            },
630        );
631
632        let handle = reloader.start();
633
634        // Let a few reload attempts happen (all should fail validation)
635        tokio::time::sleep(Duration::from_millis(200)).await;
636
637        // Config should still be the original
638        assert_eq!(shared.read().value, "good");
639        assert_eq!(shared.version(), 0);
640
641        handle.abort();
642    }
643
644    #[tokio::test]
645    async fn test_reload_fn_error_preserves_config() {
646        let shared = SharedConfig::new(TestConfig {
647            value: "good".into(),
648        });
649
650        let reloader = ConfigReloader::new(
651            ReloaderConfig {
652                periodic_interval: Duration::from_millis(50),
653                debounce: Duration::from_millis(10),
654                enable_sighup: false,
655                ..Default::default()
656            },
657            shared.clone(),
658            || Err("load failed".into()),
659            |_| Ok(()),
660        );
661
662        let handle = reloader.start();
663
664        // Let a few reload attempts happen
665        tokio::time::sleep(Duration::from_millis(200)).await;
666
667        // Config should still be the original
668        assert_eq!(shared.read().value, "good");
669        assert_eq!(shared.version(), 0);
670
671        handle.abort();
672    }
673
674    #[test]
675    fn test_file_mtime() {
676        let dir = TempDir::new().unwrap();
677        let path = dir.path().join("test.txt");
678        fs::write(&path, "content").unwrap();
679
680        let mtime = file_mtime(&path);
681        assert!(mtime.is_some());
682
683        // Non-existent file
684        let mtime = file_mtime(&PathBuf::from("/nonexistent/file.txt"));
685        assert!(mtime.is_none());
686    }
687}