use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use notify::RecursiveMode;
use notify_debouncer_mini::{DebounceEventResult, Debouncer, new_debouncer};
use tokio::runtime::Handle;
use super::index::VaultIndex;
use super::tantivy_index::TantivyIndex;
use crate::error::{VaultError, VaultResult};
/// Debounce window handed to `new_debouncer` when the filesystem watcher is created.
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(500);
/// Capacity of the bounded tokio channel that carries debounced results from the
/// watcher callback to the event loop task.
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Starts a recursive, debounced filesystem watcher on `vault_root` and spawns the
/// task that applies the resulting events to the in-memory index, the optional
/// tantivy index and the optional embedding store.
///
/// Debounced batches are forwarded through a bounded channel of
/// `EVENT_CHANNEL_CAPACITY` entries and applied strictly in arrival order.
///
/// # Errors
///
/// Returns `VaultError::Watcher` if the debouncer cannot be created or if
/// `vault_root` cannot be watched.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime (`Handle::current`).
///
/// # Returns
///
/// The debouncer handle. The event loop exits once every channel sender is gone,
/// which presumably happens when the returned debouncer (and with it the callback
/// that owns the sender) is dropped.
#[cfg(feature = "embeddings")]
pub fn start_watcher(
    vault_root: PathBuf,
    index: Arc<RwLock<VaultIndex>>,
    tantivy: Option<Arc<TantivyIndex>>,
    embedding_model: Option<Arc<super::embeddings::EmbeddingModel>>,
    embedding_store: Option<Arc<RwLock<super::embeddings::EmbeddingStore>>>,
) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(EVENT_CHANNEL_CAPACITY);
    let rt = Handle::current();

    // The debouncer invokes this callback from its own (non-runtime) thread, so a
    // blocking send is appropriate here. Unlike spawning one task per batch it
    // preserves batch order and lets the bounded channel apply back-pressure.
    let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, move |result: DebounceEventResult| {
        if let Err(e) = tx.blocking_send(result) {
            tracing::error!("watcher channel closed: {e}");
        }
    })
    .map_err(|e| VaultError::Watcher(e.to_string()))?;

    debouncer
        .watcher()
        .watch(&vault_root, RecursiveMode::Recursive)
        .map_err(|e| {
            VaultError::Watcher(format!("failed to watch {}: {e}", vault_root.display()))
        })?;
    tracing::info!(path = %vault_root.display(), "filesystem watcher started");

    rt.spawn(async move {
        while let Some(result) = rx.recv().await {
            let events = match result {
                Ok(events) => events,
                Err(e) => {
                    tracing::warn!("watch error: {e}");
                    continue;
                }
            };

            // `process_event` does file I/O, takes std locks and may run embedding
            // inference; run it on the blocking pool so async workers are not stalled.
            // Awaiting the batch before receiving the next one keeps processing ordered.
            let vault_root = vault_root.clone();
            let index = Arc::clone(&index);
            let tantivy = tantivy.clone();
            let embedding_model = embedding_model.clone();
            let embedding_store = embedding_store.clone();
            let batch = tokio::task::spawn_blocking(move || {
                for event in events {
                    process_event(
                        &vault_root,
                        &index,
                        tantivy.as_deref(),
                        embedding_model.as_deref(),
                        embedding_store.as_ref(),
                        &event.path,
                    );
                }
            });
            if let Err(e) = batch.await {
                tracing::error!("watcher batch task failed: {e}");
            }
        }
        tracing::debug!("watcher event loop exited");
    });

    Ok(debouncer)
}
#[cfg(not(feature = "embeddings"))]
pub fn start_watcher(
vault_root: PathBuf,
index: Arc<RwLock<VaultIndex>>,
tantivy: Option<Arc<TantivyIndex>>,
) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
let (tx, mut rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(EVENT_CHANNEL_CAPACITY);
let rt = Handle::current();
let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, move |result: DebounceEventResult| {
let tx = tx.clone();
rt.spawn(async move {
if let Err(e) = tx.send(result).await {
tracing::error!("watcher channel closed: {e}");
}
});
})
.map_err(|e| VaultError::Watcher(e.to_string()))?;
debouncer
.watcher()
.watch(&vault_root, RecursiveMode::Recursive)
.map_err(|e| {
VaultError::Watcher(format!("failed to watch {}: {e}", vault_root.display()))
})?;
tracing::info!(path = %vault_root.display(), "filesystem watcher started");
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
match result {
Ok(events) => {
for event in events {
process_event(&vault_root, &index, tantivy.as_deref(), &event.path);
}
}
Err(e) => {
tracing::warn!("watch error: {e}");
}
}
}
tracing::debug!("watcher event loop exited");
});
Ok(debouncer)
}
/// Decides whether a filesystem event at `absolute` should be indexed.
///
/// Returns `true` only for paths that lie under `vault_root`, are not inside the
/// top-level `.obsidian` directory, and are markdown files. A path counts as
/// markdown when its extension is exactly `md`, or when no UTF-8 extension could
/// be extracted but the lossily-decoded path still ends in `.md`.
fn should_process_path(vault_root: &Path, absolute: &Path) -> bool {
    let Ok(relative) = absolute.strip_prefix(vault_root) else {
        tracing::trace!(path = %absolute.display(), "event path outside vault root, ignoring");
        return false;
    };

    if is_obsidian_dir(relative) {
        return false;
    }

    let extension = absolute.extension().and_then(|e| e.to_str());
    if extension == Some("md") {
        return true;
    }
    if let Some(ext) = extension {
        tracing::trace!(path = %relative.display(), ext, "non-markdown file, ignoring");
        return false;
    }

    // No usable extension: fall back to a textual suffix check on the whole path.
    let looks_like_markdown = absolute.to_string_lossy().ends_with(".md");
    if !looks_like_markdown {
        tracing::trace!(path = %relative.display(), "no extension, ignoring");
    }
    looks_like_markdown
}
/// Returns `true` when the first component of `relative` is `.obsidian`.
///
/// Only the leading component is inspected, so a nested `notes/.obsidian/...`
/// path is not treated as the Obsidian configuration directory.
fn is_obsidian_dir(relative: &Path) -> bool {
    match relative.components().next() {
        Some(first) => first.as_os_str() == ".obsidian",
        None => false,
    }
}
/// Applies a single filesystem event for `absolute` to the vault index, the
/// optional tantivy index and the optional embedding store.
///
/// Paths rejected by `should_process_path` are ignored. If the path currently
/// exists it is reindexed (create/modify); otherwise it is removed (delete).
///
/// The index write lock is held only while the in-memory index and tantivy are
/// updated. Embedding work (file read, model inference, cache save) runs after
/// the lock has been released so index readers are not blocked by it.
///
/// Failures are logged and never propagated.
#[cfg(feature = "embeddings")]
fn process_event(
    vault_root: &Path,
    index: &Arc<RwLock<VaultIndex>>,
    tantivy: Option<&TantivyIndex>,
    embedding_model: Option<&super::embeddings::EmbeddingModel>,
    embedding_store: Option<&Arc<RwLock<super::embeddings::EmbeddingStore>>>,
    absolute: &Path,
) {
    if !should_process_path(vault_root, absolute) {
        return;
    }
    let relative = match absolute.strip_prefix(vault_root) {
        Ok(r) => r.to_path_buf(),
        Err(_) => return,
    };

    if absolute.exists() {
        tracing::debug!(path = %relative.display(), "reindexing (create/modify)");

        // Scope the write guard so it is dropped before the embedding step.
        let meta = {
            let mut idx = match index.write() {
                Ok(idx) => idx,
                Err(e) => {
                    tracing::error!("index lock poisoned: {e}");
                    return;
                }
            };
            if let Err(e) = idx.reindex_file(vault_root, &relative) {
                tracing::warn!(path = %relative.display(), error = %e, "reindex failed");
                return;
            }
            // Clone the metadata so it can outlive the guard.
            let meta = idx.get_note(&relative).cloned();
            if let Some(tv) = tantivy
                && let Some(m) = meta.as_ref()
                && let Err(e) = tv.reindex_file(vault_root, &relative, m)
            {
                tracing::warn!(path = %relative.display(), error = %e, "tantivy reindex failed");
            }
            meta
        };

        if let (Some(model), Some(store), Some(m)) =
            (embedding_model, embedding_store, meta.as_ref())
        {
            embed_and_insert(vault_root, &relative, m, model, store);
        }
    } else {
        tracing::debug!(path = %relative.display(), "removing (delete)");

        {
            let mut idx = match index.write() {
                Ok(idx) => idx,
                Err(e) => {
                    tracing::error!("index lock poisoned: {e}");
                    return;
                }
            };
            idx.remove_file(&relative);
            if let Some(tv) = tantivy
                && let Err(e) = tv.remove_file(&relative)
            {
                tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
            }
        }

        // Index lock released; now drop the embedding and persist the cache.
        if let Some(store) = embedding_store {
            match store.write() {
                Ok(mut s) => {
                    s.remove(&relative);
                    save_embedding_cache(vault_root, &s);
                }
                Err(e) => {
                    tracing::error!("embedding store lock poisoned: {e}");
                }
            }
        }
    }
}
/// Computes an embedding for the note at `relative` and stores it in `store`,
/// then persists the embedding cache.
///
/// The text to embed is built from the note title, its heading texts and the
/// body that remains after `frontmatter::get_body`. The store's write lock is
/// taken only after the embedding has been computed.
///
/// Every failure (file read, embedding, poisoned store lock) is logged and the
/// function returns without modifying the store.
#[cfg(feature = "embeddings")]
fn embed_and_insert(
    vault_root: &Path,
    relative: &Path,
    meta: &crate::models::NoteMetadata,
    model: &super::embeddings::EmbeddingModel,
    store: &Arc<RwLock<super::embeddings::EmbeddingStore>>,
) {
    let content = match super::fs::read_file(vault_root, relative) {
        Ok(content) => content,
        Err(e) => {
            tracing::warn!(path = %relative.display(), error = %e, "failed to read file for embedding in watcher");
            return;
        }
    };
    let body = super::frontmatter::get_body(&content);
    let heading_texts: Vec<String> = meta.headings.iter().map(|h| h.text.clone()).collect();
    let text = super::embeddings::prepare_embed_text(&meta.title, &heading_texts, body);

    let vec = match model.embed_one(&text) {
        Ok(vec) => vec,
        Err(e) => {
            tracing::warn!(path = %relative.display(), error = %e, "embedding failed in watcher");
            return;
        }
    };

    match store.write() {
        Ok(mut s) => {
            s.insert(relative.to_path_buf(), vec);
            save_embedding_cache(vault_root, &s);
        }
        Err(e) => {
            tracing::error!("embedding store lock poisoned: {e}");
        }
    }
}
/// Writes `store` to `<vault_root>/.obsidian/obsidian-mcp/embeddings.bin`.
///
/// A failed save is logged as a warning and otherwise ignored.
#[cfg(feature = "embeddings")]
fn save_embedding_cache(vault_root: &Path, store: &super::embeddings::EmbeddingStore) {
    let mut cache_path = vault_root.to_path_buf();
    for segment in [".obsidian", "obsidian-mcp", "embeddings.bin"] {
        cache_path.push(segment);
    }

    match store.save(&cache_path) {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(error = %e, "failed to save embedding cache from watcher");
        }
    }
}
/// Applies a single filesystem event for `absolute` to the vault index and the
/// optional tantivy index.
///
/// Paths rejected by `should_process_path` are ignored. If the path currently
/// exists it is reindexed (create/modify); otherwise it is removed (delete).
/// Failures are logged and never propagated.
#[cfg(not(feature = "embeddings"))]
fn process_event(
    vault_root: &Path,
    index: &Arc<RwLock<VaultIndex>>,
    tantivy: Option<&TantivyIndex>,
    absolute: &Path,
) {
    if !should_process_path(vault_root, absolute) {
        return;
    }
    let Ok(stripped) = absolute.strip_prefix(vault_root) else {
        return;
    };
    let relative = stripped.to_path_buf();

    // Existence is sampled once, before the lock is taken, and decides the branch.
    let still_exists = absolute.exists();
    if still_exists {
        tracing::debug!(path = %relative.display(), "reindexing (create/modify)");
    } else {
        tracing::debug!(path = %relative.display(), "removing (delete)");
    }

    let mut guard = match index.write() {
        Ok(guard) => guard,
        Err(e) => {
            tracing::error!("index lock poisoned: {e}");
            return;
        }
    };

    if !still_exists {
        guard.remove_file(&relative);
        let Some(tv) = tantivy else {
            return;
        };
        if let Err(e) = tv.remove_file(&relative) {
            tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
        }
        return;
    }

    if let Err(e) = guard.reindex_file(vault_root, &relative) {
        tracing::warn!(path = %relative.display(), error = %e, "reindex failed");
        return;
    }
    let Some(tv) = tantivy else {
        return;
    };
    let Some(meta) = guard.get_note(&relative) else {
        return;
    };
    if let Err(e) = tv.reindex_file(vault_root, &relative, meta) {
        tracing::warn!(path = %relative.display(), error = %e, "tantivy reindex failed");
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Vault root used by the pure path-filter tests; nothing is created on disk.
    fn vault() -> PathBuf {
        PathBuf::from("/tmp/test-vault")
    }

    #[test]
    fn filters_obsidian_directory() {
        let root = vault();
        for inside in [".obsidian/plugins/foo.json", ".obsidian/workspace.json"] {
            assert!(!should_process_path(&root, &root.join(inside)));
        }
    }

    #[test]
    fn filters_non_markdown_files() {
        let root = vault();
        for other in ["image.png", "data.json", "subfolder/script.js"] {
            assert!(!should_process_path(&root, &root.join(other)));
        }
    }

    #[test]
    fn accepts_markdown_files() {
        let root = vault();
        for note in ["note.md", "subfolder/deep/note.md"] {
            assert!(should_process_path(&root, &root.join(note)));
        }
    }

    #[test]
    fn filters_paths_outside_vault() {
        let root = vault();
        let outside = Path::new("/other/place/note.md");
        assert!(!should_process_path(&root, outside));
    }

    #[test]
    fn obsidian_dir_detection() {
        for hit in [".obsidian/plugins/foo.json", ".obsidian"] {
            assert!(is_obsidian_dir(Path::new(hit)));
        }
        for miss in ["notes/.obsidian/foo", "daily/2024-01-01.md"] {
            assert!(!is_obsidian_dir(Path::new(miss)));
        }
    }

    /// Calls whichever `start_watcher` variant is compiled in, with every
    /// optional collaborator set to `None`.
    fn call_start_watcher(
        vault_root: PathBuf,
        index: Arc<RwLock<VaultIndex>>,
    ) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
        #[cfg(feature = "embeddings")]
        let started = start_watcher(vault_root, index, None, None, None);
        #[cfg(not(feature = "embeddings"))]
        let started = start_watcher(vault_root, index, None);
        started
    }

    #[tokio::test]
    async fn watcher_starts_and_stops() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();
        let shared_index = Arc::new(RwLock::new(VaultIndex::empty()));

        let started = call_start_watcher(root, shared_index);
        assert!(started.is_ok(), "watcher should start without error");

        // Drop the debouncer, then give the runtime a moment to run.
        let handle = started.unwrap();
        drop(handle);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    #[tokio::test]
    async fn watcher_survives_mixed_file_events() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();
        let shared_index = Arc::new(RwLock::new(VaultIndex::empty()));
        let _debouncer = call_start_watcher(root.clone(), shared_index).unwrap();

        let note = root.join("note.md");

        // A mix of ignored files and a markdown note that is created then modified.
        std::fs::write(root.join("image.png"), b"fake png").unwrap();
        std::fs::create_dir_all(root.join(".obsidian")).unwrap();
        std::fs::write(root.join(".obsidian/workspace.json"), b"{}").unwrap();
        std::fs::write(&note, "# Hello\n").unwrap();
        std::fs::write(&note, "# Hello\nUpdated.\n").unwrap();
        tokio::time::sleep(Duration::from_millis(1500)).await;

        // Delete the note, then wait again.
        std::fs::remove_file(&note).unwrap();
        tokio::time::sleep(Duration::from_millis(1000)).await;
    }
}