1use crate::event::{ConfigScope, DataEvent, EventBus};
6use crate::store::DataStore;
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tracing::{debug, error, info, trace};
13
14#[derive(Debug, Clone)]
16pub struct WatcherConfig {
17 pub debounce_delay: Duration,
19
20 pub max_debounce_delay: Duration,
22
23 pub burst_threshold: u32,
25}
26
27impl Default for WatcherConfig {
28 fn default() -> Self {
29 Self {
30 debounce_delay: Duration::from_millis(500),
31 max_debounce_delay: Duration::from_secs(3),
32 burst_threshold: 10,
33 }
34 }
35}
36
37pub struct FileWatcher {
39 _watcher: RecommendedWatcher,
41
42 shutdown_tx: mpsc::Sender<()>,
44}
45
46impl FileWatcher {
47 pub async fn start(
49 claude_home: PathBuf,
50 project_path: Option<PathBuf>,
51 store: Arc<DataStore>,
52 config: WatcherConfig,
53 ) -> Result<Self, notify::Error> {
54 let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
55 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
56
57 let watcher = RecommendedWatcher::new(
59 move |res| {
60 let _ = event_tx.blocking_send(res);
61 },
62 Config::default().with_poll_interval(Duration::from_secs(2)),
63 )?;
64
65 let mut file_watcher = Self {
66 _watcher: watcher,
67 shutdown_tx,
68 };
69
70 file_watcher.watch_path(&claude_home, RecursiveMode::NonRecursive)?;
73
74 let projects_dir = claude_home.join("projects");
76 if projects_dir.exists() {
77 file_watcher.watch_path(&projects_dir, RecursiveMode::NonRecursive)?;
78
79 if let Ok(entries) = std::fs::read_dir(&projects_dir) {
81 for entry in entries.flatten() {
82 if entry.path().is_dir() {
83 let _ = file_watcher.watch_path(&entry.path(), RecursiveMode::NonRecursive);
85 }
86 }
87 }
88 }
89
90 let cache_dir = claude_home.join("cache");
92 if cache_dir.exists() {
93 file_watcher.watch_path(&cache_dir, RecursiveMode::NonRecursive)?;
94 }
95
96 if let Some(ref proj) = project_path {
98 let claude_dir = proj.join(".claude");
99 if claude_dir.exists() {
100 file_watcher.watch_path(&claude_dir, RecursiveMode::NonRecursive)?;
101
102 for subdir in ["agents", "commands", "skills", "hooks"].iter() {
104 let path = claude_dir.join(subdir);
105 if path.exists() {
106 let _ = file_watcher.watch_path(&path, RecursiveMode::Recursive);
107 }
108 }
109 }
110 }
111
112 info!(claude_home = %claude_home.display(), "File watcher started");
113
114 let event_bus = store.event_bus().clone();
116 tokio::spawn(async move {
117 let mut debounce_state = DebounceState::new(config);
118
119 loop {
120 tokio::select! {
121 Some(result) = event_rx.recv() => {
122 match result {
123 Ok(event) => {
124 if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
125 if debounce_state.should_emit(&data_event) {
126 debug!(?data_event, "Emitting file change event");
127 Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
128 }
129 }
130 }
131 Err(e) => {
132 error!(error = %e, "File watcher error");
133 event_bus.publish(DataEvent::WatcherError(e.to_string()));
134 }
135 }
136 }
137 _ = shutdown_rx.recv() => {
138 info!("File watcher shutting down");
139 break;
140 }
141 }
142 }
143 });
144
145 Ok(file_watcher)
146 }
147
148 fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
149 self._watcher.watch(path, mode)?;
150 debug!(path = %path.display(), "Watching path");
151 Ok(())
152 }
153
154 fn process_event(
156 event: &Event,
157 claude_home: &Path,
158 project_path: Option<&Path>,
159 ) -> Option<(DataEvent, PathBuf)> {
160 match event.kind {
162 EventKind::Create(_) | EventKind::Modify(_) => {}
163 _ => return None,
164 }
165
166 let path = event.paths.first()?;
167 let path_str = path.to_string_lossy();
168
169 trace!(path = %path_str, "Processing file event");
170
171 if path
173 .file_name()
174 .map(|n| n == "stats-cache.json")
175 .unwrap_or(false)
176 {
177 return Some((DataEvent::StatsUpdated, path.clone()));
178 }
179
180 if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
182 {
183 let session_id = path
184 .file_stem()
185 .and_then(|s| s.to_str())
186 .unwrap_or("unknown")
187 .to_string();
188
189 return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
190 }
191
192 if *path == claude_home.join("settings.json") {
194 return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
195 }
196
197 if let Some(proj) = project_path {
199 if *path == proj.join(".claude").join("settings.json") {
200 return Some((
201 DataEvent::ConfigChanged(ConfigScope::Project(
202 proj.to_string_lossy().to_string(),
203 )),
204 path.clone(),
205 ));
206 }
207 if *path == proj.join(".claude").join("settings.local.json") {
208 return Some((
209 DataEvent::ConfigChanged(ConfigScope::Local(
210 proj.to_string_lossy().to_string(),
211 )),
212 path.clone(),
213 ));
214 }
215 }
216
217 if path
219 .file_name()
220 .map(|n| n == "claude_desktop_config.json")
221 .unwrap_or(false)
222 {
223 return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
224 }
225
226 None
227 }
228
229 async fn handle_event(
231 event: DataEvent,
232 path: Option<&Path>,
233 store: &DataStore,
234 event_bus: &EventBus,
235 ) {
236 match &event {
237 DataEvent::StatsUpdated => {
238 store.reload_stats().await;
239 }
240 DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
241 if let Some(p) = path {
243 store.update_session(p).await;
244 }
245 }
246 DataEvent::ConfigChanged(_scope) => {
247 store.reload_settings().await;
249 }
250 _ => {}
251 }
252
253 event_bus.publish(event);
254 }
255
256 pub async fn stop(&self) {
258 let _ = self.shutdown_tx.send(()).await;
259 }
260}
261
262struct DebounceState {
264 config: WatcherConfig,
265 last_events: std::collections::HashMap<String, std::time::Instant>,
266 event_count_window: std::collections::VecDeque<std::time::Instant>,
267}
268
269impl DebounceState {
270 fn new(config: WatcherConfig) -> Self {
271 Self {
272 config,
273 last_events: std::collections::HashMap::new(),
274 event_count_window: std::collections::VecDeque::new(),
275 }
276 }
277
278 fn should_emit(&mut self, event: &DataEvent) -> bool {
279 let now = std::time::Instant::now();
280 let key = Self::event_key(event);
281
282 self.event_count_window.push_back(now);
284 while self
285 .event_count_window
286 .front()
287 .map(|t| now.duration_since(*t) > Duration::from_secs(1))
288 .unwrap_or(false)
289 {
290 self.event_count_window.pop_front();
291 }
292
293 let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
295 self.config.max_debounce_delay
296 } else {
297 self.config.debounce_delay
298 };
299
300 if let Some(last) = self.last_events.get(&key) {
302 if now.duration_since(*last) < delay {
303 trace!(key = %key, "Debouncing event");
304 return false;
305 }
306 }
307
308 self.last_events.insert(key, now);
309 true
310 }
311
312 fn event_key(event: &DataEvent) -> String {
313 match event {
314 DataEvent::StatsUpdated => "stats".to_string(),
315 DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
316 format!("session:{}", id)
317 }
318 DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
319 DataEvent::AnalyticsUpdated => "analytics".to_string(),
320 DataEvent::LoadCompleted => "load".to_string(),
321 DataEvent::WatcherError(_) => "error".to_string(),
322 }
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 #[test]
331 fn test_debounce_state_basic() {
332 let config = WatcherConfig {
333 debounce_delay: Duration::from_millis(100),
334 max_debounce_delay: Duration::from_millis(500),
335 burst_threshold: 5,
336 };
337 let mut state = DebounceState::new(config);
338
339 assert!(state.should_emit(&DataEvent::StatsUpdated));
341
342 assert!(!state.should_emit(&DataEvent::StatsUpdated));
344
345 assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
347 }
348
349 #[test]
350 fn test_process_event_stats() {
351 let claude_home = PathBuf::from("/home/user/.claude");
352 let event = Event {
353 kind: EventKind::Modify(notify::event::ModifyKind::Data(
354 notify::event::DataChange::Content,
355 )),
356 paths: vec![PathBuf::from("/home/user/.claude/stats-cache.json")],
357 ..Default::default()
358 };
359
360 let result = FileWatcher::process_event(&event, &claude_home, None);
361 assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
362 }
363
364 #[test]
365 fn test_process_event_session() {
366 let claude_home = PathBuf::from("/home/user/.claude");
367 let event = Event {
368 kind: EventKind::Modify(notify::event::ModifyKind::Data(
369 notify::event::DataChange::Content,
370 )),
371 paths: vec![PathBuf::from(
372 "/home/user/.claude/projects/-test/abc123.jsonl",
373 )],
374 ..Default::default()
375 };
376
377 let result = FileWatcher::process_event(&event, &claude_home, None);
378 assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
379 }
380}