lore_cli/daemon/
watcher.rs

1//! File system watcher for AI tool session files.
2//!
//! Watches directories configured by the WatcherRegistry for new and modified
//! session files. Remembers the last seen size of each file so unchanged
//! files are skipped; a file that has grown is re-parsed in full and the
//! database layer discards the rows it already holds.
6//!
7//! Currently supports:
8//! - Claude Code JSONL files in `~/.claude/projects/`
9
10use anyhow::{Context, Result};
11use chrono::Utc;
12use notify::RecursiveMode;
13use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
14use std::collections::HashMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{mpsc, RwLock};
19use uuid::Uuid;
20
21use crate::capture::watchers::{default_registry, Watcher};
22use crate::git::get_commits_in_time_range;
23use crate::storage::models::{LinkCreator, LinkType, SessionLink};
24use crate::storage::Database;
25
26use super::state::DaemonStats;
27
/// Location of the database used by the watcher.
///
/// rusqlite connections are not thread-safe, so rather than sharing one
/// connection across threads the watcher keeps only the path and opens a
/// fresh connection each time it needs one (see [`DbConfig::open`]).
#[derive(Debug, Clone)]
pub struct DbConfig {
    /// Filesystem path of the database file.
    path: PathBuf,
}
35
36impl DbConfig {
37    /// Creates a new DbConfig for the default database location.
38    pub fn default_config() -> Result<Self> {
39        let path = crate::storage::db::default_db_path()?;
40        Ok(Self { path })
41    }
42
43    /// Opens a new database connection.
44    pub fn open(&self) -> Result<Database> {
45        Database::open(&self.path)
46    }
47}
48
49/// Watches for session file changes and imports new messages.
50///
51/// Tracks the byte position in each file to enable incremental reading,
52/// avoiding the need to re-parse entire files on each modification.
53pub struct SessionWatcher {
54    /// Maps file paths to their last read position (byte offset).
55    file_positions: HashMap<PathBuf, u64>,
56    /// Directories to watch for session files.
57    watch_dirs: Vec<PathBuf>,
58    /// Database configuration for creating connections.
59    db_config: DbConfig,
60}
61
62impl SessionWatcher {
63    /// Creates a new SessionWatcher.
64    ///
65    /// Uses the default watcher registry to determine which directories
66    /// to watch for session files.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the database configuration cannot be created.
71    pub fn new() -> Result<Self> {
72        let registry = default_registry();
73        let watch_dirs = registry.all_watch_paths();
74
75        let db_config = DbConfig::default_config()?;
76
77        Ok(Self {
78            file_positions: HashMap::new(),
79            watch_dirs,
80            db_config,
81        })
82    }
83
84    /// Returns the directories being watched.
85    ///
86    /// This method is part of the public API for status reporting
87    /// and may be used by CLI commands in the future.
88    #[allow(dead_code)]
89    pub fn watch_dirs(&self) -> &[PathBuf] {
90        &self.watch_dirs
91    }
92
93    /// Starts watching for file changes and processing them.
94    ///
95    /// This function runs until the shutdown signal is received.
96    ///
97    /// # Arguments
98    ///
99    /// * `stats` - Shared statistics to update on imports
100    /// * `shutdown_rx` - Receiver that signals when to stop watching
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the watcher cannot be created or started.
105    pub async fn watch(
106        &mut self,
107        stats: Arc<RwLock<DaemonStats>>,
108        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
109    ) -> Result<()> {
110        // Log which directories we will watch
111        for dir in &self.watch_dirs {
112            if dir.exists() {
113                tracing::info!("Will watch for session files in {:?}", dir);
114            } else {
115                tracing::info!("Watch directory does not exist yet: {:?}", dir);
116            }
117        }
118
119        // Create a channel for file events
120        let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
121
122        // Create the debounced watcher
123        let mut debouncer = new_debouncer(
124            Duration::from_millis(500),
125            move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
126                if let Ok(events) = events {
127                    // Filter for JSONL and SQLite database files
128                    let filtered: Vec<DebouncedEvent> = events
129                        .into_iter()
130                        .filter(|e| {
131                            let ext = e.path.extension().and_then(|ext| ext.to_str());
132                            matches!(ext, Some("jsonl") | Some("vscdb"))
133                        })
134                        .collect();
135
136                    if !filtered.is_empty() {
137                        let _ = tx.blocking_send(filtered);
138                    }
139                }
140            },
141        )
142        .context("Failed to create file watcher")?;
143
144        // Start watching directories that exist
145        for dir in &self.watch_dirs {
146            if dir.exists() {
147                debouncer
148                    .watcher()
149                    .watch(dir, RecursiveMode::Recursive)
150                    .context(format!("Failed to start watching directory {dir:?}"))?;
151
152                tracing::info!("Watching for session files in {:?}", dir);
153            }
154        }
155
156        // Do an initial scan (sync, before entering async loop)
157        self.initial_scan(&stats).await?;
158
159        // Process events
160        loop {
161            tokio::select! {
162                Some(events) = rx.recv() => {
163                    for event in events {
164                        if let Err(e) = self.handle_file_event(&event.path, &stats).await {
165                            let error_msg = e.to_string();
166                            // Database unavailable errors are transient (e.g., during lore init)
167                            // Log at debug level to avoid spam
168                            if error_msg.contains("unable to open database")
169                                || error_msg.contains("database is locked")
170                            {
171                                tracing::debug!(
172                                    "Database temporarily unavailable for {:?}: {}",
173                                    event.path,
174                                    e
175                                );
176                            } else {
177                                tracing::warn!(
178                                    "Error handling file event for {:?}: {}",
179                                    event.path,
180                                    e
181                                );
182                                let mut stats_guard = stats.write().await;
183                                stats_guard.errors += 1;
184                            }
185                        }
186                    }
187                }
188                _ = shutdown_rx.recv() => {
189                    tracing::info!("Session watcher shutting down");
190                    break;
191                }
192            }
193        }
194
195        Ok(())
196    }
197
198    /// Opens a database connection for this operation.
199    fn open_db(&self) -> Result<Database> {
200        self.db_config.open()
201    }
202
203    /// Finds the watcher that owns a given file path.
204    ///
205    /// A watcher "owns" a path if one of its `watch_paths()` is an ancestor
206    /// of the given path. This ensures files are parsed by the correct watcher
207    /// rather than trying all watchers in arbitrary order.
208    ///
209    /// Returns `None` if no watcher claims the path.
210    fn find_owning_watcher<'a>(
211        path: &Path,
212        watchers: &'a [&'a dyn Watcher],
213    ) -> Option<&'a dyn Watcher> {
214        for watcher in watchers {
215            for watch_path in watcher.watch_paths() {
216                if path.starts_with(&watch_path) {
217                    return Some(*watcher);
218                }
219            }
220        }
221        None
222    }
223
224    /// Performs an initial scan of existing session files.
225    ///
226    /// Called when the watcher starts to import any sessions that were
227    /// created while the daemon was not running. Uses the watcher registry
228    /// to find session sources from all available watchers.
229    async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
230        tracing::info!("Performing initial scan of session files...");
231
232        let registry = default_registry();
233        let mut total_files = 0;
234
235        for watcher in registry.available_watchers() {
236            let watcher_name = watcher.info().name;
237            match watcher.find_sources() {
238                Ok(sources) => {
239                    tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
240                    total_files += sources.len();
241
242                    for path in sources {
243                        // Process each file synchronously to avoid Send issues
244                        match self.process_file_sync(&path) {
245                            Ok(Some((sessions_imported, messages_imported))) => {
246                                let mut stats_guard = stats.write().await;
247                                stats_guard.sessions_imported += sessions_imported;
248                                stats_guard.messages_imported += messages_imported;
249                                stats_guard.files_watched = self.file_positions.len();
250                            }
251                            Ok(None) => {
252                                // File was already imported, just track position
253                            }
254                            Err(e) => {
255                                tracing::warn!("Failed to import {:?}: {}", path, e);
256                                let mut stats_guard = stats.write().await;
257                                stats_guard.errors += 1;
258                            }
259                        }
260                    }
261                }
262                Err(e) => {
263                    tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
264                }
265            }
266        }
267
268        {
269            let mut stats_guard = stats.write().await;
270            stats_guard.files_watched = total_files;
271        }
272
273        Ok(())
274    }
275
276    /// Handles a file system event for a session file.
277    async fn handle_file_event(
278        &mut self,
279        path: &Path,
280        stats: &Arc<RwLock<DaemonStats>>,
281    ) -> Result<()> {
282        let ext = path.extension().and_then(|e| e.to_str());
283
284        // Skip files that are not session sources
285        if !matches!(ext, Some("jsonl") | Some("vscdb")) {
286            return Ok(());
287        }
288
289        // Skip agent files (Claude Code specific)
290        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
291            if name.starts_with("agent-") {
292                return Ok(());
293            }
294        }
295
296        // Check if file exists (might be a delete event)
297        if !path.exists() {
298            // File was deleted, remove from tracking
299            self.file_positions.remove(path);
300            return Ok(());
301        }
302
303        // Process the file synchronously
304        match self.process_file_sync(path) {
305            Ok(Some((sessions_imported, messages_imported))) => {
306                let mut stats_guard = stats.write().await;
307                stats_guard.sessions_imported += sessions_imported;
308                stats_guard.messages_imported += messages_imported;
309                stats_guard.files_watched = self.file_positions.len();
310            }
311            Ok(None) => {
312                // File unchanged or already processed
313            }
314            Err(e) => {
315                return Err(e);
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Processes a file synchronously, returning import counts if anything was imported.
323    ///
324    /// Returns `Ok(Some((sessions, messages)))` if data was imported,
325    /// `Ok(None)` if the file was already processed, or an error.
326    ///
327    /// When a session file already exists in the database but has grown (new messages
328    /// added), this function re-imports the entire file. The database layer handles
329    /// deduplication: `insert_session` uses ON CONFLICT to update metadata, and
330    /// `insert_message` uses ON CONFLICT DO NOTHING to skip duplicates.
331    /// After re-import, auto-linking is triggered if the session has ended.
332    fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
333        let db = self.open_db()?;
334        let path_str = path.to_string_lossy();
335        let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
336
337        // Get current file size
338        let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
339        let current_size = metadata.len();
340
341        if current_size <= last_pos {
342            // File hasn't grown (might have been truncated)
343            if current_size < last_pos {
344                // File was truncated, reset position
345                self.file_positions.insert(path.to_path_buf(), 0);
346            }
347            return Ok(None);
348        }
349
350        // Check if session already exists in the database
351        let existing_session = db.get_session_by_source(&path_str)?;
352
353        if let Some(existing) = existing_session {
354            // Session exists but file has grown - re-import to get new messages
355            // and updated metadata, then run auto-linking
356            tracing::debug!(
357                "Session {} exists but file has grown, re-importing for updates",
358                &existing.id.to_string()[..8]
359            );
360            let result = self.update_existing_session(path, &db, &existing)?;
361            self.file_positions.insert(path.to_path_buf(), current_size);
362            return Ok(Some(result));
363        }
364
365        // New session file - import it
366        let result = self.import_file_sync(path, &db)?;
367
368        // Update tracked position
369        self.file_positions.insert(path.to_path_buf(), current_size);
370
371        Ok(Some(result))
372    }
373
374    /// Updates an existing session by re-importing the file.
375    ///
376    /// Re-parses the session file and updates the database. The database layer
377    /// handles deduplication automatically. After updating, triggers auto-linking
378    /// if the session has an ended_at timestamp.
379    ///
380    /// Uses path-based dispatch to find the correct watcher for the file.
381    ///
382    /// Returns (0, new_messages_imported) since the session already exists.
383    fn update_existing_session(
384        &self,
385        path: &Path,
386        db: &Database,
387        existing_session: &crate::storage::models::Session,
388    ) -> Result<(u64, u64)> {
389        tracing::debug!("Updating existing session from: {:?}", path);
390
391        let path_buf = path.to_path_buf();
392        let registry = default_registry();
393        let available = registry.available_watchers();
394
395        // Find the watcher that owns this path
396        let owning_watcher = match Self::find_owning_watcher(path, &available) {
397            Some(w) => w,
398            None => {
399                tracing::debug!("No watcher owns path {:?}", path);
400                return Ok((0, 0));
401            }
402        };
403
404        // Parse with the owning watcher
405        let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
406            Ok(sessions) => sessions,
407            Err(e) => {
408                tracing::debug!(
409                    "Watcher {} could not parse {:?}: {}",
410                    owning_watcher.info().name,
411                    path,
412                    e
413                );
414                return Ok((0, 0));
415            }
416        };
417
418        if parsed_sessions.is_empty() {
419            tracing::debug!(
420                "Watcher {} returned no sessions for {:?}",
421                owning_watcher.info().name,
422                path
423            );
424            return Ok((0, 0));
425        }
426
427        let mut total_messages = 0u64;
428        let mut updated_session: Option<crate::storage::models::Session> = None;
429
430        for (session, messages) in parsed_sessions {
431            if messages.is_empty() {
432                continue;
433            }
434
435            // Update session metadata (ended_at, message_count, git_branch)
436            // insert_session uses ON CONFLICT to update these fields
437            db.insert_session(&session)?;
438
439            // Track the most recent branch from messages
440            let mut latest_branch: Option<String> = None;
441            let mut new_message_count = 0u64;
442
443            for msg in &messages {
444                // insert_message uses ON CONFLICT DO NOTHING, so duplicates are skipped
445                // We don't have a reliable way to count only new messages, but we track
446                // the total for logging purposes
447                db.insert_message(msg)?;
448                new_message_count += 1;
449
450                if msg.git_branch.is_some() {
451                    latest_branch = msg.git_branch.clone();
452                }
453            }
454
455            // Update session branch if messages show a different branch
456            if let Some(ref new_branch) = latest_branch {
457                if session.git_branch.as_ref() != Some(new_branch) {
458                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
459                        tracing::warn!(
460                            "Failed to update session branch for {}: {}",
461                            &session.id.to_string()[..8],
462                            e
463                        );
464                    } else {
465                        tracing::debug!(
466                            "Updated session {} branch to {}",
467                            &session.id.to_string()[..8],
468                            new_branch
469                        );
470                    }
471                }
472            }
473
474            total_messages += new_message_count;
475            updated_session = Some(session);
476        }
477
478        // Run auto-linking if the session has ended
479        // This is the key fix: we now run auto-linking for updated sessions
480        if let Some(session) = updated_session {
481            if let Some(ended_at) = session.ended_at {
482                // Only log if session was previously ongoing
483                if existing_session.ended_at.is_none() {
484                    tracing::info!(
485                        "Session {} has ended, running auto-link",
486                        &session.id.to_string()[..8]
487                    );
488                }
489
490                let linked = self.auto_link_session_commits(
491                    db,
492                    session.id,
493                    &session.working_directory,
494                    session.started_at,
495                    ended_at,
496                );
497                if let Err(e) = linked {
498                    tracing::warn!(
499                        "Failed to auto-link commits for session {}: {}",
500                        &session.id.to_string()[..8],
501                        e
502                    );
503                }
504            }
505        }
506
507        // Return 0 sessions since we updated an existing one, not imported new
508        Ok((0, total_messages))
509    }
510
511    /// Imports a complete session file synchronously.
512    /// Returns (sessions_imported, messages_imported) counts.
513    ///
514    /// Uses path-based dispatch to find the correct watcher for the file.
515    /// Files are parsed only by the watcher that owns their parent directory,
516    /// preventing incorrect parsing by unrelated watchers.
517    fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
518        tracing::debug!("Importing session file: {:?}", path);
519
520        let path_buf = path.to_path_buf();
521        let registry = default_registry();
522        let available = registry.available_watchers();
523
524        // Find the watcher that owns this path
525        let owning_watcher = match Self::find_owning_watcher(path, &available) {
526            Some(w) => w,
527            None => {
528                tracing::debug!("No watcher owns path {:?}", path);
529                return Ok((0, 0));
530            }
531        };
532
533        // Parse with the owning watcher
534        let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
535            Ok(sessions) => sessions,
536            Err(e) => {
537                tracing::debug!(
538                    "Watcher {} could not parse {:?}: {}",
539                    owning_watcher.info().name,
540                    path,
541                    e
542                );
543                return Ok((0, 0));
544            }
545        };
546
547        if parsed_sessions.is_empty() {
548            tracing::debug!(
549                "Watcher {} returned no sessions for {:?}",
550                owning_watcher.info().name,
551                path
552            );
553            return Ok((0, 0));
554        }
555
556        let mut total_sessions = 0u64;
557        let mut total_messages = 0u64;
558
559        for (session, messages) in parsed_sessions {
560            if messages.is_empty() {
561                continue;
562            }
563
564            let message_count = messages.len();
565
566            // Store session
567            db.insert_session(&session)?;
568
569            // Store messages and track the most recent branch
570            let mut latest_branch: Option<String> = None;
571            for msg in &messages {
572                db.insert_message(msg)?;
573                // Track the branch from the most recent message that has one
574                if msg.git_branch.is_some() {
575                    latest_branch = msg.git_branch.clone();
576                }
577            }
578
579            // Update session branch if the latest message has a different branch
580            // This handles the case where the user switches branches mid-session
581            if let Some(ref new_branch) = latest_branch {
582                if session.git_branch.as_ref() != Some(new_branch) {
583                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
584                        tracing::warn!(
585                            "Failed to update session branch for {}: {}",
586                            &session.id.to_string()[..8],
587                            e
588                        );
589                    } else {
590                        tracing::debug!(
591                            "Updated session {} branch to {}",
592                            &session.id.to_string()[..8],
593                            new_branch
594                        );
595                    }
596                }
597            }
598
599            tracing::info!(
600                "Imported session {} with {} messages from {:?}",
601                &session.id.to_string()[..8],
602                message_count,
603                path.file_name().unwrap_or_default()
604            );
605
606            // Auto-link commits if session has ended
607            if let Some(ended_at) = session.ended_at {
608                let linked = self.auto_link_session_commits(
609                    db,
610                    session.id,
611                    &session.working_directory,
612                    session.started_at,
613                    ended_at,
614                );
615                if let Err(e) = linked {
616                    tracing::warn!(
617                        "Failed to auto-link commits for session {}: {}",
618                        &session.id.to_string()[..8],
619                        e
620                    );
621                }
622            }
623
624            total_sessions += 1;
625            total_messages += message_count as u64;
626        }
627
628        // Update file position
629        if let Ok(metadata) = std::fs::metadata(path) {
630            self.file_positions
631                .insert(path.to_path_buf(), metadata.len());
632        }
633
634        Ok((total_sessions, total_messages))
635    }
636
637    /// Auto-links commits made during a session's time window.
638    ///
639    /// Finds all commits in the session's working directory that were made
640    /// between the session's start and end time, then creates links for any
641    /// commits that are not already linked to this session.
642    ///
643    /// # Arguments
644    ///
645    /// * `db` - Database connection for storing links
646    /// * `session_id` - The session to link commits to
647    /// * `working_directory` - Path to the repository working directory
648    /// * `started_at` - Session start time
649    /// * `ended_at` - Session end time
650    ///
651    /// # Returns
652    ///
653    /// The number of commits that were linked.
654    fn auto_link_session_commits(
655        &self,
656        db: &Database,
657        session_id: Uuid,
658        working_directory: &str,
659        started_at: chrono::DateTime<Utc>,
660        ended_at: chrono::DateTime<Utc>,
661    ) -> Result<usize> {
662        let working_dir = Path::new(working_directory);
663
664        // Check if working directory is a git repository
665        if !working_dir.exists() {
666            tracing::debug!("Working directory does not exist: {}", working_directory);
667            return Ok(0);
668        }
669
670        // Get commits in the session's time range
671        let commits = match get_commits_in_time_range(working_dir, started_at, ended_at) {
672            Ok(commits) => commits,
673            Err(e) => {
674                // Not a git repository or other git error - this is expected
675                // for sessions outside git repos
676                tracing::debug!("Could not get commits for {}: {}", working_directory, e);
677                return Ok(0);
678            }
679        };
680
681        if commits.is_empty() {
682            tracing::debug!(
683                "No commits found in time range for session {}",
684                &session_id.to_string()[..8]
685            );
686            return Ok(0);
687        }
688
689        let mut linked_count = 0;
690
691        for commit in commits {
692            // Skip if link already exists
693            if db.link_exists(&session_id, &commit.sha)? {
694                tracing::debug!(
695                    "Link already exists for session {} and commit {}",
696                    &session_id.to_string()[..8],
697                    &commit.sha[..8]
698                );
699                continue;
700            }
701
702            // Create the link
703            let link = SessionLink {
704                id: Uuid::new_v4(),
705                session_id,
706                link_type: LinkType::Commit,
707                commit_sha: Some(commit.sha.clone()),
708                branch: commit.branch.clone(),
709                remote: None,
710                created_at: Utc::now(),
711                created_by: LinkCreator::Auto,
712                confidence: Some(1.0), // Direct time match is high confidence
713            };
714
715            db.insert_link(&link)?;
716            linked_count += 1;
717
718            tracing::info!(
719                "Auto-linked commit {} to session {} ({})",
720                &commit.sha[..8],
721                &session_id.to_string()[..8],
722                commit.summary.chars().take(50).collect::<String>()
723            );
724        }
725
726        if linked_count > 0 {
727            tracing::info!(
728                "Auto-linked {} commits to session {}",
729                linked_count,
730                &session_id.to_string()[..8]
731            );
732        }
733
734        Ok(linked_count)
735    }
736
737    /// Returns the number of files currently being tracked.
738    ///
739    /// This method is part of the public API for status reporting
740    /// and is used by tests.
741    #[allow(dead_code)]
742    pub fn tracked_file_count(&self) -> usize {
743        self.file_positions.len()
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use super::*;
750    use crate::storage::models::Session;
751    use chrono::Duration;
752    use tempfile::tempdir;
753
754    /// Creates a test database in the given directory.
755    fn create_test_db(dir: &Path) -> Database {
756        let db_path = dir.join("test.db");
757        Database::open(&db_path).expect("Failed to open test database")
758    }
759
760    /// Creates a test session with specified time window.
761    fn create_test_session_with_times(
762        working_directory: &str,
763        started_at: chrono::DateTime<Utc>,
764        ended_at: chrono::DateTime<Utc>,
765    ) -> Session {
766        Session {
767            id: Uuid::new_v4(),
768            tool: "test-tool".to_string(),
769            tool_version: Some("1.0.0".to_string()),
770            started_at,
771            ended_at: Some(ended_at),
772            model: Some("test-model".to_string()),
773            working_directory: working_directory.to_string(),
774            git_branch: Some("main".to_string()),
775            source_path: None,
776            message_count: 0,
777            machine_id: Some("test-machine".to_string()),
778        }
779    }
780
781    /// Creates a test commit in the repository with a specific timestamp.
782    ///
783    /// Returns the full SHA of the created commit.
784    fn create_test_commit(
785        repo: &git2::Repository,
786        message: &str,
787        time: chrono::DateTime<Utc>,
788    ) -> String {
789        let sig = git2::Signature::new(
790            "Test User",
791            "test@example.com",
792            &git2::Time::new(time.timestamp(), 0),
793        )
794        .expect("Failed to create signature");
795
796        // Get current tree (or create empty tree for first commit)
797        let tree_id = {
798            let mut index = repo.index().expect("Failed to get index");
799
800            // Create a test file to have something to commit
801            let file_path = repo
802                .workdir()
803                .unwrap()
804                .join(format!("test_{}.txt", Uuid::new_v4()));
805            std::fs::write(&file_path, format!("Content for: {message}"))
806                .expect("Failed to write test file");
807
808            index
809                .add_path(file_path.strip_prefix(repo.workdir().unwrap()).unwrap())
810                .expect("Failed to add file to index");
811            index.write().expect("Failed to write index");
812            index.write_tree().expect("Failed to write tree")
813        };
814
815        let tree = repo.find_tree(tree_id).expect("Failed to find tree");
816
817        // Get parent commit if it exists
818        let parent = repo.head().ok().and_then(|h| h.peel_to_commit().ok());
819
820        let commit_id = if let Some(parent) = parent {
821            repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[&parent])
822                .expect("Failed to create commit")
823        } else {
824            repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[])
825                .expect("Failed to create commit")
826        };
827
828        commit_id.to_string()
829    }
830
831    /// Initializes a new git repository in the given directory.
832    fn init_test_repo(dir: &Path) -> git2::Repository {
833        git2::Repository::init(dir).expect("Failed to init test repo")
834    }
835
836    #[test]
837    fn test_session_watcher_creation() {
838        let watcher = SessionWatcher::new();
839        assert!(watcher.is_ok(), "Should create watcher successfully");
840
841        // SessionWatcher creation should succeed even if no watchers are
842        // available (e.g., in CI environments where ~/.claude doesn't exist).
843        // The watch_dirs list may be empty in such environments.
844        let _watcher = watcher.unwrap();
845    }
846
847    #[test]
848    fn test_watch_dirs_from_registry() {
849        use crate::capture::watchers::default_registry;
850
851        // Test that the registry is configured with known watcher paths.
852        // Note: SessionWatcher.watch_dirs() only includes paths from AVAILABLE
853        // watchers. In CI environments, no watchers may be available because
854        // their directories (like ~/.claude) don't exist.
855
856        // Instead of testing through SessionWatcher, we verify the registry
857        // directly by checking all_watchers() (not just available ones).
858        let registry = default_registry();
859        let all_watchers = registry.all_watchers();
860
861        // Collect watch paths from ALL watchers (including unavailable ones)
862        let all_paths: Vec<_> = all_watchers.iter().flat_map(|w| w.watch_paths()).collect();
863
864        let has_claude = all_paths
865            .iter()
866            .any(|d| d.to_string_lossy().contains(".claude"));
867        let has_cursor = all_paths
868            .iter()
869            .any(|d| d.to_string_lossy().contains("Cursor"));
870
871        // The registry should have paths configured for known watchers.
872        assert!(
873            has_claude || has_cursor,
874            "Registry should configure at least one known watcher path pattern \
875             (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
876        );
877    }
878
879    #[test]
880    fn test_tracked_file_count_initial() {
881        let watcher = SessionWatcher::new().unwrap();
882        assert_eq!(
883            watcher.tracked_file_count(),
884            0,
885            "Should start with no tracked files"
886        );
887    }
888
889    #[test]
890    fn test_db_config_creation() {
891        let config = DbConfig::default_config();
892        assert!(config.is_ok(), "Should create DbConfig successfully");
893    }
894
895    #[test]
896    fn test_file_position_tracking() {
897        let mut watcher = SessionWatcher::new().unwrap();
898
899        let path1 = PathBuf::from("/test/file1.jsonl");
900        let path2 = PathBuf::from("/test/file2.jsonl");
901
902        watcher.file_positions.insert(path1.clone(), 100);
903        watcher.file_positions.insert(path2.clone(), 200);
904
905        assert_eq!(watcher.tracked_file_count(), 2);
906        assert_eq!(watcher.file_positions.get(&path1), Some(&100));
907        assert_eq!(watcher.file_positions.get(&path2), Some(&200));
908    }
909
910    // ==================== find_owning_watcher Tests ====================
911
912    #[test]
913    fn test_find_owning_watcher_matches_path_under_watch_dir() {
914        use crate::capture::watchers::{Watcher, WatcherInfo};
915        use crate::storage::models::{Message, Session};
916
917        struct TestWatcher {
918            name: &'static str,
919            watch_path: PathBuf,
920        }
921
922        impl Watcher for TestWatcher {
923            fn info(&self) -> WatcherInfo {
924                WatcherInfo {
925                    name: self.name,
926                    description: "Test",
927                    default_paths: vec![],
928                }
929            }
930            fn is_available(&self) -> bool {
931                true
932            }
933            fn find_sources(&self) -> Result<Vec<PathBuf>> {
934                Ok(vec![])
935            }
936            fn parse_source(&self, _: &Path) -> Result<Vec<(Session, Vec<Message>)>> {
937                Ok(vec![])
938            }
939            fn watch_paths(&self) -> Vec<PathBuf> {
940                vec![self.watch_path.clone()]
941            }
942        }
943
944        let watcher1 = TestWatcher {
945            name: "watcher-a",
946            watch_path: PathBuf::from("/home/user/.claude/projects"),
947        };
948        let watcher2 = TestWatcher {
949            name: "watcher-b",
950            watch_path: PathBuf::from("/home/user/.aider"),
951        };
952
953        let watchers: Vec<&dyn Watcher> = vec![&watcher1, &watcher2];
954
955        // File under watcher1's path
956        let claude_file = Path::new("/home/user/.claude/projects/myproject/session.jsonl");
957        let result = SessionWatcher::find_owning_watcher(claude_file, &watchers);
958        assert!(result.is_some());
959        assert_eq!(result.unwrap().info().name, "watcher-a");
960
961        // File under watcher2's path
962        let aider_file = Path::new("/home/user/.aider/history.md");
963        let result = SessionWatcher::find_owning_watcher(aider_file, &watchers);
964        assert!(result.is_some());
965        assert_eq!(result.unwrap().info().name, "watcher-b");
966
967        // File not under any watch path
968        let other_file = Path::new("/home/user/projects/random.txt");
969        let result = SessionWatcher::find_owning_watcher(other_file, &watchers);
970        assert!(result.is_none());
971    }
972
973    // ==================== auto_link_session_commits Tests ====================
974
975    #[test]
976    fn test_auto_link_creates_links_for_commits_in_time_range() {
977        // Create temp directory for both git repo and database
978        let dir = tempdir().expect("Failed to create temp directory");
979        let repo_path = dir.path();
980
981        // Initialize git repo
982        let repo = init_test_repo(repo_path);
983
984        // Create database in the temp directory
985        let db = create_test_db(repo_path);
986
987        // Define time window for session: 1 hour ago to now
988        let now = Utc::now();
989        let session_start = now - Duration::hours(1);
990        let session_end = now;
991
992        // Create commits within the session's time window
993        let commit_time1 = session_start + Duration::minutes(10);
994        let commit_time2 = session_start + Duration::minutes(30);
995        let commit_time3 = session_start + Duration::minutes(50);
996
997        let sha1 = create_test_commit(&repo, "First commit in session", commit_time1);
998        let sha2 = create_test_commit(&repo, "Second commit in session", commit_time2);
999        let sha3 = create_test_commit(&repo, "Third commit in session", commit_time3);
1000
1001        // Create and insert session
1002        let session = create_test_session_with_times(
1003            &repo_path.to_string_lossy(),
1004            session_start,
1005            session_end,
1006        );
1007        db.insert_session(&session)
1008            .expect("Failed to insert session");
1009
1010        // Create a minimal watcher for testing
1011        let watcher = SessionWatcher {
1012            file_positions: HashMap::new(),
1013            watch_dirs: vec![],
1014            db_config: DbConfig {
1015                path: repo_path.join("test.db"),
1016            },
1017        };
1018
1019        // Call auto_link_session_commits
1020        let linked_count = watcher
1021            .auto_link_session_commits(
1022                &db,
1023                session.id,
1024                &repo_path.to_string_lossy(),
1025                session_start,
1026                session_end,
1027            )
1028            .expect("auto_link_session_commits should succeed");
1029
1030        // Assert correct number of links created
1031        assert_eq!(linked_count, 3, "Should have linked 3 commits");
1032
1033        // Verify each commit is linked
1034        assert!(
1035            db.link_exists(&session.id, &sha1)
1036                .expect("link_exists should succeed"),
1037            "First commit should be linked"
1038        );
1039        assert!(
1040            db.link_exists(&session.id, &sha2)
1041                .expect("link_exists should succeed"),
1042            "Second commit should be linked"
1043        );
1044        assert!(
1045            db.link_exists(&session.id, &sha3)
1046                .expect("link_exists should succeed"),
1047            "Third commit should be linked"
1048        );
1049    }
1050
1051    #[test]
1052    fn test_auto_link_skips_commits_outside_time_range() {
1053        // Create temp directory
1054        let dir = tempdir().expect("Failed to create temp directory");
1055        let repo_path = dir.path();
1056
1057        // Initialize git repo
1058        let repo = init_test_repo(repo_path);
1059
1060        // Create database
1061        let db = create_test_db(repo_path);
1062
1063        // Define a narrow time window: 30 minutes to 20 minutes ago
1064        let now = Utc::now();
1065        let session_start = now - Duration::minutes(30);
1066        let session_end = now - Duration::minutes(20);
1067
1068        // Create commits BEFORE the session window (40 minutes ago)
1069        let before_time = now - Duration::minutes(40);
1070        let sha_before = create_test_commit(&repo, "Commit before session", before_time);
1071
1072        // Create commit INSIDE the session window (25 minutes ago)
1073        let inside_time = now - Duration::minutes(25);
1074        let sha_inside = create_test_commit(&repo, "Commit inside session", inside_time);
1075
1076        // Create commit AFTER the session window (10 minutes ago)
1077        let after_time = now - Duration::minutes(10);
1078        let sha_after = create_test_commit(&repo, "Commit after session", after_time);
1079
1080        // Create and insert session
1081        let session = create_test_session_with_times(
1082            &repo_path.to_string_lossy(),
1083            session_start,
1084            session_end,
1085        );
1086        db.insert_session(&session)
1087            .expect("Failed to insert session");
1088
1089        // Create watcher and call auto_link
1090        let watcher = SessionWatcher {
1091            file_positions: HashMap::new(),
1092            watch_dirs: vec![],
1093            db_config: DbConfig {
1094                path: repo_path.join("test.db"),
1095            },
1096        };
1097
1098        let linked_count = watcher
1099            .auto_link_session_commits(
1100                &db,
1101                session.id,
1102                &repo_path.to_string_lossy(),
1103                session_start,
1104                session_end,
1105            )
1106            .expect("auto_link_session_commits should succeed");
1107
1108        // Only the commit inside the window should be linked
1109        assert_eq!(linked_count, 1, "Should have linked only 1 commit");
1110
1111        // Verify the commit before is NOT linked
1112        assert!(
1113            !db.link_exists(&session.id, &sha_before)
1114                .expect("link_exists should succeed"),
1115            "Commit before session should NOT be linked"
1116        );
1117
1118        // Verify the commit inside IS linked
1119        assert!(
1120            db.link_exists(&session.id, &sha_inside)
1121                .expect("link_exists should succeed"),
1122            "Commit inside session should be linked"
1123        );
1124
1125        // Verify the commit after is NOT linked
1126        assert!(
1127            !db.link_exists(&session.id, &sha_after)
1128                .expect("link_exists should succeed"),
1129            "Commit after session should NOT be linked"
1130        );
1131    }
1132
1133    #[test]
1134    fn test_auto_link_skips_existing_links() {
1135        // Create temp directory
1136        let dir = tempdir().expect("Failed to create temp directory");
1137        let repo_path = dir.path();
1138
1139        // Initialize git repo
1140        let repo = init_test_repo(repo_path);
1141
1142        // Create database
1143        let db = create_test_db(repo_path);
1144
1145        // Define time window
1146        let now = Utc::now();
1147        let session_start = now - Duration::hours(1);
1148        let session_end = now;
1149
1150        // Create a commit in the time window
1151        let commit_time = session_start + Duration::minutes(30);
1152        let sha = create_test_commit(&repo, "Test commit", commit_time);
1153
1154        // Create and insert session
1155        let session = create_test_session_with_times(
1156            &repo_path.to_string_lossy(),
1157            session_start,
1158            session_end,
1159        );
1160        db.insert_session(&session)
1161            .expect("Failed to insert session");
1162
1163        // Manually create a link for the commit
1164        let existing_link = SessionLink {
1165            id: Uuid::new_v4(),
1166            session_id: session.id,
1167            link_type: LinkType::Commit,
1168            commit_sha: Some(sha.clone()),
1169            branch: Some("main".to_string()),
1170            remote: None,
1171            created_at: Utc::now(),
1172            created_by: LinkCreator::Auto,
1173            confidence: Some(1.0),
1174        };
1175        db.insert_link(&existing_link)
1176            .expect("Failed to insert existing link");
1177
1178        // Create watcher and call auto_link
1179        let watcher = SessionWatcher {
1180            file_positions: HashMap::new(),
1181            watch_dirs: vec![],
1182            db_config: DbConfig {
1183                path: repo_path.join("test.db"),
1184            },
1185        };
1186
1187        let linked_count = watcher
1188            .auto_link_session_commits(
1189                &db,
1190                session.id,
1191                &repo_path.to_string_lossy(),
1192                session_start,
1193                session_end,
1194            )
1195            .expect("auto_link_session_commits should succeed");
1196
1197        // Should return 0 since the link already exists
1198        assert_eq!(
1199            linked_count, 0,
1200            "Should not create any new links when link already exists"
1201        );
1202
1203        // Verify the link still exists (and there is only one)
1204        assert!(
1205            db.link_exists(&session.id, &sha)
1206                .expect("link_exists should succeed"),
1207            "Link should still exist"
1208        );
1209    }
1210
1211    #[test]
1212    fn test_auto_link_handles_non_git_directory() {
1213        // Create temp directory that is NOT a git repo
1214        let dir = tempdir().expect("Failed to create temp directory");
1215        let non_repo_path = dir.path();
1216
1217        // Create database
1218        let db = create_test_db(non_repo_path);
1219
1220        // Define time window
1221        let now = Utc::now();
1222        let session_start = now - Duration::hours(1);
1223        let session_end = now;
1224
1225        // Create and insert session pointing to non-git directory
1226        let session = create_test_session_with_times(
1227            &non_repo_path.to_string_lossy(),
1228            session_start,
1229            session_end,
1230        );
1231        db.insert_session(&session)
1232            .expect("Failed to insert session");
1233
1234        // Create watcher and call auto_link
1235        let watcher = SessionWatcher {
1236            file_positions: HashMap::new(),
1237            watch_dirs: vec![],
1238            db_config: DbConfig {
1239                path: non_repo_path.join("test.db"),
1240            },
1241        };
1242
1243        let result = watcher.auto_link_session_commits(
1244            &db,
1245            session.id,
1246            &non_repo_path.to_string_lossy(),
1247            session_start,
1248            session_end,
1249        );
1250
1251        // Should return Ok(0), not an error
1252        assert!(
1253            result.is_ok(),
1254            "Should handle non-git directory gracefully: {:?}",
1255            result.err()
1256        );
1257        assert_eq!(result.unwrap(), 0, "Should return 0 for non-git directory");
1258    }
1259
1260    #[test]
1261    fn test_auto_link_finds_commits_on_multiple_branches() {
1262        // Create temp directory
1263        let dir = tempdir().expect("Failed to create temp directory");
1264        let repo_path = dir.path();
1265
1266        // Initialize git repo
1267        let repo = init_test_repo(repo_path);
1268
1269        // Create database
1270        let db = create_test_db(repo_path);
1271
1272        // Define time window: 1 hour ago to now
1273        let now = Utc::now();
1274        let session_start = now - Duration::hours(1);
1275        let session_end = now;
1276
1277        // Create initial commit on main branch (default)
1278        let main_commit_time = session_start + Duration::minutes(10);
1279        let sha_main = create_test_commit(&repo, "Commit on main", main_commit_time);
1280
1281        // Get the default branch name (could be master or main depending on git config)
1282        let head_ref = repo.head().expect("Should have HEAD after commit");
1283        let default_branch = head_ref
1284            .shorthand()
1285            .expect("HEAD should have a name")
1286            .to_string();
1287
1288        // Create a feature branch and switch to it
1289        let main_commit = head_ref.peel_to_commit().unwrap();
1290        repo.branch("feature-branch", &main_commit, false)
1291            .expect("Failed to create branch");
1292        repo.set_head("refs/heads/feature-branch")
1293            .expect("Failed to switch branch");
1294
1295        // Create commit on feature branch
1296        let feature_commit_time = session_start + Duration::minutes(30);
1297        let sha_feature = create_test_commit(&repo, "Commit on feature", feature_commit_time);
1298
1299        // Switch back to default branch and create another commit
1300        repo.set_head(&format!("refs/heads/{}", default_branch))
1301            .expect("Failed to switch to default branch");
1302        // Need to reset the working directory to default branch
1303        let main_obj = repo
1304            .revparse_single(&default_branch)
1305            .expect("Should find default branch");
1306        repo.reset(&main_obj, git2::ResetType::Hard, None)
1307            .expect("Failed to reset to default branch");
1308
1309        let main_commit_time2 = session_start + Duration::minutes(50);
1310        let sha_main2 = create_test_commit(&repo, "Second commit on main", main_commit_time2);
1311
1312        // Create and insert session
1313        let session = create_test_session_with_times(
1314            &repo_path.to_string_lossy(),
1315            session_start,
1316            session_end,
1317        );
1318        db.insert_session(&session)
1319            .expect("Failed to insert session");
1320
1321        // Create watcher and call auto_link
1322        let watcher = SessionWatcher {
1323            file_positions: HashMap::new(),
1324            watch_dirs: vec![],
1325            db_config: DbConfig {
1326                path: repo_path.join("test.db"),
1327            },
1328        };
1329
1330        let linked_count = watcher
1331            .auto_link_session_commits(
1332                &db,
1333                session.id,
1334                &repo_path.to_string_lossy(),
1335                session_start,
1336                session_end,
1337            )
1338            .expect("auto_link_session_commits should succeed");
1339
1340        // Should link all 3 commits from both branches
1341        assert_eq!(
1342            linked_count, 3,
1343            "Should have linked commits from both branches"
1344        );
1345
1346        // Verify each commit is linked
1347        assert!(
1348            db.link_exists(&session.id, &sha_main)
1349                .expect("link_exists should succeed"),
1350            "First main commit should be linked"
1351        );
1352        assert!(
1353            db.link_exists(&session.id, &sha_feature)
1354                .expect("link_exists should succeed"),
1355            "Feature branch commit should be linked"
1356        );
1357        assert!(
1358            db.link_exists(&session.id, &sha_main2)
1359                .expect("link_exists should succeed"),
1360            "Second main commit should be linked"
1361        );
1362    }
1363
1364    #[test]
1365    fn test_update_existing_session_triggers_auto_link() {
1366        // This test verifies that when a session already exists in the database
1367        // and the file grows (new messages), re-importing triggers auto-linking.
1368
1369        // Create temp directory
1370        let dir = tempdir().expect("Failed to create temp directory");
1371        let repo_path = dir.path();
1372
1373        // Initialize git repo and create database
1374        let repo = init_test_repo(repo_path);
1375        let db = create_test_db(repo_path);
1376
1377        // Define time window
1378        let now = Utc::now();
1379        let session_start = now - Duration::hours(1);
1380        let session_end = now;
1381
1382        // Create a commit during the session
1383        let commit_time = session_start + Duration::minutes(30);
1384        let sha = create_test_commit(&repo, "Commit during session", commit_time);
1385
1386        // Create a session WITHOUT ended_at (simulating ongoing session from CLI import)
1387        let session_id = Uuid::new_v4();
1388        let ongoing_session = Session {
1389            id: session_id,
1390            tool: "test-tool".to_string(),
1391            tool_version: Some("1.0.0".to_string()),
1392            started_at: session_start,
1393            ended_at: None, // Not ended yet
1394            model: Some("test-model".to_string()),
1395            working_directory: repo_path.to_string_lossy().to_string(),
1396            git_branch: Some("main".to_string()),
1397            source_path: Some("/test/session.jsonl".to_string()),
1398            message_count: 5,
1399            machine_id: Some("test-machine".to_string()),
1400        };
1401
1402        db.insert_session(&ongoing_session)
1403            .expect("Failed to insert session");
1404
1405        // Verify commit is NOT linked yet (since session has not ended)
1406        assert!(
1407            !db.link_exists(&session_id, &sha)
1408                .expect("link_exists should succeed"),
1409            "Commit should NOT be linked to ongoing session"
1410        );
1411
1412        // Now create an updated session with ended_at set
1413        let ended_session = Session {
1414            id: session_id,
1415            tool: "test-tool".to_string(),
1416            tool_version: Some("1.0.0".to_string()),
1417            started_at: session_start,
1418            ended_at: Some(session_end), // Now ended
1419            model: Some("test-model".to_string()),
1420            working_directory: repo_path.to_string_lossy().to_string(),
1421            git_branch: Some("main".to_string()),
1422            source_path: Some("/test/session.jsonl".to_string()),
1423            message_count: 10,
1424            machine_id: Some("test-machine".to_string()),
1425        };
1426
1427        // Create watcher
1428        let watcher = SessionWatcher {
1429            file_positions: HashMap::new(),
1430            watch_dirs: vec![],
1431            db_config: DbConfig {
1432                path: repo_path.join("test.db"),
1433            },
1434        };
1435
1436        // Simulate what update_existing_session does:
1437        // 1. Update session in DB
1438        db.insert_session(&ended_session)
1439            .expect("Failed to update session");
1440
1441        // 2. Run auto-linking (this is what the fix enables)
1442        let linked_count = watcher
1443            .auto_link_session_commits(
1444                &db,
1445                session_id,
1446                &repo_path.to_string_lossy(),
1447                session_start,
1448                session_end,
1449            )
1450            .expect("auto_link_session_commits should succeed");
1451
1452        // Verify the commit is now linked
1453        assert_eq!(linked_count, 1, "Should have linked 1 commit");
1454        assert!(
1455            db.link_exists(&session_id, &sha)
1456                .expect("link_exists should succeed"),
1457            "Commit should be linked after session ended"
1458        );
1459    }
1460}