use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use notify::RecursiveMode;
use notify_debouncer_mini::{DebounceEventResult, Debouncer, new_debouncer};
use tokio::runtime::Handle;
use super::exclude::ExcludeSet;
use super::index::VaultIndex;
use super::tantivy_index::TantivyIndex;
use crate::error::{VaultError, VaultResult};
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(500);
const EVENT_CHANNEL_CAPACITY: usize = 256;
#[cfg(has_embeddings)]
pub fn start_watcher(
vault_root: PathBuf,
index: Arc<RwLock<VaultIndex>>,
tantivy: Option<Arc<TantivyIndex>>,
embedding_model: Option<Arc<super::embeddings::EmbeddingModel>>,
embedding_store: Option<Arc<RwLock<super::embeddings::EmbeddingStore>>>,
exclude: Arc<ExcludeSet>,
mcp_data: PathBuf,
) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
let (tx, mut rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(EVENT_CHANNEL_CAPACITY);
let rt = Handle::current();
let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, move |result: DebounceEventResult| {
let tx = tx.clone();
rt.spawn(async move {
if let Err(e) = tx.send(result).await {
tracing::error!("watcher channel closed: {e}");
}
});
})
.map_err(|e| VaultError::Watcher(e.to_string()))?;
debouncer
.watcher()
.watch(&vault_root, RecursiveMode::Recursive)
.map_err(|e| {
VaultError::Watcher(format!("failed to watch {}: {e}", vault_root.display()))
})?;
tracing::info!(path = %vault_root.display(), "filesystem watcher started");
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
match result {
Ok(events) => {
let mut tantivy_dirty = false;
let mut embedding_dirty = false;
for event in events {
let (tv_touched, emb_touched) = process_event(
&vault_root,
&index,
tantivy.as_deref(),
embedding_model.as_deref(),
embedding_store.as_ref(),
&event.path,
&exclude,
);
tantivy_dirty |= tv_touched;
embedding_dirty |= emb_touched;
}
if tantivy_dirty
&& let Some(ref tv) = tantivy
&& let Err(e) = tv.flush()
{
tracing::warn!(error = %e, "tantivy batch flush failed");
}
if embedding_dirty
&& let Some(ref store) = embedding_store
&& let Ok(s) = store.read()
{
save_embedding_cache(&mcp_data, &s);
}
}
Err(e) => {
tracing::warn!("watch error: {e}");
}
}
}
tracing::debug!("watcher event loop exited");
});
Ok(debouncer)
}
#[cfg(not(has_embeddings))]
pub fn start_watcher(
vault_root: PathBuf,
index: Arc<RwLock<VaultIndex>>,
tantivy: Option<Arc<TantivyIndex>>,
exclude: Arc<ExcludeSet>,
) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
let (tx, mut rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(EVENT_CHANNEL_CAPACITY);
let rt = Handle::current();
let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, move |result: DebounceEventResult| {
let tx = tx.clone();
rt.spawn(async move {
if let Err(e) = tx.send(result).await {
tracing::error!("watcher channel closed: {e}");
}
});
})
.map_err(|e| VaultError::Watcher(e.to_string()))?;
debouncer
.watcher()
.watch(&vault_root, RecursiveMode::Recursive)
.map_err(|e| {
VaultError::Watcher(format!("failed to watch {}: {e}", vault_root.display()))
})?;
tracing::info!(path = %vault_root.display(), "filesystem watcher started");
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
match result {
Ok(events) => {
let mut tantivy_dirty = false;
for event in events {
tantivy_dirty |= process_event(
&vault_root,
&index,
tantivy.as_deref(),
&event.path,
&exclude,
);
}
if tantivy_dirty
&& let Some(ref tv) = tantivy
&& let Err(e) = tv.flush()
{
tracing::warn!(error = %e, "tantivy batch flush failed");
}
}
Err(e) => {
tracing::warn!("watch error: {e}");
}
}
}
tracing::debug!("watcher event loop exited");
});
Ok(debouncer)
}
fn should_process_path(vault_root: &Path, absolute: &Path) -> bool {
let relative = match absolute.strip_prefix(vault_root) {
Ok(r) => r,
Err(_) => {
tracing::trace!(path = %absolute.display(), "event path outside vault root, ignoring");
return false;
}
};
if is_obsidian_dir(relative) {
return false;
}
match absolute.extension().and_then(|e| e.to_str()) {
Some(ext) if ext.eq_ignore_ascii_case("md") => true,
Some(ext) => {
tracing::trace!(path = %relative.display(), ext, "non-markdown file, ignoring");
false
}
None => {
let path_str = absolute.to_string_lossy();
if path_str.to_ascii_lowercase().ends_with(".md") {
true
} else {
tracing::trace!(path = %relative.display(), "no extension, ignoring");
false
}
}
}
}
fn is_obsidian_dir(relative: &Path) -> bool {
relative.components().next().is_some_and(|c| {
let name = c.as_os_str();
name == ".obsidian" || name == ".obsidian-mcp"
})
}
fn normalized_relative_path(vault_root: &Path, absolute: &Path) -> Option<PathBuf> {
absolute
.strip_prefix(vault_root)
.ok()
.map(|relative| PathBuf::from(relative.to_string_lossy().replace('\\', "/")))
}
fn is_excluded_path(exclude: &ExcludeSet, relative: &Path) -> bool {
exclude.is_excluded(Path::new(&relative.to_string_lossy().replace('\\', "/")))
}
#[cfg(has_embeddings)]
fn process_event(
vault_root: &Path,
index: &Arc<RwLock<VaultIndex>>,
tantivy: Option<&TantivyIndex>,
embedding_model: Option<&super::embeddings::EmbeddingModel>,
embedding_store: Option<&Arc<RwLock<super::embeddings::EmbeddingStore>>>,
absolute: &Path,
exclude: &ExcludeSet,
) -> (bool, bool) {
if !should_process_path(vault_root, absolute) {
return (false, false);
}
let relative = match normalized_relative_path(vault_root, absolute) {
Some(r) => r,
None => return (false, false),
};
let mut tv_touched = false;
let mut emb_touched = false;
if is_excluded_path(exclude, &relative) {
if absolute.exists() {
tracing::debug!(path = %relative.display(), "tracking excluded note");
match index.write() {
Ok(mut idx) => idx.add_excluded_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return (false, false);
}
}
} else {
tracing::debug!(path = %relative.display(), "removing excluded note tracking");
match index.write() {
Ok(mut idx) => idx.remove_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return (false, false);
}
}
}
if let Some(tv) = tantivy {
if let Err(e) = tv.remove_file_batch(&relative) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
} else {
tv_touched = true;
}
}
if let Some(store) = embedding_store
&& let Ok(mut s) = store.write()
{
s.remove(&relative);
emb_touched = true;
}
return (tv_touched, emb_touched);
}
if absolute.exists() {
tracing::debug!(path = %relative.display(), "reindexing (create/modify)");
let meta = match index.write() {
Ok(mut idx) => {
if let Err(e) = idx.reindex_file(vault_root, &relative) {
tracing::warn!(path = %relative.display(), error = %e, "reindex failed");
return (false, false);
}
idx.get_note(&relative).cloned()
}
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return (false, false);
}
};
if let Some(tv) = tantivy
&& let Some(ref m) = meta
{
if let Err(e) = tv.reindex_file_batch(vault_root, &relative, m) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy reindex failed");
} else {
tv_touched = true;
}
}
if let (Some(model), Some(store), Some(m)) =
(embedding_model, embedding_store, meta.as_ref())
{
emb_touched = embed_and_insert(vault_root, &relative, m, model, store);
}
} else {
tracing::debug!(path = %relative.display(), "removing (delete)");
match index.write() {
Ok(mut idx) => idx.remove_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return (false, false);
}
}
if let Some(tv) = tantivy {
if let Err(e) = tv.remove_file_batch(&relative) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
} else {
tv_touched = true;
}
}
if let Some(store) = embedding_store
&& let Ok(mut s) = store.write()
{
s.remove(&relative);
emb_touched = true;
}
}
(tv_touched, emb_touched)
}
#[cfg(has_embeddings)]
fn embed_and_insert(
vault_root: &Path,
relative: &Path,
meta: &crate::models::NoteMetadata,
model: &super::embeddings::EmbeddingModel,
store: &Arc<RwLock<super::embeddings::EmbeddingStore>>,
) -> bool {
let Ok(content) = super::fs::read_file(vault_root, relative) else {
return false;
};
let body = super::frontmatter::get_body(&content);
let heading_texts: Vec<String> = meta.headings.iter().map(|h| h.text.clone()).collect();
let text = super::embeddings::prepare_embed_text(&meta.title, &heading_texts, body);
match model.embed_one(&text) {
Ok(vec) => {
if let Ok(mut s) = store.write() {
s.insert(relative.to_path_buf(), vec);
true
} else {
false
}
}
Err(e) => {
tracing::warn!(path = %relative.display(), error = %e, "embedding failed in watcher");
false
}
}
}
#[cfg(has_embeddings)]
fn save_embedding_cache(mcp_data: &Path, store: &super::embeddings::EmbeddingStore) {
let cache_path = mcp_data.join("embeddings").join("embeddings.bin");
if let Err(e) = store.save(&cache_path) {
tracing::warn!(error = %e, "failed to save embedding cache from watcher");
}
}
#[cfg(not(has_embeddings))]
fn process_event(
vault_root: &Path,
index: &Arc<RwLock<VaultIndex>>,
tantivy: Option<&TantivyIndex>,
absolute: &Path,
exclude: &ExcludeSet,
) -> bool {
if !should_process_path(vault_root, absolute) {
return false;
}
let relative = match normalized_relative_path(vault_root, absolute) {
Some(r) => r,
None => return false,
};
if is_excluded_path(exclude, &relative) {
if absolute.exists() {
tracing::debug!(path = %relative.display(), "tracking excluded note");
match index.write() {
Ok(mut idx) => idx.add_excluded_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return false;
}
}
} else {
tracing::debug!(path = %relative.display(), "removing excluded note tracking");
match index.write() {
Ok(mut idx) => idx.remove_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return false;
}
}
}
if let Some(tv) = tantivy {
if let Err(e) = tv.remove_file_batch(&relative) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
return false;
}
return true;
}
return false;
}
if absolute.exists() {
tracing::debug!(path = %relative.display(), "reindexing (create/modify)");
let meta = match index.write() {
Ok(mut idx) => {
if let Err(e) = idx.reindex_file(vault_root, &relative) {
tracing::warn!(path = %relative.display(), error = %e, "reindex failed");
return false;
}
idx.get_note(&relative).cloned()
}
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return false;
}
};
if let Some(tv) = tantivy
&& let Some(ref m) = meta
{
if let Err(e) = tv.reindex_file_batch(vault_root, &relative, m) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy reindex failed");
return false;
}
return true;
}
false
} else {
tracing::debug!(path = %relative.display(), "removing (delete)");
match index.write() {
Ok(mut idx) => idx.remove_file(&relative),
Err(e) => {
tracing::error!("index lock poisoned: {e}");
return false;
}
}
if let Some(tv) = tantivy {
if let Err(e) = tv.remove_file_batch(&relative) {
tracing::warn!(path = %relative.display(), error = %e, "tantivy remove failed");
return false;
}
return true;
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn vault() -> PathBuf {
PathBuf::from("/tmp/test-vault")
}
#[test]
fn filters_obsidian_directory() {
let root = vault();
assert!(!should_process_path(
&root,
&root.join(".obsidian/plugins/foo.json"),
));
assert!(!should_process_path(
&root,
&root.join(".obsidian/workspace.json"),
));
}
#[test]
fn filters_obsidian_mcp_directory() {
let root = vault();
assert!(!should_process_path(
&root,
&root.join(".obsidian-mcp/config.json"),
));
assert!(!should_process_path(
&root,
&root.join(".obsidian-mcp/ignore"),
));
}
#[test]
fn filters_non_markdown_files() {
let root = vault();
assert!(!should_process_path(&root, &root.join("image.png")));
assert!(!should_process_path(&root, &root.join("data.json")));
assert!(!should_process_path(
&root,
&root.join("subfolder/script.js"),
));
}
#[test]
fn accepts_markdown_files() {
let root = vault();
assert!(should_process_path(&root, &root.join("note.md")));
assert!(should_process_path(
&root,
&root.join("subfolder/deep/note.md"),
));
}
#[test]
fn accepts_uppercase_markdown_extension() {
let root = vault();
assert!(should_process_path(&root, &root.join("NOTE.MD")));
assert!(should_process_path(&root, &root.join("Mixed.Md")));
assert!(should_process_path(&root, &root.join("subfolder/CAPS.MD"),));
}
#[test]
fn filters_paths_outside_vault() {
let root = vault();
assert!(!should_process_path(
&root,
Path::new("/other/place/note.md"),
));
}
#[test]
fn obsidian_dir_detection() {
assert!(is_obsidian_dir(Path::new(".obsidian/plugins/foo.json")));
assert!(is_obsidian_dir(Path::new(".obsidian")));
assert!(!is_obsidian_dir(Path::new("notes/.obsidian/foo")));
assert!(!is_obsidian_dir(Path::new("daily/2024-01-01.md")));
}
#[test]
fn obsidian_mcp_dir_detection() {
assert!(is_obsidian_dir(Path::new(".obsidian-mcp/ignore")));
assert!(is_obsidian_dir(Path::new(".obsidian-mcp")));
assert!(is_obsidian_dir(Path::new(
".obsidian-mcp/embeddings/embeddings.bin"
)));
assert!(!is_obsidian_dir(Path::new("notes/.obsidian-mcp/foo")));
}
#[test]
fn accepts_excluded_markdown_paths_for_tracking() {
let root = vault();
let exclude = ExcludeSet::build(vec!["Archive/".into()]).unwrap();
assert!(should_process_path(&root, &root.join("Archive/note.md")));
assert!(should_process_path(
&root,
&root.join("Archive/sub/deep.md")
));
assert!(is_excluded_path(&exclude, Path::new("Archive/note.md")));
assert!(is_excluded_path(&exclude, Path::new("Archive/sub/deep.md")));
}
#[test]
fn accepts_non_excluded_paths() {
let root = vault();
let exclude = ExcludeSet::build(vec!["Archive/".into()]).unwrap();
assert!(should_process_path(&root, &root.join("Active/note.md"),));
assert!(should_process_path(
&root,
&root.join("Daily/2024-01-01.md"),
));
assert!(!is_excluded_path(&exclude, Path::new("Active/note.md")));
assert!(!is_excluded_path(
&exclude,
Path::new("Daily/2024-01-01.md")
));
}
fn call_start_watcher(
vault_root: PathBuf,
index: Arc<RwLock<VaultIndex>>,
) -> VaultResult<Debouncer<notify::RecommendedWatcher>> {
let exclude = Arc::new(ExcludeSet::build(vec![]).unwrap());
#[cfg(has_embeddings)]
{
let mcp_data = vault_root.join(".obsidian-mcp");
start_watcher(vault_root, index, None, None, None, exclude, mcp_data)
}
#[cfg(not(has_embeddings))]
{
start_watcher(vault_root, index, None, exclude)
}
}
fn call_process_event(
vault_root: &Path,
index: &Arc<RwLock<VaultIndex>>,
absolute: &Path,
exclude: &ExcludeSet,
) {
#[cfg(has_embeddings)]
{
let _ = process_event(vault_root, index, None, None, None, absolute, exclude);
}
#[cfg(not(has_embeddings))]
{
let _ = process_event(vault_root, index, None, absolute, exclude);
}
}
#[tokio::test]
async fn excluded_create_event_updates_stats_without_indexing() {
let dir = tempfile::tempdir().unwrap();
let vault_root = dir.path();
std::fs::create_dir_all(vault_root.join("Archive")).unwrap();
let path = vault_root.join("Archive/hidden.md");
std::fs::write(&path, "# Hidden\n").unwrap();
let index = Arc::new(RwLock::new(VaultIndex::empty()));
let exclude = ExcludeSet::build(vec!["Archive/".into()]).unwrap();
call_process_event(vault_root, &index, &path, &exclude);
let idx = index.read().unwrap();
assert_eq!(idx.stats().excluded_notes, 1);
assert!(idx.get_note(Path::new("Archive/hidden.md")).is_none());
}
#[tokio::test]
async fn excluded_delete_event_clears_stats_tracking() {
let dir = tempfile::tempdir().unwrap();
let vault_root = dir.path();
std::fs::create_dir_all(vault_root.join("Archive")).unwrap();
let path = vault_root.join("Archive/hidden.md");
std::fs::write(&path, "# Hidden\n").unwrap();
let index = Arc::new(RwLock::new(VaultIndex::empty()));
let exclude = ExcludeSet::build(vec!["Archive/".into()]).unwrap();
call_process_event(vault_root, &index, &path, &exclude);
std::fs::remove_file(&path).unwrap();
call_process_event(vault_root, &index, &path, &exclude);
let idx = index.read().unwrap();
assert_eq!(idx.stats().excluded_notes, 0);
assert!(idx.get_note(Path::new("Archive/hidden.md")).is_none());
}
#[tokio::test]
async fn watcher_starts_and_stops() {
let dir = tempfile::tempdir().unwrap();
let vault_root = dir.path().to_path_buf();
let index = Arc::new(RwLock::new(VaultIndex::empty()));
let debouncer = call_start_watcher(vault_root, index);
assert!(debouncer.is_ok(), "watcher should start without error");
drop(debouncer.unwrap());
tokio::time::sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn watcher_survives_mixed_file_events() {
let dir = tempfile::tempdir().unwrap();
let vault_root = dir.path().to_path_buf();
let index = Arc::new(RwLock::new(VaultIndex::empty()));
let _debouncer = call_start_watcher(vault_root.clone(), index).unwrap();
std::fs::write(vault_root.join("image.png"), b"fake png").unwrap();
std::fs::create_dir_all(vault_root.join(".obsidian")).unwrap();
std::fs::write(vault_root.join(".obsidian/workspace.json"), b"{}").unwrap();
std::fs::write(vault_root.join("note.md"), "# Hello\n").unwrap();
std::fs::write(vault_root.join("note.md"), "# Hello\nUpdated.\n").unwrap();
tokio::time::sleep(Duration::from_millis(1500)).await;
std::fs::remove_file(vault_root.join("note.md")).unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}