Skip to main content

hyperi_rustlib/config/
reloader.rs

1// Project:   hyperi-rustlib
2// File:      src/config/reloader.rs
3// Purpose:   Universal config hot-reload with SIGHUP, periodic, and file polling
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Universal configuration reloader for DFE components.
10//!
11//! `ConfigReloader<T>` provides three reload triggers, any combination of
12//! which can be enabled simultaneously:
13//!
14//! 1. **SIGHUP** (Unix only) -- standard daemon reload signal
15//! 2. **Periodic timer** -- reload every N seconds
16//! 3. **File polling** -- detect config file changes via mtime comparison
17//!
18//! The reloader calls a user-supplied `reload_fn` to load config and a
19//! `validate_fn` to validate before applying. On success it updates the
20//! `SharedConfig<T>` which notifies all subscribers.
21//!
22//! ## Usage
23//!
24//! ```rust,no_run
25//! use std::path::PathBuf;
26//! use std::time::Duration;
27//! use hyperi_rustlib::config::reloader::{ConfigReloader, ReloaderConfig};
28//! use hyperi_rustlib::config::shared::SharedConfig;
29//!
30//! #[derive(Clone, Debug, Default)]
31//! struct AppConfig {
32//!     pub workers: usize,
33//! }
34//!
35//! #[tokio::main]
36//! async fn main() {
37//!     let config = AppConfig { workers: 4 };
38//!     let shared = SharedConfig::new(config);
39//!
40//!     let reloader_config = ReloaderConfig {
41//!         config_path: Some(PathBuf::from("config.yaml")),
42//!         poll_interval: Duration::from_secs(5),
43//!         periodic_interval: Duration::ZERO,  // disabled
44//!         debounce: Duration::from_millis(500),
45//!         enable_sighup: true,
46//!     };
47//!
48//!     let reloader = ConfigReloader::new(
49//!         reloader_config,
50//!         shared.clone(),
51//!         || {
52//!             // Your config loading logic here
53//!             Ok(AppConfig { workers: 8 })
54//!         },
55//!         |cfg| {
56//!             // Your validation logic here
57//!             if cfg.workers == 0 {
58//!                 return Err("workers must be > 0".into());
59//!             }
60//!             Ok(())
61//!         },
62//!     );
63//!
64//!     let _handle = reloader.start();
65//!     // ... run your application ...
66//! }
67//! ```
68//!
69//! Replaces the per-component reload implementations in dfe-loader
70//! (`ConfigWatcher`, file polling), dfe-receiver (`config_reload_task`,
71//! SIGHUP + periodic), and dfe-archiver: set the matching `ReloaderConfig`
72//! fields and pass the component's existing load/validate functions.
73
74use std::path::PathBuf;
75use std::sync::Arc;
76use std::time::{Duration, Instant, SystemTime};
77
78use tokio::task::JoinHandle;
79use tracing::{debug, error, info, warn};
80
81use super::shared::SharedConfig;
82
/// Boxed error type for reload/validate callbacks.
///
/// Carries `Send + Sync` bounds. String literals convert into it with
/// `.into()`, which is how the examples and tests in this file build errors.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
85
/// Settings that select and tune the reload triggers of a `ConfigReloader`.
#[derive(Debug, Clone)]
pub struct ReloaderConfig {
    /// Config file whose modification time is polled for changes.
    /// `None` turns file watching off.
    pub config_path: Option<PathBuf>,

    /// How often the file's mtime is checked. Only used when `config_path`
    /// is `Some`.
    /// Default: 5 seconds.
    pub poll_interval: Duration,

    /// Interval of the unconditional periodic reload. `Duration::ZERO`
    /// disables it.
    /// Default: disabled.
    pub periodic_interval: Duration,

    /// Debounce window: a trigger that fires sooner than this after the
    /// previous reload attempt does not cause a reload.
    /// Default: 500ms.
    pub debounce: Duration,

    /// Whether SIGHUP triggers a reload (Unix only, ignored on other
    /// platforms).
    /// Default: true.
    pub enable_sighup: bool,
}

impl Default for ReloaderConfig {
    /// Defaults: no file watching, 5s poll interval, periodic reload
    /// disabled, 500ms debounce, SIGHUP enabled.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(5);
        let debounce = Duration::from_millis(500);

        Self {
            enable_sighup: true,
            debounce,
            periodic_interval: Duration::ZERO,
            poll_interval,
            config_path: None,
        }
    }
}
120
/// Callback invoked after a successful reload with the new config value.
type PostReloadHook<T> = Arc<dyn Fn(&T) + Send + Sync>;

