#![allow(clippy::disallowed_methods, clippy::disallowed_types)]
use std::{
path::{Path, PathBuf},
sync::{Arc, mpsc::channel},
thread,
time::{Duration, Instant},
};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::Mutex;
use crate::config::{ConfigError, EventsConfig};
pub struct ConfigWatcher {
_watcher: RecommendedWatcher,
_join: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
}
impl std::fmt::Debug for ConfigWatcher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConfigWatcher").finish()
}
}
pub const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(200);
impl ConfigWatcher {
pub fn spawn<F>(path: impl AsRef<Path>, on_change: F) -> Result<Self, notify::Error>
where
F: Fn(Result<EventsConfig, ConfigError>) + Send + 'static,
{
let path: PathBuf = path.as_ref().to_path_buf();
let (tx, rx) = channel::<notify::Result<Event>>();
let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})?;
let watch_dir = path.parent().map(Path::to_path_buf).unwrap_or(path.clone());
watcher.watch(&watch_dir, RecursiveMode::NonRecursive)?;
let watch_path = path.clone();
let join = thread::Builder::new()
.name("obs-config-watcher".to_string())
.spawn(move || {
let mut last_fire: Option<Instant> = None;
while let Ok(event) = rx.recv() {
let Ok(ev) = event else { continue };
if !is_relevant(&ev, &watch_path) {
continue;
}
if let Some(prev) = last_fire
&& prev.elapsed() < DEFAULT_DEBOUNCE
{
continue;
}
last_fire = Some(Instant::now());
let cfg = EventsConfig::from_yaml_path(&watch_path);
on_change(cfg);
}
})
.map_err(|e| notify::Error::generic(&format!("spawn watcher thread: {e}")))?;
Ok(Self {
_watcher: watcher,
_join: Arc::new(Mutex::new(Some(join))),
})
}
}
fn is_relevant(ev: &Event, target: &Path) -> bool {
if !matches!(
ev.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
) {
return false;
}
let target_name = target.file_name();
let target_parent = target.parent().and_then(|p| p.canonicalize().ok());
ev.paths.iter().any(|p| {
if p == target {
return true;
}
if p.file_name() != target_name {
return false;
}
let p_parent = p.parent().and_then(|q| q.canonicalize().ok());
match (p_parent, target_parent.as_ref()) {
(Some(a), Some(b)) => &a == b,
_ => false,
}
})
}
#[cfg(test)]
mod tests {
use std::{
io::Write,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use tempfile::tempdir;
use super::*;
#[test]
fn test_should_reload_on_file_change() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("obs.yaml");
std::fs::write(&path, "filter: info\n").expect("write initial");
let calls: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let last_filter: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let calls_c = Arc::clone(&calls);
let last_filter_c = Arc::clone(&last_filter);
let _w = ConfigWatcher::spawn(&path, move |res| {
if let Ok(cfg) = res {
*last_filter_c.lock() = cfg.filter.clone();
calls_c.fetch_add(1, Ordering::SeqCst);
}
})
.expect("spawn");
std::thread::sleep(Duration::from_millis(500));
let mut f = std::fs::File::create(&path).expect("recreate");
writeln!(f, "filter: warn").expect("write");
f.sync_all().ok();
drop(f);
for _ in 0..40 {
if calls.load(Ordering::SeqCst) > 0 {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
let n = calls.load(Ordering::SeqCst);
assert!(n >= 1, "expected at least one reload, got {n}");
assert_eq!(last_filter.lock().as_deref(), Some("warn"));
}
}