lore_cli/daemon/
watcher.rs

//! File system watcher for AI tool session files.
//!
//! Watches the directories reported by the watcher registry for new and
//! modified session files. The byte size last seen for each file is tracked
//! so that files which have not grown are skipped instead of being re-parsed.
//!
//! File events are currently filtered to:
//! - `.jsonl` files (e.g. Claude Code JSONL files in `~/.claude/projects/`)
//! - `.vscdb` SQLite database files
9
10use anyhow::{Context, Result};
11use notify::RecursiveMode;
12use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, RwLock};
18
19use crate::capture::watchers::default_registry;
20use crate::storage::Database;
21
22use super::state::DaemonStats;
23
/// Location of the database file, used to open connections on demand.
///
/// rusqlite connections are not thread-safe, so the watcher stores only the
/// path and opens a new connection whenever one is needed rather than
/// sharing a single connection across threads.
#[derive(Clone, Debug)]
pub struct DbConfig {
    /// Filesystem path of the database file.
    path: PathBuf,
}
31
32impl DbConfig {
33    /// Creates a new DbConfig for the default database location.
34    pub fn default_config() -> Result<Self> {
35        let path = crate::storage::db::default_db_path()?;
36        Ok(Self { path })
37    }
38
39    /// Opens a new database connection.
40    pub fn open(&self) -> Result<Database> {
41        Database::open(&self.path)
42    }
43}
44
45/// Watches for session file changes and imports new messages.
46///
47/// Tracks the byte position in each file to enable incremental reading,
48/// avoiding the need to re-parse entire files on each modification.
49pub struct SessionWatcher {
50    /// Maps file paths to their last read position (byte offset).
51    file_positions: HashMap<PathBuf, u64>,
52    /// Directories to watch for session files.
53    watch_dirs: Vec<PathBuf>,
54    /// Database configuration for creating connections.
55    db_config: DbConfig,
56}
57
58impl SessionWatcher {
59    /// Creates a new SessionWatcher.
60    ///
61    /// Uses the default watcher registry to determine which directories
62    /// to watch for session files.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if the database configuration cannot be created.
67    pub fn new() -> Result<Self> {
68        let registry = default_registry();
69        let watch_dirs = registry.all_watch_paths();
70
71        let db_config = DbConfig::default_config()?;
72
73        Ok(Self {
74            file_positions: HashMap::new(),
75            watch_dirs,
76            db_config,
77        })
78    }
79
80    /// Returns the directories being watched.
81    ///
82    /// This method is part of the public API for status reporting
83    /// and may be used by CLI commands in the future.
84    #[allow(dead_code)]
85    pub fn watch_dirs(&self) -> &[PathBuf] {
86        &self.watch_dirs
87    }
88
89    /// Starts watching for file changes and processing them.
90    ///
91    /// This function runs until the shutdown signal is received.
92    ///
93    /// # Arguments
94    ///
95    /// * `stats` - Shared statistics to update on imports
96    /// * `shutdown_rx` - Receiver that signals when to stop watching
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the watcher cannot be created or started.
101    pub async fn watch(
102        &mut self,
103        stats: Arc<RwLock<DaemonStats>>,
104        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
105    ) -> Result<()> {
106        // Log which directories we will watch
107        for dir in &self.watch_dirs {
108            if dir.exists() {
109                tracing::info!("Will watch for session files in {:?}", dir);
110            } else {
111                tracing::info!("Watch directory does not exist yet: {:?}", dir);
112            }
113        }
114
115        // Create a channel for file events
116        let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
117
118        // Create the debounced watcher
119        let mut debouncer = new_debouncer(
120            Duration::from_millis(500),
121            move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
122                if let Ok(events) = events {
123                    // Filter for JSONL and SQLite database files
124                    let filtered: Vec<DebouncedEvent> = events
125                        .into_iter()
126                        .filter(|e| {
127                            let ext = e.path.extension().and_then(|ext| ext.to_str());
128                            matches!(ext, Some("jsonl") | Some("vscdb"))
129                        })
130                        .collect();
131
132                    if !filtered.is_empty() {
133                        let _ = tx.blocking_send(filtered);
134                    }
135                }
136            },
137        )
138        .context("Failed to create file watcher")?;
139
140        // Start watching directories that exist
141        for dir in &self.watch_dirs {
142            if dir.exists() {
143                debouncer
144                    .watcher()
145                    .watch(dir, RecursiveMode::Recursive)
146                    .context(format!("Failed to start watching directory {dir:?}"))?;
147
148                tracing::info!("Watching for session files in {:?}", dir);
149            }
150        }
151
152        // Do an initial scan (sync, before entering async loop)
153        self.initial_scan(&stats).await?;
154
155        // Process events
156        loop {
157            tokio::select! {
158                Some(events) = rx.recv() => {
159                    for event in events {
160                        if let Err(e) = self.handle_file_event(&event.path, &stats).await {
161                            let error_msg = e.to_string();
162                            // Database unavailable errors are transient (e.g., during lore init)
163                            // Log at debug level to avoid spam
164                            if error_msg.contains("unable to open database")
165                                || error_msg.contains("database is locked")
166                            {
167                                tracing::debug!(
168                                    "Database temporarily unavailable for {:?}: {}",
169                                    event.path,
170                                    e
171                                );
172                            } else {
173                                tracing::warn!(
174                                    "Error handling file event for {:?}: {}",
175                                    event.path,
176                                    e
177                                );
178                                let mut stats_guard = stats.write().await;
179                                stats_guard.errors += 1;
180                            }
181                        }
182                    }
183                }
184                _ = shutdown_rx.recv() => {
185                    tracing::info!("Session watcher shutting down");
186                    break;
187                }
188            }
189        }
190
191        Ok(())
192    }
193
194    /// Opens a database connection for this operation.
195    fn open_db(&self) -> Result<Database> {
196        self.db_config.open()
197    }
198
199    /// Performs an initial scan of existing session files.
200    ///
201    /// Called when the watcher starts to import any sessions that were
202    /// created while the daemon was not running. Uses the watcher registry
203    /// to find session sources from all available watchers.
204    async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
205        tracing::info!("Performing initial scan of session files...");
206
207        let registry = default_registry();
208        let mut total_files = 0;
209
210        for watcher in registry.available_watchers() {
211            let watcher_name = watcher.info().name;
212            match watcher.find_sources() {
213                Ok(sources) => {
214                    tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
215                    total_files += sources.len();
216
217                    for path in sources {
218                        // Process each file synchronously to avoid Send issues
219                        match self.process_file_sync(&path) {
220                            Ok(Some((sessions_imported, messages_imported))) => {
221                                let mut stats_guard = stats.write().await;
222                                stats_guard.sessions_imported += sessions_imported;
223                                stats_guard.messages_imported += messages_imported;
224                                stats_guard.files_watched = self.file_positions.len();
225                            }
226                            Ok(None) => {
227                                // File was already imported, just track position
228                            }
229                            Err(e) => {
230                                tracing::warn!("Failed to import {:?}: {}", path, e);
231                                let mut stats_guard = stats.write().await;
232                                stats_guard.errors += 1;
233                            }
234                        }
235                    }
236                }
237                Err(e) => {
238                    tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
239                }
240            }
241        }
242
243        {
244            let mut stats_guard = stats.write().await;
245            stats_guard.files_watched = total_files;
246        }
247
248        Ok(())
249    }
250
251    /// Handles a file system event for a session file.
252    async fn handle_file_event(
253        &mut self,
254        path: &Path,
255        stats: &Arc<RwLock<DaemonStats>>,
256    ) -> Result<()> {
257        let ext = path.extension().and_then(|e| e.to_str());
258
259        // Skip files that are not session sources
260        if !matches!(ext, Some("jsonl") | Some("vscdb")) {
261            return Ok(());
262        }
263
264        // Skip agent files (Claude Code specific)
265        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
266            if name.starts_with("agent-") {
267                return Ok(());
268            }
269        }
270
271        // Check if file exists (might be a delete event)
272        if !path.exists() {
273            // File was deleted, remove from tracking
274            self.file_positions.remove(path);
275            return Ok(());
276        }
277
278        // Process the file synchronously
279        match self.process_file_sync(path) {
280            Ok(Some((sessions_imported, messages_imported))) => {
281                let mut stats_guard = stats.write().await;
282                stats_guard.sessions_imported += sessions_imported;
283                stats_guard.messages_imported += messages_imported;
284                stats_guard.files_watched = self.file_positions.len();
285            }
286            Ok(None) => {
287                // File unchanged or already processed
288            }
289            Err(e) => {
290                return Err(e);
291            }
292        }
293
294        Ok(())
295    }
296
297    /// Processes a file synchronously, returning import counts if anything was imported.
298    ///
299    /// Returns `Ok(Some((sessions, messages)))` if data was imported,
300    /// `Ok(None)` if the file was already processed, or an error.
301    fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
302        let db = self.open_db()?;
303        let path_str = path.to_string_lossy();
304        let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
305
306        // Get current file size
307        let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
308        let current_size = metadata.len();
309
310        if current_size <= last_pos {
311            // File hasn't grown (might have been truncated)
312            if current_size < last_pos {
313                // File was truncated, reset position
314                self.file_positions.insert(path.to_path_buf(), 0);
315            }
316            return Ok(None);
317        }
318
319        // Check if this is a new file we haven't seen
320        if db.session_exists_by_source(&path_str)? {
321            // Already imported, just update position
322            self.file_positions.insert(path.to_path_buf(), current_size);
323            return Ok(None);
324        }
325
326        // Import the file
327        let result = self.import_file_sync(path, &db)?;
328
329        // Update tracked position
330        self.file_positions.insert(path.to_path_buf(), current_size);
331
332        Ok(Some(result))
333    }
334
335    /// Imports a complete session file synchronously.
336    /// Returns (sessions_imported, messages_imported) counts.
337    ///
338    /// Uses the watcher registry to find the appropriate parser for the file type.
339    fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
340        tracing::debug!("Importing session file: {:?}", path);
341
342        let path_buf = path.to_path_buf();
343        let registry = default_registry();
344
345        // Try to parse with each watcher until one succeeds
346        let mut parsed_sessions = Vec::new();
347
348        for watcher in registry.available_watchers() {
349            match watcher.parse_source(&path_buf) {
350                Ok(sessions) if !sessions.is_empty() => {
351                    parsed_sessions = sessions;
352                    break;
353                }
354                Ok(_) => continue,
355                Err(e) => {
356                    tracing::debug!(
357                        "Watcher {} could not parse {:?}: {}",
358                        watcher.info().name,
359                        path,
360                        e
361                    );
362                }
363            }
364        }
365
366        if parsed_sessions.is_empty() {
367            tracing::debug!("No watcher could parse {:?}", path);
368            return Ok((0, 0));
369        }
370
371        let mut total_sessions = 0u64;
372        let mut total_messages = 0u64;
373
374        for (session, messages) in parsed_sessions {
375            if messages.is_empty() {
376                continue;
377            }
378
379            let message_count = messages.len();
380
381            // Store session
382            db.insert_session(&session)?;
383
384            // Store messages and track the most recent branch
385            let mut latest_branch: Option<String> = None;
386            for msg in &messages {
387                db.insert_message(msg)?;
388                // Track the branch from the most recent message that has one
389                if msg.git_branch.is_some() {
390                    latest_branch = msg.git_branch.clone();
391                }
392            }
393
394            // Update session branch if the latest message has a different branch
395            // This handles the case where the user switches branches mid-session
396            if let Some(ref new_branch) = latest_branch {
397                if session.git_branch.as_ref() != Some(new_branch) {
398                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
399                        tracing::warn!(
400                            "Failed to update session branch for {}: {}",
401                            &session.id.to_string()[..8],
402                            e
403                        );
404                    } else {
405                        tracing::debug!(
406                            "Updated session {} branch to {}",
407                            &session.id.to_string()[..8],
408                            new_branch
409                        );
410                    }
411                }
412            }
413
414            tracing::info!(
415                "Imported session {} with {} messages from {:?}",
416                &session.id.to_string()[..8],
417                message_count,
418                path.file_name().unwrap_or_default()
419            );
420
421            total_sessions += 1;
422            total_messages += message_count as u64;
423        }
424
425        // Update file position
426        if let Ok(metadata) = std::fs::metadata(path) {
427            self.file_positions
428                .insert(path.to_path_buf(), metadata.len());
429        }
430
431        Ok((total_sessions, total_messages))
432    }
433
434    /// Returns the number of files currently being tracked.
435    ///
436    /// This method is part of the public API for status reporting
437    /// and is used by tests.
438    #[allow(dead_code)]
439    pub fn tracked_file_count(&self) -> usize {
440        self.file_positions.len()
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Construction must succeed even when no watchers are available (for
    /// example in CI, where `~/.claude` does not exist); the list of watch
    /// directories may simply be empty there.
    #[test]
    fn test_session_watcher_creation() {
        let created = SessionWatcher::new();
        assert!(created.is_ok(), "Should create watcher successfully");

        let _watcher = created.unwrap();
    }

    /// Checks the registry directly rather than going through
    /// `SessionWatcher`, whose `watch_dirs()` only includes paths from
    /// AVAILABLE watchers and may therefore be empty in CI.
    #[test]
    fn test_watch_dirs_from_registry() {
        use crate::capture::watchers::default_registry;

        // Gather watch paths from ALL watchers, including unavailable ones.
        let registry = default_registry();
        let all_paths: Vec<_> = registry
            .all_watchers()
            .iter()
            .flat_map(|w| w.watch_paths())
            .collect();

        let mentions = |needle: &str| {
            all_paths
                .iter()
                .any(|d| d.to_string_lossy().contains(needle))
        };
        let has_claude = mentions(".claude");
        let has_cursor = mentions("Cursor");

        // At least one known watcher path pattern must be configured.
        assert!(
            has_claude || has_cursor,
            "Registry should configure at least one known watcher path pattern \
             (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
        );
    }

    /// A freshly created watcher tracks nothing.
    #[test]
    fn test_tracked_file_count_initial() {
        let fresh = SessionWatcher::new().unwrap();
        let tracked = fresh.tracked_file_count();
        assert_eq!(tracked, 0, "Should start with no tracked files");
    }

    /// The default database configuration can be constructed.
    #[test]
    fn test_db_config_creation() {
        assert!(
            DbConfig::default_config().is_ok(),
            "Should create DbConfig successfully"
        );
    }

    /// Positions inserted into the tracking map are counted and retrievable.
    #[test]
    fn test_file_position_tracking() {
        let mut watcher = SessionWatcher::new().unwrap();

        let entries = [
            (PathBuf::from("/test/file1.jsonl"), 100u64),
            (PathBuf::from("/test/file2.jsonl"), 200u64),
        ];

        for (path, pos) in &entries {
            watcher.file_positions.insert(path.clone(), *pos);
        }

        assert_eq!(watcher.tracked_file_count(), 2);
        for (path, pos) in &entries {
            assert_eq!(watcher.file_positions.get(path), Some(pos));
        }
    }
}