talon_cli/mcp/background/
watcher.rs1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::time::Duration;
4
5use color_eyre::eyre::Result;
6use notify_debouncer_mini::{DebounceEventResult, new_debouncer};
7
8use crate::config::RefreshLockPolicy;
9use crate::mcp::state::McpServerState;
10
/// Debounce window, in seconds, handed to `new_debouncer` when the vault
/// watcher is set up.
const DEBOUNCE_SECS: u64 = 60;
12
13pub fn spawn_watcher(vault_path: PathBuf, state: Arc<McpServerState>) {
23 let _ = std::thread::Builder::new()
24 .name("talon-vault-watcher".to_owned())
25 .spawn(move || {
26 if let Err(e) = run_watcher(&vault_path, &state) {
27 let mut err = state
28 .diagnostics
29 .last_refresh_error
30 .lock()
31 .unwrap_or_else(std::sync::PoisonError::into_inner);
32 *err = Some(format!("watcher error: {e:#}"));
33 }
34 });
35}
36
37fn run_watcher(vault_path: &Path, state: &Arc<McpServerState>) -> Result<()> {
38 use std::sync::atomic::Ordering;
39
40 let state_clone = Arc::clone(state);
41 let (tx, rx) = std::sync::mpsc::channel::<DebounceEventResult>();
42
43 let mut debouncer = new_debouncer(Duration::from_secs(DEBOUNCE_SECS), tx)?;
44 debouncer.watcher().watch(
45 vault_path,
46 notify_debouncer_mini::notify::RecursiveMode::Recursive,
47 )?;
48
49 state
50 .diagnostics
51 .watcher_running
52 .store(true, Ordering::Relaxed);
53
54 let mut conn = talon_core::open_database(&state_clone.config.db_path)?;
55
56 for result in rx {
57 match result {
58 Ok(events) => {
59 let has_md = events
61 .iter()
62 .any(|e| e.path.extension().is_some_and(|ext| ext == "md"));
63 if !has_md {
64 continue;
65 }
66 if let Err(e) = crate::config::refresh_index_if_needed(
68 &state_clone.config.config,
69 &mut conn,
70 true,
71 RefreshLockPolicy::SkipIfBusy,
72 ) {
73 let mut err = state_clone
74 .diagnostics
75 .last_refresh_error
76 .lock()
77 .unwrap_or_else(std::sync::PoisonError::into_inner);
78 *err = Some(format!("refresh error: {e:#}"));
79 } else {
80 let mut err = state_clone
81 .diagnostics
82 .last_refresh_error
83 .lock()
84 .unwrap_or_else(std::sync::PoisonError::into_inner);
85 *err = None;
86 }
87 }
88 Err(e) => {
89 let mut err = state
90 .diagnostics
91 .last_refresh_error
92 .lock()
93 .unwrap_or_else(std::sync::PoisonError::into_inner);
94 *err = Some(format!("watcher recv error: {e:?}"));
95 }
96 }
97 }
98
99 state
100 .diagnostics
101 .watcher_running
102 .store(false, Ordering::Relaxed);
103 Ok(())
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed `McpServerState` reports the watcher as not
    /// running. No watcher thread is started by this test.
    #[test]
    fn spawn_watcher_sets_watcher_running_false_initially() {
        use std::sync::atomic::Ordering;

        let vault_path = PathBuf::from("/tmp/vault");
        let config_state = crate::mcp::state::ConfigState {
            // The config keeps its own copy of the path; the original is
            // moved into `vault_path` below.
            config: crate::config::default_config_for_vault(vault_path.clone()),
            config_path: None,
            vault_path,
            db_path: PathBuf::from("/tmp/vault.db"),
        };

        let state = McpServerState::new(config_state);
        let running = state.diagnostics.watcher_running.load(Ordering::Relaxed);

        assert!(!running, "expected watcher_running to start as false");
    }
}
131}