Skip to main content

lore_cli/daemon/
watcher.rs

1//! File system watcher for AI tool session files.
2//!
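//! Watches directories configured by the WatcherRegistry for new and modified
//! session files. The last-seen size of each file is tracked so that events
//! for unchanged files are skipped; a file that has grown is re-parsed in
//! full, relying on the database layer to skip messages it already stores.
//!
//! Currently reacts to `.jsonl` and `.vscdb` files, for example:
//! - Claude Code JSONL files in `~/.claude/projects/`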
9
10use anyhow::{Context, Result};
11use chrono::Utc;
12use notify::RecursiveMode;
13use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
14use std::collections::HashMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{mpsc, RwLock};
19use uuid::Uuid;
20
21use crate::capture::watchers::{default_registry, Watcher};
22use crate::git::get_commits_in_time_range;
23use crate::storage::models::{LinkCreator, LinkType, SessionLink};
24use crate::storage::Database;
25
26use super::state::DaemonStats;
27
/// Location of the database used by the watcher.
///
/// Only the path is stored. rusqlite connections are not thread-safe, so a
/// new connection is opened whenever one is needed instead of sharing a
/// single connection across threads.
#[derive(Clone)]
pub struct DbConfig {
    /// Filesystem path of the database file.
    path: PathBuf,
}
35
36impl DbConfig {
37    /// Creates a new DbConfig for the default database location.
38    pub fn default_config() -> Result<Self> {
39        let path = crate::storage::db::default_db_path()?;
40        Ok(Self { path })
41    }
42
43    /// Opens a new database connection.
44    pub fn open(&self) -> Result<Database> {
45        Database::open(&self.path)
46    }
47}
48
49/// Watches for session file changes and imports new messages.
50///
51/// Tracks the byte position in each file to enable incremental reading,
52/// avoiding the need to re-parse entire files on each modification.
53pub struct SessionWatcher {
54    /// Maps file paths to their last read position (byte offset).
55    file_positions: HashMap<PathBuf, u64>,
56    /// Directories to watch for session files.
57    watch_dirs: Vec<PathBuf>,
58    /// Database configuration for creating connections.
59    db_config: DbConfig,
60}
61
62impl SessionWatcher {
63    /// Creates a new SessionWatcher.
64    ///
65    /// Uses the default watcher registry to determine which directories
66    /// to watch for session files.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the database configuration cannot be created.
71    pub fn new() -> Result<Self> {
72        let registry = default_registry();
73        let watch_dirs = registry.all_watch_paths();
74
75        let db_config = DbConfig::default_config()?;
76
77        Ok(Self {
78            file_positions: HashMap::new(),
79            watch_dirs,
80            db_config,
81        })
82    }
83
84    /// Returns the directories being watched.
85    ///
86    /// This method is part of the public API for status reporting
87    /// and may be used by CLI commands in the future.
88    #[allow(dead_code)]
89    pub fn watch_dirs(&self) -> &[PathBuf] {
90        &self.watch_dirs
91    }
92
93    /// Starts watching for file changes and processing them.
94    ///
95    /// This function runs until the shutdown signal is received.
96    ///
97    /// # Arguments
98    ///
99    /// * `stats` - Shared statistics to update on imports
100    /// * `shutdown_rx` - Receiver that signals when to stop watching
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the watcher cannot be created or started.
105    pub async fn watch(
106        &mut self,
107        stats: Arc<RwLock<DaemonStats>>,
108        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
109    ) -> Result<()> {
110        // Log which directories we will watch
111        for dir in &self.watch_dirs {
112            if dir.exists() {
113                tracing::info!("Will watch for session files in {:?}", dir);
114            } else {
115                tracing::info!("Watch directory does not exist yet: {:?}", dir);
116            }
117        }
118
119        // Create a channel for file events
120        let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
121
122        // Create the debounced watcher
123        let mut debouncer = new_debouncer(
124            Duration::from_millis(500),
125            move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
126                if let Ok(events) = events {
127                    // Filter for JSONL and SQLite database files
128                    let filtered: Vec<DebouncedEvent> = events
129                        .into_iter()
130                        .filter(|e| {
131                            let ext = e.path.extension().and_then(|ext| ext.to_str());
132                            matches!(ext, Some("jsonl") | Some("vscdb"))
133                        })
134                        .collect();
135
136                    if !filtered.is_empty() {
137                        let _ = tx.blocking_send(filtered);
138                    }
139                }
140            },
141        )
142        .context("Failed to create file watcher")?;
143
144        // Start watching directories that exist
145        for dir in &self.watch_dirs {
146            if dir.exists() {
147                debouncer
148                    .watcher()
149                    .watch(dir, RecursiveMode::Recursive)
150                    .context(format!("Failed to start watching directory {dir:?}"))?;
151
152                tracing::info!("Watching for session files in {:?}", dir);
153            }
154        }
155
156        // Do an initial scan (sync, before entering async loop)
157        self.initial_scan(&stats).await?;
158
159        // Process events
160        loop {
161            tokio::select! {
162                Some(events) = rx.recv() => {
163                    for event in events {
164                        if let Err(e) = self.handle_file_event(&event.path, &stats).await {
165                            let error_msg = e.to_string();
166                            // Database unavailable errors are transient (e.g., during lore init)
167                            // Log at debug level to avoid spam
168                            if error_msg.contains("unable to open database")
169                                || error_msg.contains("database is locked")
170                            {
171                                tracing::debug!(
172                                    "Database temporarily unavailable for {:?}: {}",
173                                    event.path,
174                                    e
175                                );
176                            } else {
177                                tracing::warn!(
178                                    "Error handling file event for {:?}: {}",
179                                    event.path,
180                                    e
181                                );
182                                let mut stats_guard = stats.write().await;
183                                stats_guard.errors += 1;
184                            }
185                        }
186                    }
187                }
188                _ = shutdown_rx.recv() => {
189                    tracing::info!("Session watcher shutting down");
190                    break;
191                }
192            }
193        }
194
195        Ok(())
196    }
197
198    /// Opens a database connection for this operation.
199    fn open_db(&self) -> Result<Database> {
200        self.db_config.open()
201    }
202
203    /// Finds the watcher that owns a given file path.
204    ///
205    /// A watcher "owns" a path if one of its `watch_paths()` is an ancestor
206    /// of the given path. This ensures files are parsed by the correct watcher
207    /// rather than trying all watchers in arbitrary order.
208    ///
209    /// Returns `None` if no watcher claims the path.
210    fn find_owning_watcher<'a>(
211        path: &Path,
212        watchers: &'a [&'a dyn Watcher],
213    ) -> Option<&'a dyn Watcher> {
214        for watcher in watchers {
215            for watch_path in watcher.watch_paths() {
216                if path.starts_with(&watch_path) {
217                    return Some(*watcher);
218                }
219            }
220        }
221        None
222    }
223
224    /// Performs an initial scan of existing session files.
225    ///
226    /// Called when the watcher starts to import any sessions that were
227    /// created while the daemon was not running. Uses the watcher registry
228    /// to find session sources from all available watchers.
229    async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
230        tracing::info!("Performing initial scan of session files...");
231
232        let registry = default_registry();
233        let mut total_files = 0;
234
235        for watcher in registry.available_watchers() {
236            let watcher_name = watcher.info().name;
237            match watcher.find_sources() {
238                Ok(sources) => {
239                    tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
240                    total_files += sources.len();
241
242                    for path in sources {
243                        // Process each file synchronously to avoid Send issues
244                        match self.process_file_sync(&path) {
245                            Ok(Some((sessions_imported, messages_imported))) => {
246                                let mut stats_guard = stats.write().await;
247                                stats_guard.sessions_imported += sessions_imported;
248                                stats_guard.messages_imported += messages_imported;
249                                stats_guard.files_watched = self.file_positions.len();
250                            }
251                            Ok(None) => {
252                                // File was already imported, just track position
253                            }
254                            Err(e) => {
255                                tracing::warn!("Failed to import {:?}: {}", path, e);
256                                let mut stats_guard = stats.write().await;
257                                stats_guard.errors += 1;
258                            }
259                        }
260                    }
261                }
262                Err(e) => {
263                    tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
264                }
265            }
266        }
267
268        {
269            let mut stats_guard = stats.write().await;
270            stats_guard.files_watched = total_files;
271        }
272
273        Ok(())
274    }
275
276    /// Handles a file system event for a session file.
277    async fn handle_file_event(
278        &mut self,
279        path: &Path,
280        stats: &Arc<RwLock<DaemonStats>>,
281    ) -> Result<()> {
282        let ext = path.extension().and_then(|e| e.to_str());
283
284        // Skip files that are not session sources
285        if !matches!(ext, Some("jsonl") | Some("vscdb")) {
286            return Ok(());
287        }
288
289        // Skip agent files (Claude Code specific)
290        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
291            if name.starts_with("agent-") {
292                return Ok(());
293            }
294        }
295
296        // Check if file exists (might be a delete event)
297        if !path.exists() {
298            // File was deleted, remove from tracking
299            self.file_positions.remove(path);
300            return Ok(());
301        }
302
303        // Process the file synchronously
304        match self.process_file_sync(path) {
305            Ok(Some((sessions_imported, messages_imported))) => {
306                let mut stats_guard = stats.write().await;
307                stats_guard.sessions_imported += sessions_imported;
308                stats_guard.messages_imported += messages_imported;
309                stats_guard.files_watched = self.file_positions.len();
310            }
311            Ok(None) => {
312                // File unchanged or already processed
313            }
314            Err(e) => {
315                return Err(e);
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Processes a file synchronously, returning import counts if anything was imported.
323    ///
324    /// Returns `Ok(Some((sessions, messages)))` if data was imported,
325    /// `Ok(None)` if the file was already processed, or an error.
326    ///
327    /// When a session file already exists in the database but has grown (new messages
328    /// added), this function re-imports the entire file. The database layer handles
329    /// deduplication: `insert_session` uses ON CONFLICT to update metadata, and
330    /// `insert_message` uses ON CONFLICT DO NOTHING to skip duplicates.
331    /// After re-import, auto-linking is triggered if the session has ended.
332    fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
333        let db = self.open_db()?;
334        let path_str = path.to_string_lossy();
335        let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
336
337        // Get current file size
338        let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
339        let current_size = metadata.len();
340
341        if current_size <= last_pos {
342            // File hasn't grown (might have been truncated)
343            if current_size < last_pos {
344                // File was truncated, reset position
345                self.file_positions.insert(path.to_path_buf(), 0);
346            }
347            return Ok(None);
348        }
349
350        // Check if session already exists in the database
351        let existing_session = db.get_session_by_source(&path_str)?;
352
353        if let Some(existing) = existing_session {
354            // Session exists but file has grown - re-import to get new messages
355            // and updated metadata, then run auto-linking
356            tracing::debug!(
357                "Session {} exists but file has grown, re-importing for updates",
358                &existing.id.to_string()[..8]
359            );
360            let result = self.update_existing_session(path, &db, &existing)?;
361            self.file_positions.insert(path.to_path_buf(), current_size);
362            return Ok(Some(result));
363        }
364
365        // New session file - import it
366        let result = self.import_file_sync(path, &db)?;
367
368        // Update tracked position
369        self.file_positions.insert(path.to_path_buf(), current_size);
370
371        Ok(Some(result))
372    }
373
374    /// Updates an existing session by re-importing the file.
375    ///
376    /// Re-parses the session file and updates the database. The database layer
377    /// handles deduplication automatically. After updating, triggers auto-linking
378    /// if the session has an ended_at timestamp.
379    ///
380    /// Uses path-based dispatch to find the correct watcher for the file.
381    ///
382    /// Returns (0, new_messages_imported) since the session already exists.
383    fn update_existing_session(
384        &self,
385        path: &Path,
386        db: &Database,
387        existing_session: &crate::storage::models::Session,
388    ) -> Result<(u64, u64)> {
389        tracing::debug!("Updating existing session from: {:?}", path);
390
391        let path_buf = path.to_path_buf();
392        let registry = default_registry();
393        let available = registry.available_watchers();
394
395        // Find the watcher that owns this path
396        let owning_watcher = match Self::find_owning_watcher(path, &available) {
397            Some(w) => w,
398            None => {
399                tracing::debug!("No watcher owns path {:?}", path);
400                return Ok((0, 0));
401            }
402        };
403
404        // Parse with the owning watcher
405        let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
406            Ok(sessions) => sessions,
407            Err(e) => {
408                tracing::debug!(
409                    "Watcher {} could not parse {:?}: {}",
410                    owning_watcher.info().name,
411                    path,
412                    e
413                );
414                return Ok((0, 0));
415            }
416        };
417
418        if parsed_sessions.is_empty() {
419            tracing::debug!(
420                "Watcher {} returned no sessions for {:?}",
421                owning_watcher.info().name,
422                path
423            );
424            return Ok((0, 0));
425        }
426
427        let mut total_messages = 0u64;
428        let mut updated_session: Option<crate::storage::models::Session> = None;
429
430        for (session, messages) in parsed_sessions {
431            if messages.is_empty() {
432                continue;
433            }
434
435            // Update session metadata (ended_at, message_count, git_branch)
436            // insert_session uses ON CONFLICT to update these fields
437            db.insert_session(&session)?;
438
439            // Track the most recent branch from messages
440            let mut latest_branch: Option<String> = None;
441            let mut new_message_count = 0u64;
442
443            for msg in &messages {
444                // insert_message uses ON CONFLICT DO NOTHING, so duplicates are skipped
445                // We don't have a reliable way to count only new messages, but we track
446                // the total for logging purposes
447                db.insert_message(msg)?;
448                new_message_count += 1;
449
450                if msg.git_branch.is_some() {
451                    latest_branch = msg.git_branch.clone();
452                }
453            }
454
455            // Update session branch if messages show a different branch
456            if let Some(ref new_branch) = latest_branch {
457                if session.git_branch.as_ref() != Some(new_branch) {
458                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
459                        tracing::warn!(
460                            "Failed to update session branch for {}: {}",
461                            &session.id.to_string()[..8],
462                            e
463                        );
464                    } else {
465                        tracing::debug!(
466                            "Updated session {} branch to {}",
467                            &session.id.to_string()[..8],
468                            new_branch
469                        );
470                    }
471                }
472            }
473
474            total_messages += new_message_count;
475            updated_session = Some(session);
476        }
477
478        // Run auto-linking if the session has ended
479        if let Some(ref session) = updated_session {
480            if let Some(ended_at) = session.ended_at {
481                let session_just_ended = existing_session.ended_at.is_none();
482
483                if session_just_ended {
484                    tracing::info!(
485                        "Session {} has ended, running auto-link",
486                        &session.id.to_string()[..8]
487                    );
488                }
489
490                let linked = self.auto_link_session_commits(
491                    db,
492                    session.id,
493                    &session.working_directory,
494                    session.started_at,
495                    ended_at,
496                );
497                if let Err(e) = linked {
498                    tracing::warn!(
499                        "Failed to auto-link commits for session {}: {}",
500                        &session.id.to_string()[..8],
501                        e
502                    );
503                }
504
505                if session_just_ended {
506                    let session_clone = session.clone();
507                    std::thread::spawn(move || match Database::open_default() {
508                        Ok(db) => Self::auto_summarize_session(&db, &session_clone),
509                        Err(e) => {
510                            tracing::warn!("Failed to open DB for auto-summarize: {e}")
511                        }
512                    });
513                }
514            }
515        }
516
517        // Return 0 sessions since we updated an existing one, not imported new
518        Ok((0, total_messages))
519    }
520
521    /// Imports a complete session file synchronously.
522    /// Returns (sessions_imported, messages_imported) counts.
523    ///
524    /// Uses path-based dispatch to find the correct watcher for the file.
525    /// Files are parsed only by the watcher that owns their parent directory,
526    /// preventing incorrect parsing by unrelated watchers.
527    fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
528        tracing::debug!("Importing session file: {:?}", path);
529
530        let path_buf = path.to_path_buf();
531        let registry = default_registry();
532        let available = registry.available_watchers();
533
534        // Find the watcher that owns this path
535        let owning_watcher = match Self::find_owning_watcher(path, &available) {
536            Some(w) => w,
537            None => {
538                tracing::debug!("No watcher owns path {:?}", path);
539                return Ok((0, 0));
540            }
541        };
542
543        // Parse with the owning watcher
544        let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
545            Ok(sessions) => sessions,
546            Err(e) => {
547                tracing::debug!(
548                    "Watcher {} could not parse {:?}: {}",
549                    owning_watcher.info().name,
550                    path,
551                    e
552                );
553                return Ok((0, 0));
554            }
555        };
556
557        if parsed_sessions.is_empty() {
558            tracing::debug!(
559                "Watcher {} returned no sessions for {:?}",
560                owning_watcher.info().name,
561                path
562            );
563            return Ok((0, 0));
564        }
565
566        let mut total_sessions = 0u64;
567        let mut total_messages = 0u64;
568
569        for (session, messages) in parsed_sessions {
570            if messages.is_empty() {
571                continue;
572            }
573
574            let message_count = messages.len();
575
576            // Store session
577            db.insert_session(&session)?;
578
579            // Store messages and track the most recent branch
580            let mut latest_branch: Option<String> = None;
581            for msg in &messages {
582                db.insert_message(msg)?;
583                // Track the branch from the most recent message that has one
584                if msg.git_branch.is_some() {
585                    latest_branch = msg.git_branch.clone();
586                }
587            }
588
589            // Update session branch if the latest message has a different branch
590            // This handles the case where the user switches branches mid-session
591            if let Some(ref new_branch) = latest_branch {
592                if session.git_branch.as_ref() != Some(new_branch) {
593                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
594                        tracing::warn!(
595                            "Failed to update session branch for {}: {}",
596                            &session.id.to_string()[..8],
597                            e
598                        );
599                    } else {
600                        tracing::debug!(
601                            "Updated session {} branch to {}",
602                            &session.id.to_string()[..8],
603                            new_branch
604                        );
605                    }
606                }
607            }
608
609            tracing::info!(
610                "Imported session {} with {} messages from {:?}",
611                &session.id.to_string()[..8],
612                message_count,
613                path.file_name().unwrap_or_default()
614            );
615
616            // Auto-link commits if session has ended
617            if let Some(ended_at) = session.ended_at {
618                let linked = self.auto_link_session_commits(
619                    db,
620                    session.id,
621                    &session.working_directory,
622                    session.started_at,
623                    ended_at,
624                );
625                if let Err(e) = linked {
626                    tracing::warn!(
627                        "Failed to auto-link commits for session {}: {}",
628                        &session.id.to_string()[..8],
629                        e
630                    );
631                }
632            }
633
634            if session.ended_at.is_some() {
635                Self::auto_summarize_session(db, &session);
636            }
637
638            total_sessions += 1;
639            total_messages += message_count as u64;
640        }
641
642        // Update file position
643        if let Ok(metadata) = std::fs::metadata(path) {
644            self.file_positions
645                .insert(path.to_path_buf(), metadata.len());
646        }
647
648        Ok((total_sessions, total_messages))
649    }
650
651    /// Auto-links commits made during a session's time window.
652    ///
653    /// Finds all commits in the session's working directory that were made
654    /// between the session's start and end time, then creates links for any
655    /// commits that are not already linked to this session.
656    ///
657    /// # Arguments
658    ///
659    /// * `db` - Database connection for storing links
660    /// * `session_id` - The session to link commits to
661    /// * `working_directory` - Path to the repository working directory
662    /// * `started_at` - Session start time
663    /// * `ended_at` - Session end time
664    ///
665    /// # Returns
666    ///
667    /// The number of commits that were linked.
668    fn auto_link_session_commits(
669        &self,
670        db: &Database,
671        session_id: Uuid,
672        working_directory: &str,
673        started_at: chrono::DateTime<Utc>,
674        ended_at: chrono::DateTime<Utc>,
675    ) -> Result<usize> {
676        let working_dir = Path::new(working_directory);
677
678        // Check if working directory is a git repository
679        if !working_dir.exists() {
680            tracing::debug!("Working directory does not exist: {}", working_directory);
681            return Ok(0);
682        }
683
684        // Get commits in the session's time range
685        let commits = match get_commits_in_time_range(working_dir, started_at, ended_at) {
686            Ok(commits) => commits,
687            Err(e) => {
688                // Not a git repository or other git error - this is expected
689                // for sessions outside git repos
690                tracing::debug!("Could not get commits for {}: {}", working_directory, e);
691                return Ok(0);
692            }
693        };
694
695        if commits.is_empty() {
696            tracing::debug!(
697                "No commits found in time range for session {}",
698                &session_id.to_string()[..8]
699            );
700            return Ok(0);
701        }
702
703        let mut linked_count = 0;
704
705        for commit in commits {
706            // Skip if link already exists
707            if db.link_exists(&session_id, &commit.sha)? {
708                tracing::debug!(
709                    "Link already exists for session {} and commit {}",
710                    &session_id.to_string()[..8],
711                    &commit.sha[..8]
712                );
713                continue;
714            }
715
716            // Create the link
717            let link = SessionLink {
718                id: Uuid::new_v4(),
719                session_id,
720                link_type: LinkType::Commit,
721                commit_sha: Some(commit.sha.clone()),
722                branch: commit.branch.clone(),
723                remote: None,
724                created_at: Utc::now(),
725                created_by: LinkCreator::Auto,
726                confidence: Some(1.0), // Direct time match is high confidence
727            };
728
729            db.insert_link(&link)?;
730            linked_count += 1;
731
732            tracing::info!(
733                "Auto-linked commit {} to session {} ({})",
734                &commit.sha[..8],
735                &session_id.to_string()[..8],
736                commit.summary.chars().take(50).collect::<String>()
737            );
738        }
739
740        if linked_count > 0 {
741            tracing::info!(
742                "Auto-linked {} commits to session {}",
743                linked_count,
744                &session_id.to_string()[..8]
745            );
746        }
747
748        Ok(linked_count)
749    }
750
751    /// Attempts to auto-generate a summary for a completed session.
752    ///
753    /// This is best-effort: all errors are logged but not propagated.
754    /// Does nothing if summary_auto is disabled, the session has too few
755    /// messages, or a summary already exists.
756    fn auto_summarize_session(db: &Database, session: &crate::storage::models::Session) {
757        let config = match crate::config::Config::load() {
758            Ok(c) => c,
759            Err(e) => {
760                tracing::debug!("Could not load config for auto-summarize: {e}");
761                return;
762            }
763        };
764
765        if !config.summary_auto {
766            return;
767        }
768
769        if (session.message_count as usize) < config.summary_auto_threshold {
770            return;
771        }
772
773        // Check if summary already exists
774        match db.get_summary(&session.id) {
775            Ok(Some(_)) => return,
776            Ok(None) => {}
777            Err(e) => {
778                tracing::warn!("Failed to check existing summary: {e}");
779                return;
780            }
781        }
782
783        let messages = match db.get_messages(&session.id) {
784            Ok(m) => m,
785            Err(e) => {
786                tracing::warn!("Failed to get messages for auto-summarize: {e}");
787                return;
788            }
789        };
790
791        match crate::summarize::generate_summary(&messages) {
792            Ok(content) => {
793                let summary = crate::storage::models::Summary {
794                    id: uuid::Uuid::new_v4(),
795                    session_id: session.id,
796                    content,
797                    generated_at: chrono::Utc::now(),
798                };
799                if let Err(e) = db.insert_summary(&summary) {
800                    tracing::warn!(
801                        "Failed to save auto-generated summary for {}: {e}",
802                        &session.id.to_string()[..8]
803                    );
804                } else {
805                    tracing::info!(
806                        "Auto-generated summary for session {}",
807                        &session.id.to_string()[..8]
808                    );
809                }
810            }
811            Err(e) => {
812                tracing::debug!(
813                    "Auto-summarize skipped for session {}: {e}",
814                    &session.id.to_string()[..8]
815                );
816            }
817        }
818    }
819
820    /// Returns the number of files currently being tracked.
821    ///
822    /// This method is part of the public API for status reporting
823    /// and is used by tests.
824    #[allow(dead_code)]
825    pub fn tracked_file_count(&self) -> usize {
826        self.file_positions.len()
827    }
828}
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833    use crate::storage::models::Session;
834    use chrono::Duration;
835    use tempfile::tempdir;
836
837    /// Creates a test database in the given directory.
838    fn create_test_db(dir: &Path) -> Database {
839        let db_path = dir.join("test.db");
840        Database::open(&db_path).expect("Failed to open test database")
841    }
842
843    /// Creates a test session with specified time window.
844    fn create_test_session_with_times(
845        working_directory: &str,
846        started_at: chrono::DateTime<Utc>,
847        ended_at: chrono::DateTime<Utc>,
848    ) -> Session {
849        Session {
850            id: Uuid::new_v4(),
851            tool: "test-tool".to_string(),
852            tool_version: Some("1.0.0".to_string()),
853            started_at,
854            ended_at: Some(ended_at),
855            model: Some("test-model".to_string()),
856            working_directory: working_directory.to_string(),
857            git_branch: Some("main".to_string()),
858            source_path: None,
859            message_count: 0,
860            machine_id: Some("test-machine".to_string()),
861        }
862    }
863
864    /// Creates a test commit in the repository with a specific timestamp.
865    ///
866    /// Returns the full SHA of the created commit.
867    fn create_test_commit(
868        repo: &git2::Repository,
869        message: &str,
870        time: chrono::DateTime<Utc>,
871    ) -> String {
872        let sig = git2::Signature::new(
873            "Test User",
874            "test@example.com",
875            &git2::Time::new(time.timestamp(), 0),
876        )
877        .expect("Failed to create signature");
878
879        // Get current tree (or create empty tree for first commit)
880        let tree_id = {
881            let mut index = repo.index().expect("Failed to get index");
882
883            // Create a test file to have something to commit
884            let file_path = repo
885                .workdir()
886                .unwrap()
887                .join(format!("test_{}.txt", Uuid::new_v4()));
888            std::fs::write(&file_path, format!("Content for: {message}"))
889                .expect("Failed to write test file");
890
891            index
892                .add_path(file_path.strip_prefix(repo.workdir().unwrap()).unwrap())
893                .expect("Failed to add file to index");
894            index.write().expect("Failed to write index");
895            index.write_tree().expect("Failed to write tree")
896        };
897
898        let tree = repo.find_tree(tree_id).expect("Failed to find tree");
899
900        // Get parent commit if it exists
901        let parent = repo.head().ok().and_then(|h| h.peel_to_commit().ok());
902
903        let commit_id = if let Some(parent) = parent {
904            repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[&parent])
905                .expect("Failed to create commit")
906        } else {
907            repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[])
908                .expect("Failed to create commit")
909        };
910
911        commit_id.to_string()
912    }
913
914    /// Initializes a new git repository in the given directory.
915    fn init_test_repo(dir: &Path) -> git2::Repository {
916        git2::Repository::init(dir).expect("Failed to init test repo")
917    }
918
#[test]
fn test_session_watcher_creation() {
    // Construction has to succeed even when no watcher is available, e.g. on
    // CI machines where ~/.claude does not exist. In that situation the list
    // of watch directories may simply be empty.
    let created = SessionWatcher::new();
    assert!(created.is_ok(), "Should create watcher successfully");

    let _watcher = created.unwrap();
}
929
#[test]
fn test_watch_dirs_from_registry() {
    use crate::capture::watchers::default_registry;

    // SessionWatcher.watch_dirs() only reports paths from AVAILABLE watchers,
    // and on CI none may be available because directories such as ~/.claude
    // are missing. The registry is therefore inspected directly through
    // all_watchers(), which also covers unavailable watchers.
    let registry = default_registry();
    let all_paths: Vec<_> = registry
        .all_watchers()
        .iter()
        .flat_map(|watcher| watcher.watch_paths())
        .collect();

    // True when any configured path contains `needle` in its lossy string form.
    let mentions = |needle: &str| {
        all_paths
            .iter()
            .any(|path| path.to_string_lossy().contains(needle))
    };
    let has_claude = mentions(".claude");
    let has_cursor = mentions("Cursor");

    // At least one of the known tools must have a path configured.
    assert!(
        has_claude || has_cursor,
        "Registry should configure at least one known watcher path pattern \
         (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
    );
}
961
#[test]
fn test_tracked_file_count_initial() {
    // A freshly constructed watcher has not recorded a position for any file.
    let fresh = SessionWatcher::new().unwrap();
    let tracked = fresh.tracked_file_count();

    assert_eq!(tracked, 0, "Should start with no tracked files");
}
971
#[test]
fn test_db_config_creation() {
    // Resolving the default database location must not fail.
    assert!(
        DbConfig::default_config().is_ok(),
        "Should create DbConfig successfully"
    );
}
977
#[test]
fn test_file_position_tracking() {
    let mut watcher = SessionWatcher::new().unwrap();

    // (file path, byte offset) pairs recorded directly in the position map.
    let recorded: [(PathBuf, u64); 2] = [
        (PathBuf::from("/test/file1.jsonl"), 100),
        (PathBuf::from("/test/file2.jsonl"), 200),
    ];
    for (path, offset) in &recorded {
        watcher.file_positions.insert(path.clone(), *offset);
    }

    assert_eq!(watcher.tracked_file_count(), 2);

    // Every path must map back to exactly the offset stored for it.
    for (path, offset) in &recorded {
        assert_eq!(watcher.file_positions.get(path), Some(offset));
    }
}
992
993    // ==================== find_owning_watcher Tests ====================
994
#[test]
fn test_find_owning_watcher_matches_path_under_watch_dir() {
    use crate::capture::watchers::{Watcher, WatcherInfo};
    use crate::storage::models::{Message, Session};

    /// Minimal watcher that owns a single directory and never yields sessions.
    struct StubWatcher {
        label: &'static str,
        root: PathBuf,
    }

    impl Watcher for StubWatcher {
        fn info(&self) -> WatcherInfo {
            WatcherInfo {
                name: self.label,
                description: "Test",
                default_paths: vec![],
            }
        }

        fn is_available(&self) -> bool {
            true
        }

        fn find_sources(&self) -> Result<Vec<PathBuf>> {
            Ok(vec![])
        }

        fn parse_source(&self, _: &Path) -> Result<Vec<(Session, Vec<Message>)>> {
            Ok(vec![])
        }

        fn watch_paths(&self) -> Vec<PathBuf> {
            vec![self.root.clone()]
        }
    }

    let claude_like = StubWatcher {
        label: "watcher-a",
        root: PathBuf::from("/home/user/.claude/projects"),
    };
    let aider_like = StubWatcher {
        label: "watcher-b",
        root: PathBuf::from("/home/user/.aider"),
    };
    let watchers: Vec<&dyn Watcher> = vec![&claude_like, &aider_like];

    // (file path, name of the watcher expected to own it, if any)
    let cases = [
        // Under the first watcher's directory.
        (
            "/home/user/.claude/projects/myproject/session.jsonl",
            Some("watcher-a"),
        ),
        // Under the second watcher's directory.
        ("/home/user/.aider/history.md", Some("watcher-b")),
        // Outside every watched directory.
        ("/home/user/projects/random.txt", None),
    ];

    for (file, expected_owner) in cases {
        let result = SessionWatcher::find_owning_watcher(Path::new(file), &watchers);
        match expected_owner {
            Some(expected_name) => {
                assert!(result.is_some());
                assert_eq!(result.unwrap().info().name, expected_name);
            }
            None => assert!(result.is_none()),
        }
    }
}
1055
1056    // ==================== auto_link_session_commits Tests ====================
1057
#[test]
fn test_auto_link_creates_links_for_commits_in_time_range() {
    // A single temp directory hosts both the git repository and the database.
    let dir = tempdir().expect("Failed to create temp directory");
    let repo_path = dir.path();
    let repo = init_test_repo(repo_path);
    let db = create_test_db(repo_path);

    // The session covers the last hour.
    let session_end = Utc::now();
    let session_start = session_end - Duration::hours(1);

    // (minutes after session start, commit message, failure text if the
    // commit ends up unlinked). All three fall inside the session window.
    let planned: [(i64, &str, &str); 3] = [
        (10, "First commit in session", "First commit should be linked"),
        (30, "Second commit in session", "Second commit should be linked"),
        (50, "Third commit in session", "Third commit should be linked"),
    ];
    let commits: Vec<(String, &str)> = planned
        .iter()
        .map(|&(offset, message, failure)| {
            let when = session_start + Duration::minutes(offset);
            (create_test_commit(&repo, message, when), failure)
        })
        .collect();

    // Store a session whose working directory is the repository.
    let workdir = repo_path.to_string_lossy();
    let session = create_test_session_with_times(&workdir, session_start, session_end);
    db.insert_session(&session)
        .expect("Failed to insert session");

    // Bare-bones watcher: nothing tracked, nothing watched.
    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: repo_path.join("test.db"),
        },
    };

    let linked_count = watcher
        .auto_link_session_commits(&db, session.id, &workdir, session_start, session_end)
        .expect("auto_link_session_commits should succeed");

    assert_eq!(linked_count, 3, "Should have linked 3 commits");

    // Each individual commit must now have a link to the session.
    for (sha, failure) in &commits {
        let linked = db
            .link_exists(&session.id, sha)
            .expect("link_exists should succeed");
        assert!(linked, "{failure}");
    }
}
1133
#[test]
fn test_auto_link_skips_commits_outside_time_range() {
    let dir = tempdir().expect("Failed to create temp directory");
    let repo_path = dir.path();
    let repo = init_test_repo(repo_path);
    let db = create_test_db(repo_path);

    // Narrow session window: from 30 minutes ago until 20 minutes ago.
    let now = Utc::now();
    let session_start = now - Duration::minutes(30);
    let session_end = now - Duration::minutes(20);

    // Creates a commit timestamped `minutes` before `now`.
    let commit_minutes_ago = |message: &str, minutes: i64| {
        create_test_commit(&repo, message, now - Duration::minutes(minutes))
    };
    // One commit before the window, one inside it, one after it.
    let sha_before = commit_minutes_ago("Commit before session", 40);
    let sha_inside = commit_minutes_ago("Commit inside session", 25);
    let sha_after = commit_minutes_ago("Commit after session", 10);

    let workdir = repo_path.to_string_lossy();
    let session = create_test_session_with_times(&workdir, session_start, session_end);
    db.insert_session(&session)
        .expect("Failed to insert session");

    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: repo_path.join("test.db"),
        },
    };

    let linked_count = watcher
        .auto_link_session_commits(&db, session.id, &workdir, session_start, session_end)
        .expect("auto_link_session_commits should succeed");

    // Only the commit inside the window counts.
    assert_eq!(linked_count, 1, "Should have linked only 1 commit");

    // (commit SHA, whether a link must exist, failure text)
    let expectations = [
        (
            &sha_before,
            false,
            "Commit before session should NOT be linked",
        ),
        (&sha_inside, true, "Commit inside session should be linked"),
        (
            &sha_after,
            false,
            "Commit after session should NOT be linked",
        ),
    ];
    for (sha, should_be_linked, failure) in expectations {
        let linked = db
            .link_exists(&session.id, sha)
            .expect("link_exists should succeed");
        assert!(linked == should_be_linked, "{failure}");
    }
}
1215
#[test]
fn test_auto_link_skips_existing_links() {
    let dir = tempdir().expect("Failed to create temp directory");
    let repo_path = dir.path();
    let repo = init_test_repo(repo_path);
    let db = create_test_db(repo_path);

    // The session covers the last hour, with one commit half-way through.
    let session_end = Utc::now();
    let session_start = session_end - Duration::hours(1);
    let sha = create_test_commit(&repo, "Test commit", session_start + Duration::minutes(30));

    let workdir = repo_path.to_string_lossy();
    let session = create_test_session_with_times(&workdir, session_start, session_end);
    db.insert_session(&session)
        .expect("Failed to insert session");

    // Pre-seed the database with a link for that very commit.
    let preexisting = SessionLink {
        id: Uuid::new_v4(),
        session_id: session.id,
        link_type: LinkType::Commit,
        commit_sha: Some(sha.clone()),
        branch: Some(String::from("main")),
        remote: None,
        created_at: Utc::now(),
        created_by: LinkCreator::Auto,
        confidence: Some(1.0),
    };
    db.insert_link(&preexisting)
        .expect("Failed to insert existing link");

    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: repo_path.join("test.db"),
        },
    };

    let linked_count = watcher
        .auto_link_session_commits(&db, session.id, &workdir, session_start, session_end)
        .expect("auto_link_session_commits should succeed");

    // The only candidate commit was already linked, so nothing new is created.
    assert_eq!(
        linked_count, 0,
        "Should not create any new links when link already exists"
    );

    // The pre-existing link must still be present. Only existence is checked
    // here; the number of stored links is not inspected.
    let still_linked = db
        .link_exists(&session.id, &sha)
        .expect("link_exists should succeed");
    assert!(still_linked, "Link should still exist");
}
1293
#[test]
fn test_auto_link_handles_non_git_directory() {
    // A plain temp directory: no repository is ever initialized in it.
    let dir = tempdir().expect("Failed to create temp directory");
    let non_repo_path = dir.path();
    let db = create_test_db(non_repo_path);

    let session_end = Utc::now();
    let session_start = session_end - Duration::hours(1);

    // The session's working directory points at the non-git directory.
    let workdir = non_repo_path.to_string_lossy();
    let session = create_test_session_with_times(&workdir, session_start, session_end);
    db.insert_session(&session)
        .expect("Failed to insert session");

    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: non_repo_path.join("test.db"),
        },
    };

    let outcome =
        watcher.auto_link_session_commits(&db, session.id, &workdir, session_start, session_end);

    // A missing repository must yield Ok(0) rather than an error.
    assert!(
        outcome.is_ok(),
        "Should handle non-git directory gracefully: {:?}",
        outcome.err()
    );
    assert_eq!(outcome.unwrap(), 0, "Should return 0 for non-git directory");
}
1342
#[test]
fn test_auto_link_finds_commits_on_multiple_branches() {
    let dir = tempdir().expect("Failed to create temp directory");
    let repo_path = dir.path();
    let repo = init_test_repo(repo_path);
    let db = create_test_db(repo_path);

    // The session covers the last hour.
    let session_end = Utc::now();
    let session_start = session_end - Duration::hours(1);
    let minutes_in = |minutes: i64| session_start + Duration::minutes(minutes);

    // First commit lands on whatever branch a fresh repository starts on.
    let sha_main = create_test_commit(&repo, "Commit on main", minutes_in(10));

    // That branch may be called master or main depending on git config, so
    // read its name back from HEAD instead of hard-coding it.
    let head_ref = repo.head().expect("Should have HEAD after commit");
    let default_branch = head_ref
        .shorthand()
        .expect("HEAD should have a name")
        .to_string();

    // Fork a feature branch from the first commit and point HEAD at it.
    let fork_point = head_ref.peel_to_commit().unwrap();
    repo.branch("feature-branch", &fork_point, false)
        .expect("Failed to create branch");
    repo.set_head("refs/heads/feature-branch")
        .expect("Failed to switch branch");

    let sha_feature = create_test_commit(&repo, "Commit on feature", minutes_in(30));

    // Return to the default branch; the hard reset brings the index and
    // working directory back in line with it before committing again.
    let default_ref = format!("refs/heads/{default_branch}");
    repo.set_head(&default_ref)
        .expect("Failed to switch to default branch");
    let default_tip = repo
        .revparse_single(&default_branch)
        .expect("Should find default branch");
    repo.reset(&default_tip, git2::ResetType::Hard, None)
        .expect("Failed to reset to default branch");

    let sha_main2 = create_test_commit(&repo, "Second commit on main", minutes_in(50));

    let workdir = repo_path.to_string_lossy();
    let session = create_test_session_with_times(&workdir, session_start, session_end);
    db.insert_session(&session)
        .expect("Failed to insert session");

    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: repo_path.join("test.db"),
        },
    };

    let linked_count = watcher
        .auto_link_session_commits(&db, session.id, &workdir, session_start, session_end)
        .expect("auto_link_session_commits should succeed");

    // All three commits, regardless of branch, fall inside the window.
    assert_eq!(
        linked_count, 3,
        "Should have linked commits from both branches"
    );

    // (commit SHA, failure text if the commit is not linked)
    let expectations = [
        (&sha_main, "First main commit should be linked"),
        (&sha_feature, "Feature branch commit should be linked"),
        (&sha_main2, "Second main commit should be linked"),
    ];
    for (sha, failure) in expectations {
        let linked = db
            .link_exists(&session.id, sha)
            .expect("link_exists should succeed");
        assert!(linked, "{failure}");
    }
}
1446
#[test]
fn test_update_existing_session_triggers_auto_link() {
    // Scenario: a session is already stored without an end time, its file
    // then grows, and the re-import stores the ended session and runs
    // auto-linking. The two steps are replayed by hand below.

    let dir = tempdir().expect("Failed to create temp directory");
    let repo_path = dir.path();
    let repo = init_test_repo(repo_path);
    let db = create_test_db(repo_path);

    // The session covers the last hour, with one commit half-way through.
    let session_end = Utc::now();
    let session_start = session_end - Duration::hours(1);
    let sha = create_test_commit(
        &repo,
        "Commit during session",
        session_start + Duration::minutes(30),
    );

    let session_id = Uuid::new_v4();
    let workdir = repo_path.to_string_lossy();

    // Both stored versions of the session share every field except the end
    // time and the message count.
    let snapshot = |ended_at, message_count| Session {
        id: session_id,
        tool: "test-tool".to_string(),
        tool_version: Some("1.0.0".to_string()),
        started_at: session_start,
        ended_at,
        model: Some("test-model".to_string()),
        working_directory: workdir.to_string(),
        git_branch: Some("main".to_string()),
        source_path: Some("/test/session.jsonl".to_string()),
        message_count,
        machine_id: Some("test-machine".to_string()),
    };

    // First version: still running (no end time), as after a CLI import.
    let ongoing_session = snapshot(None, 5);
    db.insert_session(&ongoing_session)
        .expect("Failed to insert session");

    // Nothing has run auto-linking yet, so the commit must be unlinked.
    let linked_before = db
        .link_exists(&session_id, &sha)
        .expect("link_exists should succeed");
    assert!(
        !linked_before,
        "Commit should NOT be linked to ongoing session"
    );

    let watcher = SessionWatcher {
        file_positions: HashMap::new(),
        watch_dirs: vec![],
        db_config: DbConfig {
            path: repo_path.join("test.db"),
        },
    };

    // Step 1 of the simulated update: store the session again under the same
    // id, now with an end time and more messages.
    let ended_session = snapshot(Some(session_end), 10);
    db.insert_session(&ended_session)
        .expect("Failed to update session");

    // Step 2: run auto-linking for the now-ended session.
    let linked_count = watcher
        .auto_link_session_commits(&db, session_id, &workdir, session_start, session_end)
        .expect("auto_link_session_commits should succeed");

    assert_eq!(linked_count, 1, "Should have linked 1 commit");
    let linked_after = db
        .link_exists(&session_id, &sha)
        .expect("link_exists should succeed");
    assert!(linked_after, "Commit should be linked after session ended");
}
1543}