Skip to main content

talon_cli/mcp/background/
watcher.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::time::Duration;
4
5use color_eyre::eyre::Result;
6use notify_debouncer_mini::{DebounceEventResult, new_debouncer};
7
8use crate::config::RefreshLockPolicy;
9use crate::mcp::state::McpServerState;
10
/// Debounce window, in seconds, passed to the file-system event debouncer
/// created in `run_watcher`.
const DEBOUNCE_SECS: u64 = 60;
12
13/// Spawns a background thread watching `vault_path` for `.md` changes.
14///
15/// Changes trigger a fast (no-embed) index refresh via the existing
16/// `refresh_index_if_needed` function. Non-`.md` files are ignored.
17/// Errors are recorded in diagnostics; the watcher thread runs until
18/// the process exits.
19///
20/// If the thread fails to spawn, the error is silently ignored; the MCP
21/// server continues without the watcher.
22pub fn spawn_watcher(vault_path: PathBuf, state: Arc<McpServerState>) {
23    let _ = std::thread::Builder::new()
24        .name("talon-vault-watcher".to_owned())
25        .spawn(move || {
26            if let Err(e) = run_watcher(&vault_path, &state) {
27                let mut err = state
28                    .diagnostics
29                    .last_refresh_error
30                    .lock()
31                    .unwrap_or_else(std::sync::PoisonError::into_inner);
32                *err = Some(format!("watcher error: {e:#}"));
33            }
34        });
35}
36
37fn run_watcher(vault_path: &Path, state: &Arc<McpServerState>) -> Result<()> {
38    use std::sync::atomic::Ordering;
39
40    let state_clone = Arc::clone(state);
41    let (tx, rx) = std::sync::mpsc::channel::<DebounceEventResult>();
42
43    let mut debouncer = new_debouncer(Duration::from_secs(DEBOUNCE_SECS), tx)?;
44    debouncer.watcher().watch(
45        vault_path,
46        notify_debouncer_mini::notify::RecursiveMode::Recursive,
47    )?;
48
49    state
50        .diagnostics
51        .watcher_running
52        .store(true, Ordering::Relaxed);
53
54    let mut conn = talon_core::open_database(&state_clone.config.db_path)?;
55
56    for result in rx {
57        match result {
58            Ok(events) => {
59                // Only react to .md file changes.
60                let has_md = events
61                    .iter()
62                    .any(|e| e.path.extension().is_some_and(|ext| ext == "md"));
63                if !has_md {
64                    continue;
65                }
66                // Refresh the text index (no-embed, skip if busy).
67                if let Err(e) = crate::config::refresh_index_if_needed(
68                    &state_clone.config.config,
69                    &mut conn,
70                    true,
71                    RefreshLockPolicy::SkipIfBusy,
72                ) {
73                    let mut err = state_clone
74                        .diagnostics
75                        .last_refresh_error
76                        .lock()
77                        .unwrap_or_else(std::sync::PoisonError::into_inner);
78                    *err = Some(format!("refresh error: {e:#}"));
79                } else {
80                    let mut err = state_clone
81                        .diagnostics
82                        .last_refresh_error
83                        .lock()
84                        .unwrap_or_else(std::sync::PoisonError::into_inner);
85                    *err = None;
86                }
87            }
88            Err(e) => {
89                let mut err = state
90                    .diagnostics
91                    .last_refresh_error
92                    .lock()
93                    .unwrap_or_else(std::sync::PoisonError::into_inner);
94                *err = Some(format!("watcher recv error: {e:?}"));
95            }
96        }
97    }
98
99    state
100        .diagnostics
101        .watcher_running
102        .store(false, Ordering::Relaxed);
103    Ok(())
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// A freshly constructed `McpServerState` reports the watcher as not
    /// running. No watcher thread is started by this test.
    #[test]
    fn spawn_watcher_sets_watcher_running_false_initially() {
        let vault_path = PathBuf::from("/tmp/vault");
        let config_state = crate::mcp::state::ConfigState {
            config: crate::config::default_config_for_vault(vault_path.clone()),
            config_path: None,
            vault_path,
            db_path: PathBuf::from("/tmp/vault.db"),
        };
        let state = McpServerState::new(config_state);

        let running = state.diagnostics.watcher_running.load(Ordering::Relaxed);
        assert!(!running, "expected watcher_running to start as false");
    }
}