Skip to main content

obs_core/
config_watcher.rs

1//! `notify`-driven file watcher for `obs.yaml` reload. Spec 15 § 5.3
2//! / spec 93 P0-9.
3//!
4//! [`ConfigWatcher::spawn`] launches a background OS-level watcher
5//! that debounces file change events and invokes a user-supplied
6//! callback with the freshly-reloaded [`EventsConfig`]. The callback
7//! is the integration point — typically it calls
8//! `Observer::reload_config` plus emits an `ObsConfigReloaded` /
9//! `ObsConfigReloadFailed` self-event so operators can confirm the
10//! reload landed.
11//!
12//! The watcher debounces with a 200 ms window so a save that triggers
13//! `Modify(Data) + Modify(Metadata)` only fires the callback once.
14//! Drop the [`ConfigWatcher`] handle to stop watching.
15
16#![allow(clippy::disallowed_methods, clippy::disallowed_types)]
17
18use std::{
19    path::{Path, PathBuf},
20    sync::{Arc, mpsc::channel},
21    thread,
22    time::{Duration, Instant},
23};
24
25use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
26use parking_lot::Mutex;
27
28use crate::config::{ConfigError, EventsConfig};
29
30/// Active file watcher. Drop to stop.
31pub struct ConfigWatcher {
32    _watcher: RecommendedWatcher,
33    _join: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
34}
35
36impl std::fmt::Debug for ConfigWatcher {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("ConfigWatcher").finish()
39    }
40}
41
42/// Default debounce window between consecutive change events.
43pub const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(200);
44
45impl ConfigWatcher {
46    /// Watch `path` for changes; on each debounced event, reload the
47    /// file and invoke `on_change` with the parsed result. Errors from
48    /// the loader are passed to `on_change` as `Err` so callers can
49    /// emit `ObsConfigReloadFailed` self-events without their own
50    /// retry loop.
51    ///
52    /// # Errors
53    ///
54    /// Returns the underlying notify error if the watcher cannot be
55    /// installed (e.g. path does not exist, EMFILE, permission).
56    pub fn spawn<F>(path: impl AsRef<Path>, on_change: F) -> Result<Self, notify::Error>
57    where
58        F: Fn(Result<EventsConfig, ConfigError>) + Send + 'static,
59    {
60        let path: PathBuf = path.as_ref().to_path_buf();
61        let (tx, rx) = channel::<notify::Result<Event>>();
62
63        let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| {
64            let _ = tx.send(res);
65        })?;
66        let watch_dir = path.parent().map(Path::to_path_buf).unwrap_or(path.clone());
67        watcher.watch(&watch_dir, RecursiveMode::NonRecursive)?;
68
69        let watch_path = path.clone();
70        let join = thread::Builder::new()
71            .name("obs-config-watcher".to_string())
72            .spawn(move || {
73                let mut last_fire: Option<Instant> = None;
74                while let Ok(event) = rx.recv() {
75                    let Ok(ev) = event else { continue };
76                    if !is_relevant(&ev, &watch_path) {
77                        continue;
78                    }
79                    if let Some(prev) = last_fire
80                        && prev.elapsed() < DEFAULT_DEBOUNCE
81                    {
82                        continue;
83                    }
84                    last_fire = Some(Instant::now());
85                    let cfg = EventsConfig::from_yaml_path(&watch_path);
86                    on_change(cfg);
87                }
88            })
89            .map_err(|e| notify::Error::generic(&format!("spawn watcher thread: {e}")))?;
90
91        Ok(Self {
92            _watcher: watcher,
93            _join: Arc::new(Mutex::new(Some(join))),
94        })
95    }
96}
97
98fn is_relevant(ev: &Event, target: &Path) -> bool {
99    if !matches!(
100        ev.kind,
101        EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
102    ) {
103        return false;
104    }
105    // macOS FSEvents reports paths via `/private/tmp/...` while the
106    // test passes `/tmp/...`; ext4 inotify reports the full canonical
107    // path; Windows reports backslash-separated. Match on file name +
108    // canonicalized parent so any of these resolves correctly.
109    let target_name = target.file_name();
110    let target_parent = target.parent().and_then(|p| p.canonicalize().ok());
111    ev.paths.iter().any(|p| {
112        if p == target {
113            return true;
114        }
115        if p.file_name() != target_name {
116            return false;
117        }
118        let p_parent = p.parent().and_then(|q| q.canonicalize().ok());
119        match (p_parent, target_parent.as_ref()) {
120            (Some(a), Some(b)) => &a == b,
121            _ => false,
122        }
123    })
124}
125
126#[cfg(test)]
127mod tests {
128    use std::{
129        io::Write,
130        sync::atomic::{AtomicUsize, Ordering},
131        time::Duration,
132    };
133
134    use tempfile::tempdir;
135
136    use super::*;
137
138    #[test]
139    fn test_should_reload_on_file_change() {
140        // FSEvents (default macOS backend) coalesces events but the
141        // initial subscribe can take a few hundred ms to land. Give
142        // the watcher more wall-clock budget on the slowest backend.
143        let dir = tempdir().expect("tempdir");
144        let path = dir.path().join("obs.yaml");
145        std::fs::write(&path, "filter: info\n").expect("write initial");
146
147        let calls: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
148        let last_filter: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
149        let calls_c = Arc::clone(&calls);
150        let last_filter_c = Arc::clone(&last_filter);
151        let _w = ConfigWatcher::spawn(&path, move |res| {
152            if let Ok(cfg) = res {
153                *last_filter_c.lock() = cfg.filter.clone();
154                calls_c.fetch_add(1, Ordering::SeqCst);
155            }
156        })
157        .expect("spawn");
158
159        // Allow the watcher to subscribe before issuing any writes.
160        std::thread::sleep(Duration::from_millis(500));
161        let mut f = std::fs::File::create(&path).expect("recreate");
162        writeln!(f, "filter: warn").expect("write");
163        f.sync_all().ok();
164        drop(f);
165
166        // Poll for up to 4 s for the event to land + reload to run.
167        for _ in 0..40 {
168            if calls.load(Ordering::SeqCst) > 0 {
169                break;
170            }
171            std::thread::sleep(Duration::from_millis(100));
172        }
173
174        let n = calls.load(Ordering::SeqCst);
175        assert!(n >= 1, "expected at least one reload, got {n}");
176        assert_eq!(last_filter.lock().as_deref(), Some("warn"));
177    }
178}