obs_core/
config_watcher.rs1#![allow(clippy::disallowed_methods, clippy::disallowed_types)]
17
18use std::{
19 path::{Path, PathBuf},
20 sync::{Arc, mpsc::channel},
21 thread,
22 time::{Duration, Instant},
23};
24
25use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
26use parking_lot::Mutex;
27
28use crate::config::{ConfigError, EventsConfig};
29
30pub struct ConfigWatcher {
32 _watcher: RecommendedWatcher,
33 _join: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
34}
35
36impl std::fmt::Debug for ConfigWatcher {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("ConfigWatcher").finish()
39 }
40}
41
42pub const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(200);
44
45impl ConfigWatcher {
46 pub fn spawn<F>(path: impl AsRef<Path>, on_change: F) -> Result<Self, notify::Error>
57 where
58 F: Fn(Result<EventsConfig, ConfigError>) + Send + 'static,
59 {
60 let path: PathBuf = path.as_ref().to_path_buf();
61 let (tx, rx) = channel::<notify::Result<Event>>();
62
63 let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| {
64 let _ = tx.send(res);
65 })?;
66 let watch_dir = path.parent().map(Path::to_path_buf).unwrap_or(path.clone());
67 watcher.watch(&watch_dir, RecursiveMode::NonRecursive)?;
68
69 let watch_path = path.clone();
70 let join = thread::Builder::new()
71 .name("obs-config-watcher".to_string())
72 .spawn(move || {
73 let mut last_fire: Option<Instant> = None;
74 while let Ok(event) = rx.recv() {
75 let Ok(ev) = event else { continue };
76 if !is_relevant(&ev, &watch_path) {
77 continue;
78 }
79 if let Some(prev) = last_fire
80 && prev.elapsed() < DEFAULT_DEBOUNCE
81 {
82 continue;
83 }
84 last_fire = Some(Instant::now());
85 let cfg = EventsConfig::from_yaml_path(&watch_path);
86 on_change(cfg);
87 }
88 })
89 .map_err(|e| notify::Error::generic(&format!("spawn watcher thread: {e}")))?;
90
91 Ok(Self {
92 _watcher: watcher,
93 _join: Arc::new(Mutex::new(Some(join))),
94 })
95 }
96}
97
98fn is_relevant(ev: &Event, target: &Path) -> bool {
99 if !matches!(
100 ev.kind,
101 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
102 ) {
103 return false;
104 }
105 let target_name = target.file_name();
110 let target_parent = target.parent().and_then(|p| p.canonicalize().ok());
111 ev.paths.iter().any(|p| {
112 if p == target {
113 return true;
114 }
115 if p.file_name() != target_name {
116 return false;
117 }
118 let p_parent = p.parent().and_then(|q| q.canonicalize().ok());
119 match (p_parent, target_parent.as_ref()) {
120 (Some(a), Some(b)) => &a == b,
121 _ => false,
122 }
123 })
124}
125
126#[cfg(test)]
127mod tests {
128 use std::{
129 io::Write,
130 sync::atomic::{AtomicUsize, Ordering},
131 time::Duration,
132 };
133
134 use tempfile::tempdir;
135
136 use super::*;
137
138 #[test]
139 fn test_should_reload_on_file_change() {
140 let dir = tempdir().expect("tempdir");
144 let path = dir.path().join("obs.yaml");
145 std::fs::write(&path, "filter: info\n").expect("write initial");
146
147 let calls: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
148 let last_filter: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
149 let calls_c = Arc::clone(&calls);
150 let last_filter_c = Arc::clone(&last_filter);
151 let _w = ConfigWatcher::spawn(&path, move |res| {
152 if let Ok(cfg) = res {
153 *last_filter_c.lock() = cfg.filter.clone();
154 calls_c.fetch_add(1, Ordering::SeqCst);
155 }
156 })
157 .expect("spawn");
158
159 std::thread::sleep(Duration::from_millis(500));
161 let mut f = std::fs::File::create(&path).expect("recreate");
162 writeln!(f, "filter: warn").expect("write");
163 f.sync_all().ok();
164 drop(f);
165
166 for _ in 0..40 {
168 if calls.load(Ordering::SeqCst) > 0 {
169 break;
170 }
171 std::thread::sleep(Duration::from_millis(100));
172 }
173
174 let n = calls.load(Ordering::SeqCst);
175 assert!(n >= 1, "expected at least one reload, got {n}");
176 assert_eq!(last_filter.lock().as_deref(), Some("warn"));
177 }
178}