lore_cli/daemon/
watcher.rs

//! File system watcher for AI tool session files.
//!
//! Watches the directories configured by the WatcherRegistry for new and
//! modified session files. The size of each file at the time it was last
//! processed is remembered, so change events for files whose size has not
//! changed are skipped without re-reading them.
//!
//! File types currently picked up:
//! - JSONL session files (`.jsonl`), e.g. Claude Code files in `~/.claude/projects/`
//! - SQLite database files (`.vscdb`)
9
10use anyhow::{Context, Result};
11use notify::RecursiveMode;
12use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, RwLock};
18
19use crate::capture::watchers::default_registry;
20use crate::storage::Database;
21
22use super::state::DaemonStats;
23
/// Location of the database file, used to open connections on demand.
///
/// rusqlite connections are not thread-safe, so the watcher keeps only the
/// path and opens a new connection whenever one is needed rather than
/// sharing a single connection across threads.
#[derive(Debug, Clone)]
pub struct DbConfig {
    /// Path of the database file handed to `Database::open`.
    path: PathBuf,
}
31
32impl DbConfig {
33    /// Creates a new DbConfig for the default database location.
34    pub fn default_config() -> Result<Self> {
35        let path = crate::storage::db::default_db_path()?;
36        Ok(Self { path })
37    }
38
39    /// Opens a new database connection.
40    pub fn open(&self) -> Result<Database> {
41        Database::open(&self.path)
42    }
43}
44
/// Watches for session file changes and imports new sessions.
///
/// For every file it has processed, the watcher remembers the file size that
/// was observed at that time. A later event for the same file is ignored when
/// the size is unchanged, so unchanged files are never opened or parsed again.
pub struct SessionWatcher {
    /// Maps file paths to the file size (in bytes) recorded when the file was
    /// last processed. A missing entry is treated as position 0.
    file_positions: HashMap<PathBuf, u64>,
    /// Directories to watch for session files, as reported by the watcher
    /// registry when the `SessionWatcher` was created.
    watch_dirs: Vec<PathBuf>,
    /// Database configuration used to open a connection for each operation.
    db_config: DbConfig,
}
57
58impl SessionWatcher {
59    /// Creates a new SessionWatcher.
60    ///
61    /// Uses the default watcher registry to determine which directories
62    /// to watch for session files.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if the database configuration cannot be created.
67    pub fn new() -> Result<Self> {
68        let registry = default_registry();
69        let watch_dirs = registry.all_watch_paths();
70
71        let db_config = DbConfig::default_config()?;
72
73        Ok(Self {
74            file_positions: HashMap::new(),
75            watch_dirs,
76            db_config,
77        })
78    }
79
80    /// Returns the directories being watched.
81    ///
82    /// This method is part of the public API for status reporting
83    /// and may be used by CLI commands in the future.
84    #[allow(dead_code)]
85    pub fn watch_dirs(&self) -> &[PathBuf] {
86        &self.watch_dirs
87    }
88
89    /// Starts watching for file changes and processing them.
90    ///
91    /// This function runs until the shutdown signal is received.
92    ///
93    /// # Arguments
94    ///
95    /// * `stats` - Shared statistics to update on imports
96    /// * `shutdown_rx` - Receiver that signals when to stop watching
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the watcher cannot be created or started.
101    pub async fn watch(
102        &mut self,
103        stats: Arc<RwLock<DaemonStats>>,
104        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
105    ) -> Result<()> {
106        // Log which directories we will watch
107        for dir in &self.watch_dirs {
108            if dir.exists() {
109                tracing::info!("Will watch for session files in {:?}", dir);
110            } else {
111                tracing::info!("Watch directory does not exist yet: {:?}", dir);
112            }
113        }
114
115        // Create a channel for file events
116        let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
117
118        // Create the debounced watcher
119        let mut debouncer = new_debouncer(
120            Duration::from_millis(500),
121            move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
122                if let Ok(events) = events {
123                    // Filter for JSONL and SQLite database files
124                    let filtered: Vec<DebouncedEvent> = events
125                        .into_iter()
126                        .filter(|e| {
127                            let ext = e.path.extension().and_then(|ext| ext.to_str());
128                            matches!(ext, Some("jsonl") | Some("vscdb"))
129                        })
130                        .collect();
131
132                    if !filtered.is_empty() {
133                        let _ = tx.blocking_send(filtered);
134                    }
135                }
136            },
137        )
138        .context("Failed to create file watcher")?;
139
140        // Start watching directories that exist
141        for dir in &self.watch_dirs {
142            if dir.exists() {
143                debouncer
144                    .watcher()
145                    .watch(dir, RecursiveMode::Recursive)
146                    .context(format!("Failed to start watching directory {dir:?}"))?;
147
148                tracing::info!("Watching for session files in {:?}", dir);
149            }
150        }
151
152        // Do an initial scan (sync, before entering async loop)
153        self.initial_scan(&stats).await?;
154
155        // Process events
156        loop {
157            tokio::select! {
158                Some(events) = rx.recv() => {
159                    for event in events {
160                        if let Err(e) = self.handle_file_event(&event.path, &stats).await {
161                            tracing::warn!("Error handling file event for {:?}: {}", event.path, e);
162                            let mut stats_guard = stats.write().await;
163                            stats_guard.errors += 1;
164                        }
165                    }
166                }
167                _ = shutdown_rx.recv() => {
168                    tracing::info!("Session watcher shutting down");
169                    break;
170                }
171            }
172        }
173
174        Ok(())
175    }
176
177    /// Opens a database connection for this operation.
178    fn open_db(&self) -> Result<Database> {
179        self.db_config.open()
180    }
181
182    /// Performs an initial scan of existing session files.
183    ///
184    /// Called when the watcher starts to import any sessions that were
185    /// created while the daemon was not running. Uses the watcher registry
186    /// to find session sources from all available watchers.
187    async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
188        tracing::info!("Performing initial scan of session files...");
189
190        let registry = default_registry();
191        let mut total_files = 0;
192
193        for watcher in registry.available_watchers() {
194            let watcher_name = watcher.info().name;
195            match watcher.find_sources() {
196                Ok(sources) => {
197                    tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
198                    total_files += sources.len();
199
200                    for path in sources {
201                        // Process each file synchronously to avoid Send issues
202                        match self.process_file_sync(&path) {
203                            Ok(Some((sessions_imported, messages_imported))) => {
204                                let mut stats_guard = stats.write().await;
205                                stats_guard.sessions_imported += sessions_imported;
206                                stats_guard.messages_imported += messages_imported;
207                                stats_guard.files_watched = self.file_positions.len();
208                            }
209                            Ok(None) => {
210                                // File was already imported, just track position
211                            }
212                            Err(e) => {
213                                tracing::warn!("Failed to import {:?}: {}", path, e);
214                                let mut stats_guard = stats.write().await;
215                                stats_guard.errors += 1;
216                            }
217                        }
218                    }
219                }
220                Err(e) => {
221                    tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
222                }
223            }
224        }
225
226        {
227            let mut stats_guard = stats.write().await;
228            stats_guard.files_watched = total_files;
229        }
230
231        Ok(())
232    }
233
234    /// Handles a file system event for a session file.
235    async fn handle_file_event(
236        &mut self,
237        path: &Path,
238        stats: &Arc<RwLock<DaemonStats>>,
239    ) -> Result<()> {
240        let ext = path.extension().and_then(|e| e.to_str());
241
242        // Skip files that are not session sources
243        if !matches!(ext, Some("jsonl") | Some("vscdb")) {
244            return Ok(());
245        }
246
247        // Skip agent files (Claude Code specific)
248        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
249            if name.starts_with("agent-") {
250                return Ok(());
251            }
252        }
253
254        // Check if file exists (might be a delete event)
255        if !path.exists() {
256            // File was deleted, remove from tracking
257            self.file_positions.remove(path);
258            return Ok(());
259        }
260
261        // Process the file synchronously
262        match self.process_file_sync(path) {
263            Ok(Some((sessions_imported, messages_imported))) => {
264                let mut stats_guard = stats.write().await;
265                stats_guard.sessions_imported += sessions_imported;
266                stats_guard.messages_imported += messages_imported;
267                stats_guard.files_watched = self.file_positions.len();
268            }
269            Ok(None) => {
270                // File unchanged or already processed
271            }
272            Err(e) => {
273                return Err(e);
274            }
275        }
276
277        Ok(())
278    }
279
280    /// Processes a file synchronously, returning import counts if anything was imported.
281    ///
282    /// Returns `Ok(Some((sessions, messages)))` if data was imported,
283    /// `Ok(None)` if the file was already processed, or an error.
284    fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
285        let db = self.open_db()?;
286        let path_str = path.to_string_lossy();
287        let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
288
289        // Get current file size
290        let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
291        let current_size = metadata.len();
292
293        if current_size <= last_pos {
294            // File hasn't grown (might have been truncated)
295            if current_size < last_pos {
296                // File was truncated, reset position
297                self.file_positions.insert(path.to_path_buf(), 0);
298            }
299            return Ok(None);
300        }
301
302        // Check if this is a new file we haven't seen
303        if db.session_exists_by_source(&path_str)? {
304            // Already imported, just update position
305            self.file_positions.insert(path.to_path_buf(), current_size);
306            return Ok(None);
307        }
308
309        // Import the file
310        let result = self.import_file_sync(path, &db)?;
311
312        // Update tracked position
313        self.file_positions.insert(path.to_path_buf(), current_size);
314
315        Ok(Some(result))
316    }
317
318    /// Imports a complete session file synchronously.
319    /// Returns (sessions_imported, messages_imported) counts.
320    ///
321    /// Uses the watcher registry to find the appropriate parser for the file type.
322    fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
323        tracing::debug!("Importing session file: {:?}", path);
324
325        let path_buf = path.to_path_buf();
326        let registry = default_registry();
327
328        // Try to parse with each watcher until one succeeds
329        let mut parsed_sessions = Vec::new();
330
331        for watcher in registry.available_watchers() {
332            match watcher.parse_source(&path_buf) {
333                Ok(sessions) if !sessions.is_empty() => {
334                    parsed_sessions = sessions;
335                    break;
336                }
337                Ok(_) => continue,
338                Err(e) => {
339                    tracing::debug!(
340                        "Watcher {} could not parse {:?}: {}",
341                        watcher.info().name,
342                        path,
343                        e
344                    );
345                }
346            }
347        }
348
349        if parsed_sessions.is_empty() {
350            tracing::debug!("No watcher could parse {:?}", path);
351            return Ok((0, 0));
352        }
353
354        let mut total_sessions = 0u64;
355        let mut total_messages = 0u64;
356
357        for (session, messages) in parsed_sessions {
358            if messages.is_empty() {
359                continue;
360            }
361
362            let message_count = messages.len();
363
364            // Store session
365            db.insert_session(&session)?;
366
367            // Store messages and track the most recent branch
368            let mut latest_branch: Option<String> = None;
369            for msg in &messages {
370                db.insert_message(msg)?;
371                // Track the branch from the most recent message that has one
372                if msg.git_branch.is_some() {
373                    latest_branch = msg.git_branch.clone();
374                }
375            }
376
377            // Update session branch if the latest message has a different branch
378            // This handles the case where the user switches branches mid-session
379            if let Some(ref new_branch) = latest_branch {
380                if session.git_branch.as_ref() != Some(new_branch) {
381                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
382                        tracing::warn!(
383                            "Failed to update session branch for {}: {}",
384                            &session.id.to_string()[..8],
385                            e
386                        );
387                    } else {
388                        tracing::debug!(
389                            "Updated session {} branch to {}",
390                            &session.id.to_string()[..8],
391                            new_branch
392                        );
393                    }
394                }
395            }
396
397            tracing::info!(
398                "Imported session {} with {} messages from {:?}",
399                &session.id.to_string()[..8],
400                message_count,
401                path.file_name().unwrap_or_default()
402            );
403
404            total_sessions += 1;
405            total_messages += message_count as u64;
406        }
407
408        // Update file position
409        if let Ok(metadata) = std::fs::metadata(path) {
410            self.file_positions
411                .insert(path.to_path_buf(), metadata.len());
412        }
413
414        Ok((total_sessions, total_messages))
415    }
416
417    /// Returns the number of files currently being tracked.
418    ///
419    /// This method is part of the public API for status reporting
420    /// and is used by tests.
421    #[allow(dead_code)]
422    pub fn tracked_file_count(&self) -> usize {
423        self.file_positions.len()
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Construction must succeed even when no watchers are available (e.g. in
    /// CI environments where ~/.claude doesn't exist); the list of watch
    /// directories may simply be empty there.
    #[test]
    fn test_session_watcher_creation() {
        let created = SessionWatcher::new();
        assert!(created.is_ok(), "Should create watcher successfully");

        let _watcher = created.unwrap();
    }

    /// The registry must know at least one of the well-known watcher paths.
    ///
    /// `SessionWatcher::watch_dirs()` only includes paths from AVAILABLE
    /// watchers, and in CI none may be available because their directories
    /// (like ~/.claude) don't exist. The registry is therefore checked
    /// directly through `all_watchers()` (not just the available ones).
    #[test]
    fn test_watch_dirs_from_registry() {
        use crate::capture::watchers::default_registry;

        let registry = default_registry();
        let all_watchers = registry.all_watchers();

        // Watch paths from ALL watchers, including unavailable ones
        let all_paths: Vec<_> = all_watchers.iter().flat_map(|w| w.watch_paths()).collect();

        let mentions =
            |needle: &str| all_paths.iter().any(|d| d.to_string_lossy().contains(needle));

        assert!(
            mentions(".claude") || mentions("Cursor"),
            "Registry should configure at least one known watcher path pattern \
             (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
        );
    }

    /// A freshly created watcher tracks no files.
    #[test]
    fn test_tracked_file_count_initial() {
        let tracked = SessionWatcher::new().unwrap().tracked_file_count();
        assert_eq!(tracked, 0, "Should start with no tracked files");
    }

    /// The default database configuration can be resolved.
    #[test]
    fn test_db_config_creation() {
        assert!(
            DbConfig::default_config().is_ok(),
            "Should create DbConfig successfully"
        );
    }

    /// Positions inserted into the table are counted and can be read back.
    #[test]
    fn test_file_position_tracking() {
        let mut watcher = SessionWatcher::new().unwrap();

        let expected = [
            (PathBuf::from("/test/file1.jsonl"), 100u64),
            (PathBuf::from("/test/file2.jsonl"), 200u64),
        ];

        for (path, position) in &expected {
            watcher.file_positions.insert(path.clone(), *position);
        }

        assert_eq!(watcher.tracked_file_count(), expected.len());
        for (path, position) in &expected {
            assert_eq!(watcher.file_positions.get(path), Some(position));
        }
    }
}
504}