use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use notify_debouncer_full::notify::RecommendedWatcher;
use notify_debouncer_full::{
DebounceEventResult, Debouncer, RecommendedCache, new_debouncer, notify::RecursiveMode,
};
use terraphim_automata::load_thesaurus_from_json;
use terraphim_types::{NormalizedTerm, NormalizedTermValue, Thesaurus};
use tracing::{error, info, warn};
use crate::kg_scorer::KgPathScorer;
pub struct KgWatcher {
_debouncer: Debouncer<RecommendedWatcher, RecommendedCache>,
watch_path: PathBuf,
}
impl KgWatcher {
pub fn new(
watch_path: impl Into<PathBuf>,
scorer: Arc<KgPathScorer>,
) -> Result<Self, notify_debouncer_full::notify::Error> {
let watch_path = watch_path.into();
let reload_path = watch_path.clone();
let mut debouncer = new_debouncer(
Duration::from_millis(500),
None,
move |result: DebounceEventResult| match result {
Ok(events) if !events.is_empty() => {
info!(
path = %reload_path.display(),
events = events.len(),
"KgWatcher: filesystem change detected, reloading thesaurus"
);
match load_thesaurus_from_dir(&reload_path) {
Ok(thesaurus) => {
scorer.update_thesaurus(thesaurus);
info!(
path = %reload_path.display(),
"KgWatcher: thesaurus reloaded"
);
}
Err(err) => {
warn!(
path = %reload_path.display(),
error = %err,
"KgWatcher: failed to reload thesaurus"
);
}
}
}
Ok(_) => {}
Err(errors) => {
for e in errors {
error!(error = %e, "KgWatcher: notify error");
}
}
},
)?;
debouncer.watch(&watch_path, RecursiveMode::Recursive)?;
Ok(Self {
_debouncer: debouncer,
watch_path,
})
}
pub fn watch_path(&self) -> &Path {
&self.watch_path
}
}
pub(crate) fn load_thesaurus_from_dir(dir: &Path) -> Result<Thesaurus, std::io::Error> {
let entries = std::fs::read_dir(dir)?;
let mut combined = Thesaurus::new(dir.to_string_lossy().to_string());
for entry in entries.filter_map(|e| e.ok()) {
let path = entry.path();
if path.is_file() {
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if ext == "json" || ext == "jsonl" {
match std::fs::read_to_string(&path) {
Ok(content) => match load_thesaurus_from_json(&content) {
Ok(t) => {
for (k, v) in &t {
let k: NormalizedTermValue = k.clone();
let v: NormalizedTerm = v.clone();
combined.insert(k, v);
}
}
Err(err) => {
warn!(
path = %path.display(),
error = %err,
"KgWatcher: failed to parse thesaurus file, skipping"
);
}
},
Err(err) => {
warn!(
path = %path.display(),
error = %err,
"KgWatcher: failed to read thesaurus file, skipping"
);
}
}
}
}
}
Ok(combined)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tempfile::TempDir;
use terraphim_types::Thesaurus;
use super::*;
use crate::kg_scorer::KgPathScorer;
fn empty_thesaurus() -> Thesaurus {
Thesaurus::new("test".to_string())
}
fn write_thesaurus_json(dir: &Path, filename: &str, key: &str, nterm: &str) {
let content = format!(
r#"{{"name":"test","data":{{"{key}":{{"id":1,"nterm":"{nterm}","url":null}}}}}}"#,
key = key,
nterm = nterm,
);
std::fs::write(dir.join(filename), content).unwrap();
}
#[test]
fn watcher_creates_without_error() {
let tmp = TempDir::new().unwrap();
let scorer = Arc::new(KgPathScorer::new(empty_thesaurus()));
let watcher = KgWatcher::new(tmp.path(), scorer);
assert!(
watcher.is_ok(),
"KgWatcher::new should succeed on a valid dir"
);
}
#[test]
fn watcher_triggers_update_on_file_write() -> Result<(), String> {
let tmp = TempDir::new().unwrap();
let scorer = Arc::new(KgPathScorer::new(empty_thesaurus()));
let _watcher =
KgWatcher::new(tmp.path(), Arc::clone(&scorer)).expect("KgWatcher::new should succeed");
std::thread::sleep(std::time::Duration::from_millis(500));
write_thesaurus_json(tmp.path(), "kg.json", "rust", "rust");
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
loop {
if scorer.score_path("src/rust.rs") > 0 {
return Ok(()); }
if std::time::Instant::now() >= deadline {
return Err("KgWatcher did not reload thesaurus within 10 seconds".to_string());
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}