Skip to main content

ccboard_core/
watcher.rs

1//! File watcher for Claude Code data changes
2//!
3//! Uses notify with adaptive debouncing for efficient change detection.
4
5use crate::event::{ConfigScope, DataEvent, EventBus};
6use crate::store::DataStore;
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tracing::{debug, error, info, trace};
13
14/// Configuration for the file watcher
15#[derive(Debug, Clone)]
16pub struct WatcherConfig {
17    /// Base debounce delay
18    pub debounce_delay: Duration,
19
20    /// Maximum debounce delay during burst
21    pub max_debounce_delay: Duration,
22
23    /// Burst detection threshold (events per second)
24    pub burst_threshold: u32,
25}
26
27impl Default for WatcherConfig {
28    fn default() -> Self {
29        Self {
30            debounce_delay: Duration::from_millis(500),
31            max_debounce_delay: Duration::from_secs(3),
32            burst_threshold: 10,
33        }
34    }
35}
36
37/// File watcher that monitors Claude Code directories
38pub struct FileWatcher {
39    /// Notify watcher instance
40    _watcher: RecommendedWatcher,
41
42    /// Shutdown signal
43    shutdown_tx: mpsc::Sender<()>,
44}
45
46impl FileWatcher {
47    /// Start watching Claude Code directories
48    pub async fn start(
49        claude_home: PathBuf,
50        project_path: Option<PathBuf>,
51        store: Arc<DataStore>,
52        config: WatcherConfig,
53    ) -> Result<Self, notify::Error> {
54        let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
55        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
56
57        // Create watcher
58        let watcher = RecommendedWatcher::new(
59            move |res| {
60                let _ = event_tx.blocking_send(res);
61            },
62            Config::default().with_poll_interval(Duration::from_secs(2)),
63        )?;
64
65        let mut file_watcher = Self {
66            _watcher: watcher,
67            shutdown_tx,
68        };
69
70        // Watch paths - non-recursive for performance (26k+ files in ~/.claude)
71        // Watch top-level files (stats-cache.json, settings.json)
72        file_watcher.watch_path(&claude_home, RecursiveMode::NonRecursive)?;
73
74        // Watch projects directory non-recursively to detect new projects
75        let projects_dir = claude_home.join("projects");
76        if projects_dir.exists() {
77            file_watcher.watch_path(&projects_dir, RecursiveMode::NonRecursive)?;
78
79            // Watch each project directory non-recursively to detect new sessions
80            if let Ok(entries) = std::fs::read_dir(&projects_dir) {
81                for entry in entries.flatten() {
82                    if entry.path().is_dir() {
83                        // Watch project directory non-recursively (only .jsonl files at this level)
84                        let _ = file_watcher.watch_path(&entry.path(), RecursiveMode::NonRecursive);
85                    }
86                }
87            }
88        }
89
90        // Watch cache directory for SQLite changes
91        let cache_dir = claude_home.join("cache");
92        if cache_dir.exists() {
93            file_watcher.watch_path(&cache_dir, RecursiveMode::NonRecursive)?;
94        }
95
96        // Watch project-specific .claude directory non-recursively
97        if let Some(ref proj) = project_path {
98            let claude_dir = proj.join(".claude");
99            if claude_dir.exists() {
100                file_watcher.watch_path(&claude_dir, RecursiveMode::NonRecursive)?;
101
102                // Watch subdirectories individually (agents, commands, skills, hooks)
103                for subdir in ["agents", "commands", "skills", "hooks"].iter() {
104                    let path = claude_dir.join(subdir);
105                    if path.exists() {
106                        let _ = file_watcher.watch_path(&path, RecursiveMode::Recursive);
107                    }
108                }
109            }
110        }
111
112        info!(claude_home = %claude_home.display(), "File watcher started");
113
114        // Spawn event processor
115        let event_bus = store.event_bus().clone();
116        tokio::spawn(async move {
117            let mut debounce_state = DebounceState::new(config);
118
119            loop {
120                tokio::select! {
121                    Some(result) = event_rx.recv() => {
122                        match result {
123                            Ok(event) => {
124                                if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
125                                    if debounce_state.should_emit(&data_event) {
126                                        debug!(?data_event, "Emitting file change event");
127                                        Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
128                                    }
129                                }
130                            }
131                            Err(e) => {
132                                error!(error = %e, "File watcher error");
133                                event_bus.publish(DataEvent::WatcherError(e.to_string()));
134                            }
135                        }
136                    }
137                    _ = shutdown_rx.recv() => {
138                        info!("File watcher shutting down");
139                        break;
140                    }
141                }
142            }
143        });
144
145        Ok(file_watcher)
146    }
147
148    fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
149        self._watcher.watch(path, mode)?;
150        debug!(path = %path.display(), "Watching path");
151        Ok(())
152    }
153
154    /// Process a notify event into a DataEvent with its path
155    fn process_event(
156        event: &Event,
157        claude_home: &Path,
158        project_path: Option<&Path>,
159    ) -> Option<(DataEvent, PathBuf)> {
160        // Only process create/modify events
161        match event.kind {
162            EventKind::Create(_) | EventKind::Modify(_) => {}
163            _ => return None,
164        }
165
166        let path = event.paths.first()?;
167        let path_str = path.to_string_lossy();
168
169        trace!(path = %path_str, "Processing file event");
170
171        // Stats cache
172        if path
173            .file_name()
174            .map(|n| n == "stats-cache.json")
175            .unwrap_or(false)
176        {
177            return Some((DataEvent::StatsUpdated, path.clone()));
178        }
179
180        // Session files
181        if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
182        {
183            let session_id = path
184                .file_stem()
185                .and_then(|s| s.to_str())
186                .unwrap_or("unknown")
187                .to_string();
188
189            return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
190        }
191
192        // Global settings
193        if *path == claude_home.join("settings.json") {
194            return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
195        }
196
197        // Project settings
198        if let Some(proj) = project_path {
199            if *path == proj.join(".claude").join("settings.json") {
200                return Some((
201                    DataEvent::ConfigChanged(ConfigScope::Project(
202                        proj.to_string_lossy().to_string(),
203                    )),
204                    path.clone(),
205                ));
206            }
207            if *path == proj.join(".claude").join("settings.local.json") {
208                return Some((
209                    DataEvent::ConfigChanged(ConfigScope::Local(
210                        proj.to_string_lossy().to_string(),
211                    )),
212                    path.clone(),
213                ));
214            }
215        }
216
217        // MCP config
218        if path
219            .file_name()
220            .map(|n| n == "claude_desktop_config.json")
221            .unwrap_or(false)
222        {
223            return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
224        }
225
226        None
227    }
228
229    /// Handle a data event by updating the store
230    async fn handle_event(
231        event: DataEvent,
232        path: Option<&Path>,
233        store: &DataStore,
234        event_bus: &EventBus,
235    ) {
236        match &event {
237            DataEvent::StatsUpdated => {
238                store.reload_stats().await;
239            }
240            DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
241                // Update session with path
242                if let Some(p) = path {
243                    store.update_session(p).await;
244                }
245            }
246            DataEvent::ConfigChanged(_scope) => {
247                // Reload settings
248                store.reload_settings().await;
249            }
250            _ => {}
251        }
252
253        event_bus.publish(event);
254    }
255
256    /// Stop the watcher
257    pub async fn stop(&self) {
258        let _ = self.shutdown_tx.send(()).await;
259    }
260}
261
262/// Debounce state for adaptive debouncing
263struct DebounceState {
264    config: WatcherConfig,
265    last_events: std::collections::HashMap<String, std::time::Instant>,
266    event_count_window: std::collections::VecDeque<std::time::Instant>,
267}
268
269impl DebounceState {
270    fn new(config: WatcherConfig) -> Self {
271        Self {
272            config,
273            last_events: std::collections::HashMap::new(),
274            event_count_window: std::collections::VecDeque::new(),
275        }
276    }
277
278    fn should_emit(&mut self, event: &DataEvent) -> bool {
279        let now = std::time::Instant::now();
280        let key = Self::event_key(event);
281
282        // Track event rate for burst detection
283        self.event_count_window.push_back(now);
284        while self
285            .event_count_window
286            .front()
287            .map(|t| now.duration_since(*t) > Duration::from_secs(1))
288            .unwrap_or(false)
289        {
290            self.event_count_window.pop_front();
291        }
292
293        // Calculate adaptive delay
294        let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
295            self.config.max_debounce_delay
296        } else {
297            self.config.debounce_delay
298        };
299
300        // Check if enough time has passed
301        if let Some(last) = self.last_events.get(&key) {
302            if now.duration_since(*last) < delay {
303                trace!(key = %key, "Debouncing event");
304                return false;
305            }
306        }
307
308        self.last_events.insert(key, now);
309        true
310    }
311
312    fn event_key(event: &DataEvent) -> String {
313        match event {
314            DataEvent::StatsUpdated => "stats".to_string(),
315            DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
316                format!("session:{}", id)
317            }
318            DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
319            DataEvent::AnalyticsUpdated => "analytics".to_string(),
320            DataEvent::LoadCompleted => "load".to_string(),
321            DataEvent::WatcherError(_) => "error".to_string(),
322        }
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    #[test]
331    fn test_debounce_state_basic() {
332        let config = WatcherConfig {
333            debounce_delay: Duration::from_millis(100),
334            max_debounce_delay: Duration::from_millis(500),
335            burst_threshold: 5,
336        };
337        let mut state = DebounceState::new(config);
338
339        // First event should pass
340        assert!(state.should_emit(&DataEvent::StatsUpdated));
341
342        // Immediate second should be debounced
343        assert!(!state.should_emit(&DataEvent::StatsUpdated));
344
345        // Different event type should pass
346        assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
347    }
348
349    #[test]
350    fn test_process_event_stats() {
351        let claude_home = PathBuf::from("/home/user/.claude");
352        let event = Event {
353            kind: EventKind::Modify(notify::event::ModifyKind::Data(
354                notify::event::DataChange::Content,
355            )),
356            paths: vec![PathBuf::from("/home/user/.claude/stats-cache.json")],
357            ..Default::default()
358        };
359
360        let result = FileWatcher::process_event(&event, &claude_home, None);
361        assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
362    }
363
364    #[test]
365    fn test_process_event_session() {
366        let claude_home = PathBuf::from("/home/user/.claude");
367        let event = Event {
368            kind: EventKind::Modify(notify::event::ModifyKind::Data(
369                notify::event::DataChange::Content,
370            )),
371            paths: vec![PathBuf::from(
372                "/home/user/.claude/projects/-test/abc123.jsonl",
373            )],
374            ..Default::default()
375        };
376
377        let result = FileWatcher::process_event(&event, &claude_home, None);
378        assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
379    }
380}