Skip to main content

ccboard_core/
watcher.rs

1//! File watcher for Claude Code data changes
2//!
3//! Uses notify with adaptive debouncing for efficient change detection.
4
5use crate::event::{ConfigScope, DataEvent, EventBus};
6use crate::store::DataStore;
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tracing::{debug, error, info, trace};
13
/// Tunable settings for the file watcher.
#[derive(Clone, Debug)]
pub struct WatcherConfig {
    /// Minimum spacing between two emitted events that share a debounce key
    /// while the event rate is normal.
    pub debounce_delay: Duration,

    /// Spacing used instead of `debounce_delay` while a burst is in progress.
    pub max_debounce_delay: Duration,

    /// Number of events observed within the last second above which the
    /// watcher treats the situation as a burst.
    pub burst_threshold: u32,

    /// Extra directories to watch non-recursively in addition to the Claude
    /// home (e.g. `~/.ccboard/` for `live-sessions.json`).
    pub extra_watch_paths: Vec<PathBuf>,
}
29
30impl Default for WatcherConfig {
31    fn default() -> Self {
32        // Auto-include ~/.ccboard/ for live session monitoring
33        let extra_watch_paths = dirs::home_dir()
34            .map(|h| {
35                let ccboard_dir = h.join(".ccboard");
36                if ccboard_dir.exists() {
37                    vec![ccboard_dir]
38                } else {
39                    vec![]
40                }
41            })
42            .unwrap_or_default();
43
44        Self {
45            debounce_delay: Duration::from_millis(500),
46            max_debounce_delay: Duration::from_secs(3),
47            burst_threshold: 10,
48            extra_watch_paths,
49        }
50    }
51}
52
53/// File watcher that monitors Claude Code directories
54pub struct FileWatcher {
55    /// Notify watcher instance
56    _watcher: RecommendedWatcher,
57
58    /// Shutdown signal
59    shutdown_tx: mpsc::Sender<()>,
60}
61
62impl FileWatcher {
63    /// Start watching Claude Code directories
64    pub async fn start(
65        claude_home: PathBuf,
66        project_path: Option<PathBuf>,
67        store: Arc<DataStore>,
68        config: WatcherConfig,
69    ) -> Result<Self, notify::Error> {
70        let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
71        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
72
73        // Create watcher
74        let watcher = RecommendedWatcher::new(
75            move |res| {
76                let _ = event_tx.blocking_send(res);
77            },
78            Config::default().with_poll_interval(Duration::from_secs(2)),
79        )?;
80
81        let mut file_watcher = Self {
82            _watcher: watcher,
83            shutdown_tx,
84        };
85
86        // Watch paths - non-recursive for performance (26k+ files in ~/.claude)
87        // Watch top-level files (stats-cache.json, settings.json)
88        file_watcher.watch_path(&claude_home, RecursiveMode::NonRecursive)?;
89
90        // Watch projects directory non-recursively to detect new projects
91        let projects_dir = claude_home.join("projects");
92        if projects_dir.exists() {
93            file_watcher.watch_path(&projects_dir, RecursiveMode::NonRecursive)?;
94
95            // Watch each project directory non-recursively to detect new sessions
96            if let Ok(entries) = std::fs::read_dir(&projects_dir) {
97                for entry in entries.flatten() {
98                    if entry.path().is_dir() {
99                        // Watch project directory non-recursively (only .jsonl files at this level)
100                        let _ = file_watcher.watch_path(&entry.path(), RecursiveMode::NonRecursive);
101                    }
102                }
103            }
104        }
105
106        // Watch cache directory for SQLite changes
107        let cache_dir = claude_home.join("cache");
108        if cache_dir.exists() {
109            file_watcher.watch_path(&cache_dir, RecursiveMode::NonRecursive)?;
110        }
111
112        // Watch project-specific .claude directory non-recursively
113        if let Some(ref proj) = project_path {
114            let claude_dir = proj.join(".claude");
115            if claude_dir.exists() {
116                file_watcher.watch_path(&claude_dir, RecursiveMode::NonRecursive)?;
117
118                // Watch subdirectories individually (agents, commands, skills, hooks)
119                for subdir in ["agents", "commands", "skills", "hooks"].iter() {
120                    let path = claude_dir.join(subdir);
121                    if path.exists() {
122                        let _ = file_watcher.watch_path(&path, RecursiveMode::Recursive);
123                    }
124                }
125            }
126        }
127
128        // Watch extra paths (e.g. ~/.ccboard/ for live-sessions.json)
129        for extra_path in &config.extra_watch_paths {
130            if extra_path.exists() {
131                let _ = file_watcher.watch_path(extra_path, RecursiveMode::NonRecursive);
132                debug!(path = %extra_path.display(), "Watching extra path for live sessions");
133            }
134        }
135
136        info!(claude_home = %claude_home.display(), "File watcher started");
137
138        // Spawn event processor
139        let event_bus = store.event_bus().clone();
140        tokio::spawn(async move {
141            let mut debounce_state = DebounceState::new(config);
142
143            loop {
144                tokio::select! {
145                    Some(result) = event_rx.recv() => {
146                        match result {
147                            Ok(event) => {
148                                if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
149                                    if debounce_state.should_emit(&data_event) {
150                                        debug!(?data_event, "Emitting file change event");
151                                        Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
152                                    }
153                                }
154                            }
155                            Err(e) => {
156                                error!(error = %e, "File watcher error");
157                                event_bus.publish(DataEvent::WatcherError(e.to_string()));
158                            }
159                        }
160                    }
161                    _ = shutdown_rx.recv() => {
162                        info!("File watcher shutting down");
163                        break;
164                    }
165                }
166            }
167        });
168
169        Ok(file_watcher)
170    }
171
172    fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
173        self._watcher.watch(path, mode)?;
174        debug!(path = %path.display(), "Watching path");
175        Ok(())
176    }
177
178    /// Process a notify event into a DataEvent with its path
179    fn process_event(
180        event: &Event,
181        claude_home: &Path,
182        project_path: Option<&Path>,
183    ) -> Option<(DataEvent, PathBuf)> {
184        // Only process create/modify events
185        match event.kind {
186            EventKind::Create(_) | EventKind::Modify(_) => {}
187            _ => return None,
188        }
189
190        let path = event.paths.first()?;
191        let path_str = path.to_string_lossy();
192
193        trace!(path = %path_str, "Processing file event");
194
195        // Stats cache
196        if path
197            .file_name()
198            .map(|n| n == "stats-cache.json")
199            .unwrap_or(false)
200        {
201            return Some((DataEvent::StatsUpdated, path.clone()));
202        }
203
204        // Live sessions hook file (~/.ccboard/live-sessions.json)
205        if path
206            .file_name()
207            .map(|n| n == "live-sessions.json")
208            .unwrap_or(false)
209        {
210            return Some((DataEvent::LiveSessionStatusChanged, path.clone()));
211        }
212
213        // Session files
214        if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
215        {
216            let session_id = path
217                .file_stem()
218                .and_then(|s| s.to_str())
219                .unwrap_or("unknown")
220                .to_string();
221
222            return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
223        }
224
225        // Global settings
226        if *path == claude_home.join("settings.json") {
227            return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
228        }
229
230        // Project settings
231        if let Some(proj) = project_path {
232            if *path == proj.join(".claude").join("settings.json") {
233                return Some((
234                    DataEvent::ConfigChanged(ConfigScope::Project(
235                        proj.to_string_lossy().to_string(),
236                    )),
237                    path.clone(),
238                ));
239            }
240            if *path == proj.join(".claude").join("settings.local.json") {
241                return Some((
242                    DataEvent::ConfigChanged(ConfigScope::Local(
243                        proj.to_string_lossy().to_string(),
244                    )),
245                    path.clone(),
246                ));
247            }
248        }
249
250        // MCP config
251        if path
252            .file_name()
253            .map(|n| n == "claude_desktop_config.json")
254            .unwrap_or(false)
255        {
256            return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
257        }
258
259        None
260    }
261
262    /// Handle a data event by updating the store
263    async fn handle_event(
264        event: DataEvent,
265        path: Option<&Path>,
266        store: &DataStore,
267        event_bus: &EventBus,
268    ) {
269        match &event {
270            DataEvent::StatsUpdated => {
271                store.reload_stats().await;
272            }
273            DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
274                // Update session with path
275                if let Some(p) = path {
276                    store.update_session(p).await;
277                }
278            }
279            DataEvent::ConfigChanged(_scope) => {
280                // Reload settings
281                store.reload_settings().await;
282            }
283            DataEvent::LiveSessionStatusChanged => {
284                // Reload live session hook state from disk
285                if let Some(p) = path {
286                    store.reload_live_hook_sessions(p).await;
287                }
288            }
289            _ => {}
290        }
291
292        event_bus.publish(event);
293    }
294
295    /// Stop the watcher
296    pub async fn stop(&self) {
297        let _ = self.shutdown_tx.send(()).await;
298    }
299}
300
301/// Debounce state for adaptive debouncing
302struct DebounceState {
303    config: WatcherConfig,
304    last_events: std::collections::HashMap<String, std::time::Instant>,
305    event_count_window: std::collections::VecDeque<std::time::Instant>,
306}
307
308impl DebounceState {
309    fn new(config: WatcherConfig) -> Self {
310        Self {
311            config,
312            last_events: std::collections::HashMap::new(),
313            event_count_window: std::collections::VecDeque::new(),
314        }
315    }
316
317    fn should_emit(&mut self, event: &DataEvent) -> bool {
318        let now = std::time::Instant::now();
319        let key = Self::event_key(event);
320
321        // Track event rate for burst detection
322        self.event_count_window.push_back(now);
323        while self
324            .event_count_window
325            .front()
326            .map(|t| now.duration_since(*t) > Duration::from_secs(1))
327            .unwrap_or(false)
328        {
329            self.event_count_window.pop_front();
330        }
331
332        // Calculate adaptive delay
333        let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
334            self.config.max_debounce_delay
335        } else {
336            self.config.debounce_delay
337        };
338
339        // Check if enough time has passed
340        if let Some(last) = self.last_events.get(&key) {
341            if now.duration_since(*last) < delay {
342                trace!(key = %key, "Debouncing event");
343                return false;
344            }
345        }
346
347        self.last_events.insert(key, now);
348        true
349    }
350
351    fn event_key(event: &DataEvent) -> String {
352        match event {
353            DataEvent::StatsUpdated => "stats".to_string(),
354            DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
355                format!("session:{}", id)
356            }
357            DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
358            DataEvent::AnalyticsUpdated => "analytics".to_string(),
359            DataEvent::LoadCompleted => "load".to_string(),
360            DataEvent::WatcherError(_) => "error".to_string(),
361            DataEvent::LiveSessionStatusChanged => "live_sessions".to_string(),
362        }
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a content-modification event that carries a single path.
    fn content_modified(path: &str) -> Event {
        Event {
            kind: EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
            paths: vec![PathBuf::from(path)],
            ..Default::default()
        }
    }

    #[test]
    fn test_debounce_state_basic() {
        let mut state = DebounceState::new(WatcherConfig {
            debounce_delay: Duration::from_millis(100),
            max_debounce_delay: Duration::from_millis(500),
            burst_threshold: 5,
            extra_watch_paths: vec![],
        });

        // The first event for a key is emitted
        assert!(state.should_emit(&DataEvent::StatsUpdated));

        // An immediate repeat of the same key is debounced
        assert!(!state.should_emit(&DataEvent::StatsUpdated));

        // A different key is unaffected
        assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
    }

    #[test]
    fn test_process_event_stats() {
        let claude_home = PathBuf::from("/home/user/.claude");
        let event = content_modified("/home/user/.claude/stats-cache.json");

        let result = FileWatcher::process_event(&event, &claude_home, None);
        assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
    }

    #[test]
    fn test_process_event_session() {
        let claude_home = PathBuf::from("/home/user/.claude");
        let event = content_modified("/home/user/.claude/projects/-test/abc123.jsonl");

        let result = FileWatcher::process_event(&event, &claude_home, None);
        assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
    }
}