/// Universal configuration reloader.
///
/// Supports three reload triggers (any combination):
/// - **SIGHUP** (Unix) -- `enable_sighup: true`
/// - **Periodic timer** -- `periodic_interval > 0`
/// - **File polling** -- `config_path: Some(path)`
///
/// On each trigger, calls `reload_fn` to load new config, `validate_fn` to
/// validate, then updates the `SharedConfig<T>` if valid.
pub struct ConfigReloader<T: Clone + Send + Sync + 'static> {
    /// Trigger settings (which triggers are enabled, intervals, debounce).
    config: ReloaderConfig,
    /// Shared config that receives each successfully validated reload.
    shared: SharedConfig<T>,
    /// Loads a fresh config value; an `Err` leaves the current config in place.
    reload_fn: Arc<dyn Fn() -> Result<T, BoxError> + Send + Sync>,
    /// Checks a freshly loaded value; an `Err` rejects it.
    validate_fn: Arc<dyn Fn(&T) -> Result<(), BoxError> + Send + Sync>,
    /// Hooks run, in registration order, after each successful reload.
    post_reload_hooks: Vec<PostReloadHook<T>>,
}
140
141impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
142    /// Create a new config reloader.
143    ///
144    /// - `config`: Reload trigger configuration
145    /// - `shared`: Shared config to update on successful reload
146    /// - `reload_fn`: Called to load a fresh config (re-reads file + env)
147    /// - `validate_fn`: Called to validate before applying (return Err to reject)
148    pub fn new(
149        config: ReloaderConfig,
150        shared: SharedConfig<T>,
151        reload_fn: impl Fn() -> Result<T, BoxError> + Send + Sync + 'static,
152        validate_fn: impl Fn(&T) -> Result<(), BoxError> + Send + Sync + 'static,
153    ) -> Self {
154        Self {
155            config,
156            shared,
157            reload_fn: Arc::new(reload_fn),
158            validate_fn: Arc::new(validate_fn),
159            post_reload_hooks: Vec::new(),
160        }
161    }
162
163    /// Add a hook that runs after each successful reload.
164    ///
165    /// Use this to connect to the config registry:
166    ///
167    /// ```rust,no_run
168    /// # use hyperi_rustlib::config::reloader::ConfigReloader;
169    /// # use hyperi_rustlib::config::registry;
170    /// // reloader.with_registry_update("my_app");
171    /// ```
172    #[must_use]
173    pub fn with_post_reload_hook(mut self, hook: impl Fn(&T) + Send + Sync + 'static) -> Self {
174        self.post_reload_hooks.push(Arc::new(hook));
175        self
176    }
177
178    /// Connect to the config registry: after each successful reload,
179    /// call `registry::update()` so listeners are notified and the
180    /// registry reflects the new effective config.
181    ///
182    /// Requires `T: Serialize + Default`.
183    #[must_use]
184    pub fn with_registry_update(self, key: &str) -> Self
185    where
186        T: serde::Serialize + Default,
187    {
188        let key = key.to_string();
189        self.with_post_reload_hook(move |config| {
190            super::registry::update::<T>(&key, config);
191        })
192    }
193
194    /// Start the reload loop in a background task.
195    ///
196    /// Returns a `JoinHandle` that can be used to abort the reloader.
197    /// The task runs until cancelled or the process exits.
198    pub fn start(self) -> JoinHandle<()> {
199        let has_file = self.config.config_path.is_some();
200        let has_periodic = self.config.periodic_interval > Duration::ZERO;
201        let has_sighup = self.config.enable_sighup;
202
203        info!(
204            file_watch = has_file,
205            periodic = has_periodic,
206            sighup = has_sighup,
207            "Config reloader started"
208        );
209
210        tokio::spawn(async move {
211            self.run_loop().await;
212        })
213    }
214
215    /// Main reload loop -- waits for any trigger, then attempts reload.
216    async fn run_loop(self) {
217        #[cfg(feature = "shutdown")]
218        let shutdown_token = crate::shutdown::token();
219
220        // File polling state
221        let mut last_modified: Option<SystemTime> = match self.config.config_path.as_ref() {
222            Some(p) => file_mtime_async(p).await,
223            None => None,
224        };
225        let mut last_reload = Instant::now();
226
227        // Set up poll timer (for file watching)
228        let mut poll_timer = self
229            .config
230            .config_path
231            .as_ref()
232            .map(|_| tokio::time::interval(self.config.poll_interval));
233
234        // Set up periodic timer
235        let mut periodic_timer = if self.config.periodic_interval > Duration::ZERO {
236            Some(tokio::time::interval(self.config.periodic_interval))
237        } else {
238            None
239        };
240
241        // Set up SIGHUP handler
242        #[cfg(unix)]
243        let mut sighup = if self.config.enable_sighup {
244            Some(
245                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
246                    .expect("failed to register SIGHUP handler"),
247            )
248        } else {
249            None
250        };
251
252        loop {
253            // Check for global shutdown before waiting for next trigger
254            #[cfg(feature = "shutdown")]
255            if shutdown_token.is_cancelled() {
256                info!("Config reloader stopping (shutdown)");
257                return;
258            }
259
260            let trigger_result = {
261                #[cfg(feature = "shutdown")]
262                {
263                    tokio::select! {
264                        trigger = self.wait_for_trigger(
265                            &mut poll_timer,
266                            &mut periodic_timer,
267                            #[cfg(unix)]
268                            &mut sighup,
269                            &mut last_modified,
270                        ) => Some(trigger),
271                        () = shutdown_token.cancelled() => None,
272                    }
273                }
274                #[cfg(not(feature = "shutdown"))]
275                {
276                    Some(
277                        self.wait_for_trigger(
278                            &mut poll_timer,
279                            &mut periodic_timer,
280                            #[cfg(unix)]
281                            &mut sighup,
282                            &mut last_modified,
283                        )
284                        .await,
285                    )
286                }
287            };
288
289            let Some(trigger) = trigger_result else {
290                info!("Config reloader stopping (shutdown)");
291                return;
292            };
293
294            // Debounce check
295            if last_reload.elapsed() < self.config.debounce {
296                debug!("Debouncing config reload");
297                continue;
298            }
299
300            match trigger {
301                ReloadTrigger::FileChanged => {
302                    info!(
303                        path = ?self.config.config_path,
304                        "Config file changed, reloading"
305                    );
306                }
307                ReloadTrigger::Periodic => {
308                    info!("Periodic config reload triggered");
309                }
310                ReloadTrigger::Sighup => {
311                    info!("SIGHUP received, reloading configuration");
312                }
313            }
314
315            self.do_reload();
316            last_reload = Instant::now();
317        }
318    }
319
320    /// Wait for the next reload trigger.
321    ///
322    /// Returns which trigger fired. For file polling, also updates last_modified.
323    async fn wait_for_trigger(
324        &self,
325        poll_timer: &mut Option<tokio::time::Interval>,
326        periodic_timer: &mut Option<tokio::time::Interval>,
327        #[cfg(unix)] sighup: &mut Option<tokio::signal::unix::Signal>,
328        last_modified: &mut Option<SystemTime>,
329    ) -> ReloadTrigger {
330        loop {
331            let trigger = self
332                .select_trigger(
333                    poll_timer,
334                    periodic_timer,
335                    #[cfg(unix)]
336                    sighup,
337                )
338                .await;
339
340            match trigger {
341                ReloadTrigger::FileChanged => {
342                    // Check if file actually changed (mtime comparison)
343                    if let Some(ref path) = self.config.config_path {
344                        let current_mtime = file_mtime_async(path).await;
345                        let changed = match (&*last_modified, &current_mtime) {
346                            (Some(last), Some(current)) => current > last,
347                            (None, Some(_)) => true,
348                            _ => false,
349                        };
350                        if changed {
351                            *last_modified = current_mtime;
352                            return ReloadTrigger::FileChanged;
353                        }
354                    }
355                    // No actual change, loop back
356                }
357                other => return other,
358            }
359        }
360    }
361
362    /// Select on all enabled triggers, returning which one fired first.
363    #[cfg(unix)]
364    async fn select_trigger(
365        &self,
366        poll_timer: &mut Option<tokio::time::Interval>,
367        periodic_timer: &mut Option<tokio::time::Interval>,
368        sighup: &mut Option<tokio::signal::unix::Signal>,
369    ) -> ReloadTrigger {
370        tokio::select! {
371            _ = async {
372                match poll_timer.as_mut() {
373                    Some(timer) => timer.tick().await,
374                    None => std::future::pending().await,
375                }
376            } => ReloadTrigger::FileChanged,
377
378            _ = async {
379                match periodic_timer.as_mut() {
380                    Some(timer) => timer.tick().await,
381                    None => std::future::pending().await,
382                }
383            } => ReloadTrigger::Periodic,
384
385            () = async {
386                match sighup.as_mut() {
387                    Some(sig) => { sig.recv().await; },
388                    None => std::future::pending::<()>().await,
389                }
390            } => ReloadTrigger::Sighup,
391        }
392    }
393
394    /// Select on all enabled triggers (non-Unix: no SIGHUP).
395    #[cfg(not(unix))]
396    async fn select_trigger(
397        &self,
398        poll_timer: &mut Option<tokio::time::Interval>,
399        periodic_timer: &mut Option<tokio::time::Interval>,
400    ) -> ReloadTrigger {
401        tokio::select! {
402            _ = async {
403                match poll_timer.as_mut() {
404                    Some(timer) => timer.tick().await,
405                    None => std::future::pending().await,
406                }
407            } => ReloadTrigger::FileChanged,
408
409            _ = async {
410                match periodic_timer.as_mut() {
411                    Some(timer) => timer.tick().await,
412                    None => std::future::pending().await,
413                }
414            } => ReloadTrigger::Periodic,
415        }
416    }
417
418    /// Attempt to reload config: load -> validate -> update shared.
419    fn do_reload(&self) {
420        match (self.reload_fn)() {
421            Ok(new_config) => {
422                if let Err(e) = (self.validate_fn)(&new_config) {
423                    error!(error = %e, "Config reload validation failed, keeping current config");
424                    #[cfg(feature = "metrics")]
425                    metrics::counter!("config_reloads_total", "result" => "error").increment(1);
426                    return;
427                }
428
429                let old_version = self.shared.version();
430                self.shared.update(new_config.clone());
431                let new_version = self.shared.version();
432
433                // Run post-reload hooks (registry update, etc.)
434                for hook in &self.post_reload_hooks {
435                    hook(&new_config);
436                }
437
438                #[cfg(feature = "metrics")]
439                metrics::counter!("config_reloads_total", "result" => "success").increment(1);
440
441                info!(
442                    old_version = old_version,
443                    new_version = new_version,
444                    "Configuration reloaded successfully"
445                );
446            }
447            Err(e) => {
448                warn!(error = %e, "Config reload failed, keeping current config");
449                #[cfg(feature = "metrics")]
450                metrics::counter!("config_reloads_total", "result" => "error").increment(1);
451            }
452        }
453    }
454}
455
/// Which trigger caused a reload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReloadTrigger {
    /// The file poll timer ticked. `wait_for_trigger` only passes this on
    /// once the config file's mtime check confirms a change.
    FileChanged,
    /// The periodic reload timer ticked.
    Periodic,
    /// SIGHUP was received. Only the Unix `select_trigger` constructs this
    /// variant, so on other targets it is never built -- hence the allow.
    #[allow(dead_code)]
    Sighup,
}
464
465/// Get the modification time of a file. Used inside `run_loop` so the
466/// periodic poll doesn't block the runtime thread.
467async fn file_mtime_async(path: &PathBuf) -> Option<SystemTime> {
468    tokio::fs::metadata(path)
469        .await
470        .ok()
471        .and_then(|m| m.modified().ok())
472}
473
/// Sync mtime helper -- used only by the sync-context test below.
///
/// Returns `None` when the metadata cannot be read (for example the file
/// does not exist) or when the modification time is unavailable.
///
/// Takes `&Path` rather than `&PathBuf`; existing `&PathBuf` arguments
/// coerce automatically.
#[cfg(test)]
fn file_mtime(path: &std::path::Path) -> Option<SystemTime> {
    let metadata = std::fs::metadata(path).ok()?;
    metadata.modified().ok()
}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use std::fs;
484    use std::io::Write;
485    use std::sync::atomic::{AtomicBool, Ordering};
486    use tempfile::TempDir;
487
    /// Minimal config payload used by the tests in this module.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        /// Marker string the tests inspect to tell which config is active.
        pub value: String,
    }
