Skip to main content

ccboard_core/
watcher.rs

1//! File watcher for Claude Code data changes
2//!
3//! Uses notify with adaptive debouncing for efficient change detection.
4
5use crate::event::{ConfigScope, DataEvent, EventBus};
6use crate::store::DataStore;
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tracing::{debug, error, info, trace};
13
14/// Configuration for the file watcher
15#[derive(Debug, Clone)]
16pub struct WatcherConfig {
17    /// Base debounce delay
18    pub debounce_delay: Duration,
19
20    /// Maximum debounce delay during burst
21    pub max_debounce_delay: Duration,
22
23    /// Burst detection threshold (events per second)
24    pub burst_threshold: u32,
25}
26
27impl Default for WatcherConfig {
28    fn default() -> Self {
29        Self {
30            debounce_delay: Duration::from_millis(500),
31            max_debounce_delay: Duration::from_secs(3),
32            burst_threshold: 10,
33        }
34    }
35}
36
37/// File watcher that monitors Claude Code directories
38pub struct FileWatcher {
39    /// Notify watcher instance
40    _watcher: RecommendedWatcher,
41
42    /// Shutdown signal
43    shutdown_tx: mpsc::Sender<()>,
44}
45
46impl FileWatcher {
47    /// Start watching Claude Code directories
48    pub async fn start(
49        claude_home: PathBuf,
50        project_path: Option<PathBuf>,
51        store: Arc<DataStore>,
52        config: WatcherConfig,
53    ) -> Result<Self, notify::Error> {
54        let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
55        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
56
57        // Create watcher
58        let watcher = RecommendedWatcher::new(
59            move |res| {
60                let _ = event_tx.blocking_send(res);
61            },
62            Config::default().with_poll_interval(Duration::from_secs(2)),
63        )?;
64
65        let mut file_watcher = Self {
66            _watcher: watcher,
67            shutdown_tx,
68        };
69
70        // Watch paths
71        file_watcher.watch_path(&claude_home, RecursiveMode::Recursive)?;
72
73        if let Some(ref proj) = project_path {
74            let claude_dir = proj.join(".claude");
75            if claude_dir.exists() {
76                file_watcher.watch_path(&claude_dir, RecursiveMode::Recursive)?;
77            }
78        }
79
80        info!(claude_home = %claude_home.display(), "File watcher started");
81
82        // Spawn event processor
83        let event_bus = store.event_bus().clone();
84        tokio::spawn(async move {
85            let mut debounce_state = DebounceState::new(config);
86
87            loop {
88                tokio::select! {
89                    Some(result) = event_rx.recv() => {
90                        match result {
91                            Ok(event) => {
92                                if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
93                                    if debounce_state.should_emit(&data_event) {
94                                        debug!(?data_event, "Emitting file change event");
95                                        Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
96                                    }
97                                }
98                            }
99                            Err(e) => {
100                                error!(error = %e, "File watcher error");
101                                event_bus.publish(DataEvent::WatcherError(e.to_string()));
102                            }
103                        }
104                    }
105                    _ = shutdown_rx.recv() => {
106                        info!("File watcher shutting down");
107                        break;
108                    }
109                }
110            }
111        });
112
113        Ok(file_watcher)
114    }
115
116    fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
117        self._watcher.watch(path, mode)?;
118        debug!(path = %path.display(), "Watching path");
119        Ok(())
120    }
121
122    /// Process a notify event into a DataEvent with its path
123    fn process_event(
124        event: &Event,
125        claude_home: &Path,
126        project_path: Option<&Path>,
127    ) -> Option<(DataEvent, PathBuf)> {
128        // Only process create/modify events
129        match event.kind {
130            EventKind::Create(_) | EventKind::Modify(_) => {}
131            _ => return None,
132        }
133
134        let path = event.paths.first()?;
135        let path_str = path.to_string_lossy();
136
137        trace!(path = %path_str, "Processing file event");
138
139        // Stats cache
140        if path
141            .file_name()
142            .map(|n| n == "stats-cache.json")
143            .unwrap_or(false)
144        {
145            return Some((DataEvent::StatsUpdated, path.clone()));
146        }
147
148        // Session files
149        if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
150        {
151            let session_id = path
152                .file_stem()
153                .and_then(|s| s.to_str())
154                .unwrap_or("unknown")
155                .to_string();
156
157            return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
158        }
159
160        // Global settings
161        if *path == claude_home.join("settings.json") {
162            return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
163        }
164
165        // Project settings
166        if let Some(proj) = project_path {
167            if *path == proj.join(".claude").join("settings.json") {
168                return Some((
169                    DataEvent::ConfigChanged(ConfigScope::Project(
170                        proj.to_string_lossy().to_string(),
171                    )),
172                    path.clone(),
173                ));
174            }
175            if *path == proj.join(".claude").join("settings.local.json") {
176                return Some((
177                    DataEvent::ConfigChanged(ConfigScope::Local(
178                        proj.to_string_lossy().to_string(),
179                    )),
180                    path.clone(),
181                ));
182            }
183        }
184
185        // MCP config
186        if path
187            .file_name()
188            .map(|n| n == "claude_desktop_config.json")
189            .unwrap_or(false)
190        {
191            return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
192        }
193
194        None
195    }
196
197    /// Handle a data event by updating the store
198    async fn handle_event(
199        event: DataEvent,
200        path: Option<&Path>,
201        store: &DataStore,
202        event_bus: &EventBus,
203    ) {
204        match &event {
205            DataEvent::StatsUpdated => {
206                store.reload_stats().await;
207            }
208            DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
209                // Update session with path
210                if let Some(p) = path {
211                    store.update_session(p).await;
212                }
213            }
214            DataEvent::ConfigChanged(_scope) => {
215                // Reload settings
216                store.reload_settings().await;
217            }
218            _ => {}
219        }
220
221        event_bus.publish(event);
222    }
223
224    /// Stop the watcher
225    pub async fn stop(&self) {
226        let _ = self.shutdown_tx.send(()).await;
227    }
228}
229
230/// Debounce state for adaptive debouncing
231struct DebounceState {
232    config: WatcherConfig,
233    last_events: std::collections::HashMap<String, std::time::Instant>,
234    event_count_window: std::collections::VecDeque<std::time::Instant>,
235}
236
237impl DebounceState {
238    fn new(config: WatcherConfig) -> Self {
239        Self {
240            config,
241            last_events: std::collections::HashMap::new(),
242            event_count_window: std::collections::VecDeque::new(),
243        }
244    }
245
246    fn should_emit(&mut self, event: &DataEvent) -> bool {
247        let now = std::time::Instant::now();
248        let key = Self::event_key(event);
249
250        // Track event rate for burst detection
251        self.event_count_window.push_back(now);
252        while self
253            .event_count_window
254            .front()
255            .map(|t| now.duration_since(*t) > Duration::from_secs(1))
256            .unwrap_or(false)
257        {
258            self.event_count_window.pop_front();
259        }
260
261        // Calculate adaptive delay
262        let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
263            self.config.max_debounce_delay
264        } else {
265            self.config.debounce_delay
266        };
267
268        // Check if enough time has passed
269        if let Some(last) = self.last_events.get(&key) {
270            if now.duration_since(*last) < delay {
271                trace!(key = %key, "Debouncing event");
272                return false;
273            }
274        }
275
276        self.last_events.insert(key, now);
277        true
278    }
279
280    fn event_key(event: &DataEvent) -> String {
281        match event {
282            DataEvent::StatsUpdated => "stats".to_string(),
283            DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
284                format!("session:{}", id)
285            }
286            DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
287            DataEvent::AnalyticsUpdated => "analytics".to_string(),
288            DataEvent::LoadCompleted => "load".to_string(),
289            DataEvent::WatcherError(_) => "error".to_string(),
290        }
291    }
292}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297
298    #[test]
299    fn test_debounce_state_basic() {
300        let config = WatcherConfig {
301            debounce_delay: Duration::from_millis(100),
302            max_debounce_delay: Duration::from_millis(500),
303            burst_threshold: 5,
304        };
305        let mut state = DebounceState::new(config);
306
307        // First event should pass
308        assert!(state.should_emit(&DataEvent::StatsUpdated));
309
310        // Immediate second should be debounced
311        assert!(!state.should_emit(&DataEvent::StatsUpdated));
312
313        // Different event type should pass
314        assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
315    }
316
317    #[test]
318    fn test_process_event_stats() {
319        let claude_home = PathBuf::from("/home/user/.claude");
320        let event = Event {
321            kind: EventKind::Modify(notify::event::ModifyKind::Data(
322                notify::event::DataChange::Content,
323            )),
324            paths: vec![PathBuf::from("/home/user/.claude/stats-cache.json")],
325            ..Default::default()
326        };
327
328        let result = FileWatcher::process_event(&event, &claude_home, None);
329        assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
330    }
331
332    #[test]
333    fn test_process_event_session() {
334        let claude_home = PathBuf::from("/home/user/.claude");
335        let event = Event {
336            kind: EventKind::Modify(notify::event::ModifyKind::Data(
337                notify::event::DataChange::Content,
338            )),
339            paths: vec![PathBuf::from(
340                "/home/user/.claude/projects/-test/abc123.jsonl",
341            )],
342            ..Default::default()
343        };
344
345        let result = FileWatcher::process_event(&event, &claude_home, None);
346        assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
347    }
348}