1use crate::event::{ConfigScope, DataEvent, EventBus};
6use crate::store::DataStore;
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tracing::{debug, error, info, trace};
13
/// Tunable settings for [`FileWatcher`].
#[derive(Clone, Debug)]
pub struct WatcherConfig {
    /// Minimum spacing between two emitted events that share a debounce key
    /// while the event rate is at or below `burst_threshold`.
    pub debounce_delay: Duration,

    /// Spacing used instead of `debounce_delay` while the event rate is above
    /// `burst_threshold`.
    pub max_debounce_delay: Duration,

    /// Number of events seen within the trailing one-second window above
    /// which the longer `max_debounce_delay` applies.
    pub burst_threshold: u32,

    /// Additional paths that are watched non-recursively when they exist at
    /// watcher start-up.
    pub extra_watch_paths: Vec<PathBuf>,
}
29
30impl Default for WatcherConfig {
31 fn default() -> Self {
32 let extra_watch_paths = dirs::home_dir()
34 .map(|h| {
35 let ccboard_dir = h.join(".ccboard");
36 if ccboard_dir.exists() {
37 vec![ccboard_dir]
38 } else {
39 vec![]
40 }
41 })
42 .unwrap_or_default();
43
44 Self {
45 debounce_delay: Duration::from_millis(500),
46 max_debounce_delay: Duration::from_secs(3),
47 burst_threshold: 10,
48 extra_watch_paths,
49 }
50 }
51}
52
53pub struct FileWatcher {
55 _watcher: RecommendedWatcher,
57
58 shutdown_tx: mpsc::Sender<()>,
60}
61
62impl FileWatcher {
63 pub async fn start(
65 claude_home: PathBuf,
66 project_path: Option<PathBuf>,
67 store: Arc<DataStore>,
68 config: WatcherConfig,
69 ) -> Result<Self, notify::Error> {
70 let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
71 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
72
73 let watcher = RecommendedWatcher::new(
75 move |res| {
76 let _ = event_tx.blocking_send(res);
77 },
78 Config::default().with_poll_interval(Duration::from_secs(2)),
79 )?;
80
81 let mut file_watcher = Self {
82 _watcher: watcher,
83 shutdown_tx,
84 };
85
86 file_watcher.watch_path(&claude_home, RecursiveMode::NonRecursive)?;
89
90 let projects_dir = claude_home.join("projects");
92 if projects_dir.exists() {
93 file_watcher.watch_path(&projects_dir, RecursiveMode::NonRecursive)?;
94
95 if let Ok(entries) = std::fs::read_dir(&projects_dir) {
97 for entry in entries.flatten() {
98 if entry.path().is_dir() {
99 let _ = file_watcher.watch_path(&entry.path(), RecursiveMode::NonRecursive);
101 }
102 }
103 }
104 }
105
106 let cache_dir = claude_home.join("cache");
108 if cache_dir.exists() {
109 file_watcher.watch_path(&cache_dir, RecursiveMode::NonRecursive)?;
110 }
111
112 if let Some(ref proj) = project_path {
114 let claude_dir = proj.join(".claude");
115 if claude_dir.exists() {
116 file_watcher.watch_path(&claude_dir, RecursiveMode::NonRecursive)?;
117
118 for subdir in ["agents", "commands", "skills", "hooks"].iter() {
120 let path = claude_dir.join(subdir);
121 if path.exists() {
122 let _ = file_watcher.watch_path(&path, RecursiveMode::Recursive);
123 }
124 }
125 }
126 }
127
128 for extra_path in &config.extra_watch_paths {
130 if extra_path.exists() {
131 let _ = file_watcher.watch_path(extra_path, RecursiveMode::NonRecursive);
132 debug!(path = %extra_path.display(), "Watching extra path for live sessions");
133 }
134 }
135
136 info!(claude_home = %claude_home.display(), "File watcher started");
137
138 let event_bus = store.event_bus().clone();
140 tokio::spawn(async move {
141 let mut debounce_state = DebounceState::new(config);
142
143 loop {
144 tokio::select! {
145 Some(result) = event_rx.recv() => {
146 match result {
147 Ok(event) => {
148 if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
149 if debounce_state.should_emit(&data_event) {
150 debug!(?data_event, "Emitting file change event");
151 Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
152 }
153 }
154 }
155 Err(e) => {
156 error!(error = %e, "File watcher error");
157 event_bus.publish(DataEvent::WatcherError(e.to_string()));
158 }
159 }
160 }
161 _ = shutdown_rx.recv() => {
162 info!("File watcher shutting down");
163 break;
164 }
165 }
166 }
167 });
168
169 Ok(file_watcher)
170 }
171
172 fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
173 self._watcher.watch(path, mode)?;
174 debug!(path = %path.display(), "Watching path");
175 Ok(())
176 }
177
178 fn process_event(
180 event: &Event,
181 claude_home: &Path,
182 project_path: Option<&Path>,
183 ) -> Option<(DataEvent, PathBuf)> {
184 match event.kind {
186 EventKind::Create(_) | EventKind::Modify(_) => {}
187 _ => return None,
188 }
189
190 let path = event.paths.first()?;
191 let path_str = path.to_string_lossy();
192
193 trace!(path = %path_str, "Processing file event");
194
195 if path
197 .file_name()
198 .map(|n| n == "stats-cache.json")
199 .unwrap_or(false)
200 {
201 return Some((DataEvent::StatsUpdated, path.clone()));
202 }
203
204 if path
206 .file_name()
207 .map(|n| n == "live-sessions.json")
208 .unwrap_or(false)
209 {
210 return Some((DataEvent::LiveSessionStatusChanged, path.clone()));
211 }
212
213 if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
215 {
216 let session_id = path
217 .file_stem()
218 .and_then(|s| s.to_str())
219 .unwrap_or("unknown")
220 .to_string();
221
222 return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
223 }
224
225 if *path == claude_home.join("settings.json") {
227 return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
228 }
229
230 if let Some(proj) = project_path {
232 if *path == proj.join(".claude").join("settings.json") {
233 return Some((
234 DataEvent::ConfigChanged(ConfigScope::Project(
235 proj.to_string_lossy().to_string(),
236 )),
237 path.clone(),
238 ));
239 }
240 if *path == proj.join(".claude").join("settings.local.json") {
241 return Some((
242 DataEvent::ConfigChanged(ConfigScope::Local(
243 proj.to_string_lossy().to_string(),
244 )),
245 path.clone(),
246 ));
247 }
248 }
249
250 if path
252 .file_name()
253 .map(|n| n == "claude_desktop_config.json")
254 .unwrap_or(false)
255 {
256 return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
257 }
258
259 None
260 }
261
262 async fn handle_event(
264 event: DataEvent,
265 path: Option<&Path>,
266 store: &DataStore,
267 event_bus: &EventBus,
268 ) {
269 match &event {
270 DataEvent::StatsUpdated => {
271 store.reload_stats().await;
272 }
273 DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
274 if let Some(p) = path {
276 store.update_session(p).await;
277 }
278 }
279 DataEvent::ConfigChanged(_scope) => {
280 store.reload_settings().await;
282 }
283 DataEvent::LiveSessionStatusChanged => {
284 if let Some(p) = path {
286 store.reload_live_hook_sessions(p).await;
287 }
288 }
289 _ => {}
290 }
291
292 event_bus.publish(event);
293 }
294
295 pub async fn stop(&self) {
297 let _ = self.shutdown_tx.send(()).await;
298 }
299}
300
301struct DebounceState {
303 config: WatcherConfig,
304 last_events: std::collections::HashMap<String, std::time::Instant>,
305 event_count_window: std::collections::VecDeque<std::time::Instant>,
306}
307
308impl DebounceState {
309 fn new(config: WatcherConfig) -> Self {
310 Self {
311 config,
312 last_events: std::collections::HashMap::new(),
313 event_count_window: std::collections::VecDeque::new(),
314 }
315 }
316
317 fn should_emit(&mut self, event: &DataEvent) -> bool {
318 let now = std::time::Instant::now();
319 let key = Self::event_key(event);
320
321 self.event_count_window.push_back(now);
323 while self
324 .event_count_window
325 .front()
326 .map(|t| now.duration_since(*t) > Duration::from_secs(1))
327 .unwrap_or(false)
328 {
329 self.event_count_window.pop_front();
330 }
331
332 let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
334 self.config.max_debounce_delay
335 } else {
336 self.config.debounce_delay
337 };
338
339 if let Some(last) = self.last_events.get(&key) {
341 if now.duration_since(*last) < delay {
342 trace!(key = %key, "Debouncing event");
343 return false;
344 }
345 }
346
347 self.last_events.insert(key, now);
348 true
349 }
350
351 fn event_key(event: &DataEvent) -> String {
352 match event {
353 DataEvent::StatsUpdated => "stats".to_string(),
354 DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
355 format!("session:{}", id)
356 }
357 DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
358 DataEvent::AnalyticsUpdated => "analytics".to_string(),
359 DataEvent::LoadCompleted => "load".to_string(),
360 DataEvent::WatcherError(_) => "error".to_string(),
361 DataEvent::LiveSessionStatusChanged => "live_sessions".to_string(),
362 }
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a "file content modified" event that carries a single path.
    fn content_modified(path: &str) -> Event {
        Event {
            kind: EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
            paths: vec![PathBuf::from(path)],
            ..Default::default()
        }
    }

    /// Root directory used by the path-classification tests.
    fn claude_home() -> PathBuf {
        PathBuf::from("/home/user/.claude")
    }

    #[test]
    fn test_debounce_state_basic() {
        let mut state = DebounceState::new(WatcherConfig {
            debounce_delay: Duration::from_millis(100),
            max_debounce_delay: Duration::from_millis(500),
            burst_threshold: 5,
            extra_watch_paths: vec![],
        });

        // The first occurrence of a key is always emitted.
        assert!(state.should_emit(&DataEvent::StatsUpdated));

        // An immediate repeat of the same key is suppressed.
        assert!(!state.should_emit(&DataEvent::StatsUpdated));

        // A different key is not affected by the previous one.
        assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
    }

    #[test]
    fn test_process_event_stats() {
        let event = content_modified("/home/user/.claude/stats-cache.json");

        let result = FileWatcher::process_event(&event, &claude_home(), None);
        assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
    }

    #[test]
    fn test_process_event_session() {
        let event = content_modified("/home/user/.claude/projects/-test/abc123.jsonl");

        let result = FileWatcher::process_event(&event, &claude_home(), None);
        assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
    }
}
421}