492
493    #[test]
494    fn test_reloader_config_defaults() {
495        let config = ReloaderConfig::default();
496        assert!(config.config_path.is_none());
497        assert_eq!(config.poll_interval, Duration::from_secs(5));
498        assert_eq!(config.periodic_interval, Duration::ZERO);
499        assert_eq!(config.debounce, Duration::from_millis(500));
500        assert!(config.enable_sighup);
501    }
502
503    #[tokio::test]
504    async fn test_periodic_reload() {
505        let shared = SharedConfig::new(TestConfig {
506            value: "initial".into(),
507        });
508        let mut rx = shared.subscribe();
509
510        let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
511        let call_count_clone = call_count.clone();
512
513        let reloader = ConfigReloader::new(
514            ReloaderConfig {
515                periodic_interval: Duration::from_millis(50),
516                debounce: Duration::from_millis(10),
517                enable_sighup: false,
518                ..Default::default()
519            },
520            shared.clone(),
521            move || {
522                call_count_clone.fetch_add(1, Ordering::Relaxed);
523                Ok(TestConfig {
524                    value: "reloaded".into(),
525                })
526            },
527            |_| Ok(()),
528        );
529
530        let handle = reloader.start();
531
532        // Wait for at least one reload
533        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
534        assert!(result.is_ok(), "Should receive reload notification");
535
536        assert_eq!(shared.read().value, "reloaded");
537        assert!(shared.version() >= 1);
538        assert!(call_count.load(Ordering::Relaxed) >= 1);
539
540        handle.abort();
541    }
542
543    #[tokio::test]
544    async fn test_file_change_triggers_reload() {
545        let dir = TempDir::new().unwrap();
546        let config_path = dir.path().join("config.yaml");
547        fs::write(&config_path, "initial content").unwrap();
548
549        let shared = SharedConfig::new(TestConfig {
550            value: "initial".into(),
551        });
552        let mut rx = shared.subscribe();
553
554        let path_for_reload = config_path.clone();
555        let reloader = ConfigReloader::new(
556            ReloaderConfig {
557                config_path: Some(config_path.clone()),
558                poll_interval: Duration::from_millis(50),
559                debounce: Duration::from_millis(10),
560                enable_sighup: false,
561                ..Default::default()
562            },
563            shared.clone(),
564            move || {
565                let content = fs::read_to_string(&path_for_reload).unwrap_or_default();
566                Ok(TestConfig { value: content })
567            },
568            |_| Ok(()),
569        );
570
571        let handle = reloader.start();
572
573        // Let the watcher start and record initial mtime
574        tokio::time::sleep(Duration::from_millis(150)).await;
575
576        // Modify the file
577        {
578            let mut file = fs::OpenOptions::new()
579                .write(true)
580                .truncate(true)
581                .open(&config_path)
582                .unwrap();
583            file.write_all(b"updated content").unwrap();
584            file.sync_all().unwrap();
585        }
586
587        // Wait for the reload
588        let result = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
589        if result.is_ok() {
590            assert_eq!(shared.read().value, "updated content");
591        }
592
593        handle.abort();
594    }
595
596    #[tokio::test]
597    async fn test_validation_failure_preserves_config() {
598        let shared = SharedConfig::new(TestConfig {
599            value: "good".into(),
600        });
601
602        let should_fail = Arc::new(AtomicBool::new(true));
603        let should_fail_clone = should_fail.clone();
604
605        let reloader = ConfigReloader::new(
606            ReloaderConfig {
607                periodic_interval: Duration::from_millis(50),
608                debounce: Duration::from_millis(10),
609                enable_sighup: false,
610                ..Default::default()
611            },
612            shared.clone(),
613            || {
614                Ok(TestConfig {
615                    value: "bad".into(),
616                })
617            },
618            move |_cfg| {
619                if should_fail_clone.load(Ordering::Relaxed) {
620                    Err("validation failed".into())
621                } else {
622                    Ok(())
623                }
624            },
625        );
626
627        let handle = reloader.start();
628
629        // Let a few reload attempts happen (all should fail validation)
630        tokio::time::sleep(Duration::from_millis(200)).await;
631
632        // Config should still be the original
633        assert_eq!(shared.read().value, "good");
634        assert_eq!(shared.version(), 0);
635
636        handle.abort();
637    }
638
639    #[tokio::test]
640    async fn test_reload_fn_error_preserves_config() {
641        let shared = SharedConfig::new(TestConfig {
642            value: "good".into(),
643        });
644
645        let reloader = ConfigReloader::new(
646            ReloaderConfig {
647                periodic_interval: Duration::from_millis(50),
648                debounce: Duration::from_millis(10),
649                enable_sighup: false,
650                ..Default::default()
651            },
652            shared.clone(),
653            || Err("load failed".into()),
654            |_| Ok(()),
655        );
656
657        let handle = reloader.start();
658
659        // Let a few reload attempts happen
660        tokio::time::sleep(Duration::from_millis(200)).await;
661
662        // Config should still be the original
663        assert_eq!(shared.read().value, "good");
664        assert_eq!(shared.version(), 0);
665
666        handle.abort();
667    }
668
669    #[test]
670    fn test_file_mtime() {
671        let dir = TempDir::new().unwrap();
672        let path = dir.path().join("test.txt");
673        fs::write(&path, "content").unwrap();
674
675        let mtime = file_mtime(&path);
676        assert!(mtime.is_some());
677
678        // Non-existent file
679        let mtime = file_mtime(&PathBuf::from("/nonexistent/file.txt"));
680        assert!(mtime.is_none());
681    }
682}