1use anyhow::{Context, Result};
11use chrono::Utc;
12use notify::RecursiveMode;
13use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
14use std::collections::HashMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{mpsc, RwLock};
19use uuid::Uuid;
20
21use crate::capture::watchers::{default_registry, Watcher};
22use crate::git::get_commits_in_time_range;
23use crate::storage::models::{LinkCreator, LinkType, SessionLink};
24use crate::storage::Database;
25
26use super::state::DaemonStats;
27
/// Location of the database used by the session watcher.
///
/// The value is cheap to clone; a connection is only created when
/// `open` is called on it.
#[derive(Debug, Clone)]
pub struct DbConfig {
    /// Filesystem path of the database file.
    path: PathBuf,
}
35
36impl DbConfig {
37 pub fn default_config() -> Result<Self> {
39 let path = crate::storage::db::default_db_path()?;
40 Ok(Self { path })
41 }
42
43 pub fn open(&self) -> Result<Database> {
45 Database::open(&self.path)
46 }
47}
48
/// Watches session source files and imports their contents into the database.
pub struct SessionWatcher {
    /// File size (in bytes) recorded the last time each path was processed.
    file_positions: HashMap<PathBuf, u64>,
    /// Directories reported by the watcher registry that should be observed.
    watch_dirs: Vec<PathBuf>,
    /// Configuration used to open a database handle when a file is processed.
    db_config: DbConfig,
}
61
62impl SessionWatcher {
63 pub fn new() -> Result<Self> {
72 let registry = default_registry();
73 let watch_dirs = registry.all_watch_paths();
74
75 let db_config = DbConfig::default_config()?;
76
77 Ok(Self {
78 file_positions: HashMap::new(),
79 watch_dirs,
80 db_config,
81 })
82 }
83
84 #[allow(dead_code)]
89 pub fn watch_dirs(&self) -> &[PathBuf] {
90 &self.watch_dirs
91 }
92
93 pub async fn watch(
106 &mut self,
107 stats: Arc<RwLock<DaemonStats>>,
108 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
109 ) -> Result<()> {
110 for dir in &self.watch_dirs {
112 if dir.exists() {
113 tracing::info!("Will watch for session files in {:?}", dir);
114 } else {
115 tracing::info!("Watch directory does not exist yet: {:?}", dir);
116 }
117 }
118
119 let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
121
122 let mut debouncer = new_debouncer(
124 Duration::from_millis(500),
125 move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
126 if let Ok(events) = events {
127 let filtered: Vec<DebouncedEvent> = events
129 .into_iter()
130 .filter(|e| {
131 let ext = e.path.extension().and_then(|ext| ext.to_str());
132 matches!(ext, Some("jsonl") | Some("vscdb"))
133 })
134 .collect();
135
136 if !filtered.is_empty() {
137 let _ = tx.blocking_send(filtered);
138 }
139 }
140 },
141 )
142 .context("Failed to create file watcher")?;
143
144 for dir in &self.watch_dirs {
146 if dir.exists() {
147 debouncer
148 .watcher()
149 .watch(dir, RecursiveMode::Recursive)
150 .context(format!("Failed to start watching directory {dir:?}"))?;
151
152 tracing::info!("Watching for session files in {:?}", dir);
153 }
154 }
155
156 self.initial_scan(&stats).await?;
158
159 loop {
161 tokio::select! {
162 Some(events) = rx.recv() => {
163 for event in events {
164 if let Err(e) = self.handle_file_event(&event.path, &stats).await {
165 let error_msg = e.to_string();
166 if error_msg.contains("unable to open database")
169 || error_msg.contains("database is locked")
170 {
171 tracing::debug!(
172 "Database temporarily unavailable for {:?}: {}",
173 event.path,
174 e
175 );
176 } else {
177 tracing::warn!(
178 "Error handling file event for {:?}: {}",
179 event.path,
180 e
181 );
182 let mut stats_guard = stats.write().await;
183 stats_guard.errors += 1;
184 }
185 }
186 }
187 }
188 _ = shutdown_rx.recv() => {
189 tracing::info!("Session watcher shutting down");
190 break;
191 }
192 }
193 }
194
195 Ok(())
196 }
197
198 fn open_db(&self) -> Result<Database> {
200 self.db_config.open()
201 }
202
203 fn find_owning_watcher<'a>(
211 path: &Path,
212 watchers: &'a [&'a dyn Watcher],
213 ) -> Option<&'a dyn Watcher> {
214 for watcher in watchers {
215 for watch_path in watcher.watch_paths() {
216 if path.starts_with(&watch_path) {
217 return Some(*watcher);
218 }
219 }
220 }
221 None
222 }
223
224 async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
230 tracing::info!("Performing initial scan of session files...");
231
232 let registry = default_registry();
233 let mut total_files = 0;
234
235 for watcher in registry.available_watchers() {
236 let watcher_name = watcher.info().name;
237 match watcher.find_sources() {
238 Ok(sources) => {
239 tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
240 total_files += sources.len();
241
242 for path in sources {
243 match self.process_file_sync(&path) {
245 Ok(Some((sessions_imported, messages_imported))) => {
246 let mut stats_guard = stats.write().await;
247 stats_guard.sessions_imported += sessions_imported;
248 stats_guard.messages_imported += messages_imported;
249 stats_guard.files_watched = self.file_positions.len();
250 }
251 Ok(None) => {
252 }
254 Err(e) => {
255 tracing::warn!("Failed to import {:?}: {}", path, e);
256 let mut stats_guard = stats.write().await;
257 stats_guard.errors += 1;
258 }
259 }
260 }
261 }
262 Err(e) => {
263 tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
264 }
265 }
266 }
267
268 {
269 let mut stats_guard = stats.write().await;
270 stats_guard.files_watched = total_files;
271 }
272
273 Ok(())
274 }
275
276 async fn handle_file_event(
278 &mut self,
279 path: &Path,
280 stats: &Arc<RwLock<DaemonStats>>,
281 ) -> Result<()> {
282 let ext = path.extension().and_then(|e| e.to_str());
283
284 if !matches!(ext, Some("jsonl") | Some("vscdb")) {
286 return Ok(());
287 }
288
289 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
291 if name.starts_with("agent-") {
292 return Ok(());
293 }
294 }
295
296 if !path.exists() {
298 self.file_positions.remove(path);
300 return Ok(());
301 }
302
303 match self.process_file_sync(path) {
305 Ok(Some((sessions_imported, messages_imported))) => {
306 let mut stats_guard = stats.write().await;
307 stats_guard.sessions_imported += sessions_imported;
308 stats_guard.messages_imported += messages_imported;
309 stats_guard.files_watched = self.file_positions.len();
310 }
311 Ok(None) => {
312 }
314 Err(e) => {
315 return Err(e);
316 }
317 }
318
319 Ok(())
320 }
321
322 fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
333 let db = self.open_db()?;
334 let path_str = path.to_string_lossy();
335 let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
336
337 let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
339 let current_size = metadata.len();
340
341 if current_size <= last_pos {
342 if current_size < last_pos {
344 self.file_positions.insert(path.to_path_buf(), 0);
346 }
347 return Ok(None);
348 }
349
350 let existing_session = db.get_session_by_source(&path_str)?;
352
353 if let Some(existing) = existing_session {
354 tracing::debug!(
357 "Session {} exists but file has grown, re-importing for updates",
358 &existing.id.to_string()[..8]
359 );
360 let result = self.update_existing_session(path, &db, &existing)?;
361 self.file_positions.insert(path.to_path_buf(), current_size);
362 return Ok(Some(result));
363 }
364
365 let result = self.import_file_sync(path, &db)?;
367
368 self.file_positions.insert(path.to_path_buf(), current_size);
370
371 Ok(Some(result))
372 }
373
374 fn update_existing_session(
384 &self,
385 path: &Path,
386 db: &Database,
387 existing_session: &crate::storage::models::Session,
388 ) -> Result<(u64, u64)> {
389 tracing::debug!("Updating existing session from: {:?}", path);
390
391 let path_buf = path.to_path_buf();
392 let registry = default_registry();
393 let available = registry.available_watchers();
394
395 let owning_watcher = match Self::find_owning_watcher(path, &available) {
397 Some(w) => w,
398 None => {
399 tracing::debug!("No watcher owns path {:?}", path);
400 return Ok((0, 0));
401 }
402 };
403
404 let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
406 Ok(sessions) => sessions,
407 Err(e) => {
408 tracing::debug!(
409 "Watcher {} could not parse {:?}: {}",
410 owning_watcher.info().name,
411 path,
412 e
413 );
414 return Ok((0, 0));
415 }
416 };
417
418 if parsed_sessions.is_empty() {
419 tracing::debug!(
420 "Watcher {} returned no sessions for {:?}",
421 owning_watcher.info().name,
422 path
423 );
424 return Ok((0, 0));
425 }
426
427 let mut total_messages = 0u64;
428 let mut updated_session: Option<crate::storage::models::Session> = None;
429
430 for (session, messages) in parsed_sessions {
431 if messages.is_empty() {
432 continue;
433 }
434
435 db.insert_session(&session)?;
438
439 let mut latest_branch: Option<String> = None;
441 let mut new_message_count = 0u64;
442
443 for msg in &messages {
444 db.insert_message(msg)?;
448 new_message_count += 1;
449
450 if msg.git_branch.is_some() {
451 latest_branch = msg.git_branch.clone();
452 }
453 }
454
455 if let Some(ref new_branch) = latest_branch {
457 if session.git_branch.as_ref() != Some(new_branch) {
458 if let Err(e) = db.update_session_branch(session.id, new_branch) {
459 tracing::warn!(
460 "Failed to update session branch for {}: {}",
461 &session.id.to_string()[..8],
462 e
463 );
464 } else {
465 tracing::debug!(
466 "Updated session {} branch to {}",
467 &session.id.to_string()[..8],
468 new_branch
469 );
470 }
471 }
472 }
473
474 total_messages += new_message_count;
475 updated_session = Some(session);
476 }
477
478 if let Some(session) = updated_session {
481 if let Some(ended_at) = session.ended_at {
482 if existing_session.ended_at.is_none() {
484 tracing::info!(
485 "Session {} has ended, running auto-link",
486 &session.id.to_string()[..8]
487 );
488 }
489
490 let linked = self.auto_link_session_commits(
491 db,
492 session.id,
493 &session.working_directory,
494 session.started_at,
495 ended_at,
496 );
497 if let Err(e) = linked {
498 tracing::warn!(
499 "Failed to auto-link commits for session {}: {}",
500 &session.id.to_string()[..8],
501 e
502 );
503 }
504 }
505 }
506
507 Ok((0, total_messages))
509 }
510
511 fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
518 tracing::debug!("Importing session file: {:?}", path);
519
520 let path_buf = path.to_path_buf();
521 let registry = default_registry();
522 let available = registry.available_watchers();
523
524 let owning_watcher = match Self::find_owning_watcher(path, &available) {
526 Some(w) => w,
527 None => {
528 tracing::debug!("No watcher owns path {:?}", path);
529 return Ok((0, 0));
530 }
531 };
532
533 let parsed_sessions = match owning_watcher.parse_source(&path_buf) {
535 Ok(sessions) => sessions,
536 Err(e) => {
537 tracing::debug!(
538 "Watcher {} could not parse {:?}: {}",
539 owning_watcher.info().name,
540 path,
541 e
542 );
543 return Ok((0, 0));
544 }
545 };
546
547 if parsed_sessions.is_empty() {
548 tracing::debug!(
549 "Watcher {} returned no sessions for {:?}",
550 owning_watcher.info().name,
551 path
552 );
553 return Ok((0, 0));
554 }
555
556 let mut total_sessions = 0u64;
557 let mut total_messages = 0u64;
558
559 for (session, messages) in parsed_sessions {
560 if messages.is_empty() {
561 continue;
562 }
563
564 let message_count = messages.len();
565
566 db.insert_session(&session)?;
568
569 let mut latest_branch: Option<String> = None;
571 for msg in &messages {
572 db.insert_message(msg)?;
573 if msg.git_branch.is_some() {
575 latest_branch = msg.git_branch.clone();
576 }
577 }
578
579 if let Some(ref new_branch) = latest_branch {
582 if session.git_branch.as_ref() != Some(new_branch) {
583 if let Err(e) = db.update_session_branch(session.id, new_branch) {
584 tracing::warn!(
585 "Failed to update session branch for {}: {}",
586 &session.id.to_string()[..8],
587 e
588 );
589 } else {
590 tracing::debug!(
591 "Updated session {} branch to {}",
592 &session.id.to_string()[..8],
593 new_branch
594 );
595 }
596 }
597 }
598
599 tracing::info!(
600 "Imported session {} with {} messages from {:?}",
601 &session.id.to_string()[..8],
602 message_count,
603 path.file_name().unwrap_or_default()
604 );
605
606 if let Some(ended_at) = session.ended_at {
608 let linked = self.auto_link_session_commits(
609 db,
610 session.id,
611 &session.working_directory,
612 session.started_at,
613 ended_at,
614 );
615 if let Err(e) = linked {
616 tracing::warn!(
617 "Failed to auto-link commits for session {}: {}",
618 &session.id.to_string()[..8],
619 e
620 );
621 }
622 }
623
624 total_sessions += 1;
625 total_messages += message_count as u64;
626 }
627
628 if let Ok(metadata) = std::fs::metadata(path) {
630 self.file_positions
631 .insert(path.to_path_buf(), metadata.len());
632 }
633
634 Ok((total_sessions, total_messages))
635 }
636
637 fn auto_link_session_commits(
655 &self,
656 db: &Database,
657 session_id: Uuid,
658 working_directory: &str,
659 started_at: chrono::DateTime<Utc>,
660 ended_at: chrono::DateTime<Utc>,
661 ) -> Result<usize> {
662 let working_dir = Path::new(working_directory);
663
664 if !working_dir.exists() {
666 tracing::debug!("Working directory does not exist: {}", working_directory);
667 return Ok(0);
668 }
669
670 let commits = match get_commits_in_time_range(working_dir, started_at, ended_at) {
672 Ok(commits) => commits,
673 Err(e) => {
674 tracing::debug!("Could not get commits for {}: {}", working_directory, e);
677 return Ok(0);
678 }
679 };
680
681 if commits.is_empty() {
682 tracing::debug!(
683 "No commits found in time range for session {}",
684 &session_id.to_string()[..8]
685 );
686 return Ok(0);
687 }
688
689 let mut linked_count = 0;
690
691 for commit in commits {
692 if db.link_exists(&session_id, &commit.sha)? {
694 tracing::debug!(
695 "Link already exists for session {} and commit {}",
696 &session_id.to_string()[..8],
697 &commit.sha[..8]
698 );
699 continue;
700 }
701
702 let link = SessionLink {
704 id: Uuid::new_v4(),
705 session_id,
706 link_type: LinkType::Commit,
707 commit_sha: Some(commit.sha.clone()),
708 branch: commit.branch.clone(),
709 remote: None,
710 created_at: Utc::now(),
711 created_by: LinkCreator::Auto,
712 confidence: Some(1.0), };
714
715 db.insert_link(&link)?;
716 linked_count += 1;
717
718 tracing::info!(
719 "Auto-linked commit {} to session {} ({})",
720 &commit.sha[..8],
721 &session_id.to_string()[..8],
722 commit.summary.chars().take(50).collect::<String>()
723 );
724 }
725
726 if linked_count > 0 {
727 tracing::info!(
728 "Auto-linked {} commits to session {}",
729 linked_count,
730 &session_id.to_string()[..8]
731 );
732 }
733
734 Ok(linked_count)
735 }
736
737 #[allow(dead_code)]
742 pub fn tracked_file_count(&self) -> usize {
743 self.file_positions.len()
744 }
745}
746
747#[cfg(test)]
748mod tests {
749 use super::*;
750 use crate::storage::models::Session;
751 use chrono::Duration;
752 use tempfile::tempdir;
753
754 fn create_test_db(dir: &Path) -> Database {
756 let db_path = dir.join("test.db");
757 Database::open(&db_path).expect("Failed to open test database")
758 }
759
760 fn create_test_session_with_times(
762 working_directory: &str,
763 started_at: chrono::DateTime<Utc>,
764 ended_at: chrono::DateTime<Utc>,
765 ) -> Session {
766 Session {
767 id: Uuid::new_v4(),
768 tool: "test-tool".to_string(),
769 tool_version: Some("1.0.0".to_string()),
770 started_at,
771 ended_at: Some(ended_at),
772 model: Some("test-model".to_string()),
773 working_directory: working_directory.to_string(),
774 git_branch: Some("main".to_string()),
775 source_path: None,
776 message_count: 0,
777 machine_id: Some("test-machine".to_string()),
778 }
779 }
780
781 fn create_test_commit(
785 repo: &git2::Repository,
786 message: &str,
787 time: chrono::DateTime<Utc>,
788 ) -> String {
789 let sig = git2::Signature::new(
790 "Test User",
791 "test@example.com",
792 &git2::Time::new(time.timestamp(), 0),
793 )
794 .expect("Failed to create signature");
795
796 let tree_id = {
798 let mut index = repo.index().expect("Failed to get index");
799
800 let file_path = repo
802 .workdir()
803 .unwrap()
804 .join(format!("test_{}.txt", Uuid::new_v4()));
805 std::fs::write(&file_path, format!("Content for: {message}"))
806 .expect("Failed to write test file");
807
808 index
809 .add_path(file_path.strip_prefix(repo.workdir().unwrap()).unwrap())
810 .expect("Failed to add file to index");
811 index.write().expect("Failed to write index");
812 index.write_tree().expect("Failed to write tree")
813 };
814
815 let tree = repo.find_tree(tree_id).expect("Failed to find tree");
816
817 let parent = repo.head().ok().and_then(|h| h.peel_to_commit().ok());
819
820 let commit_id = if let Some(parent) = parent {
821 repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[&parent])
822 .expect("Failed to create commit")
823 } else {
824 repo.commit(Some("HEAD"), &sig, &sig, message, &tree, &[])
825 .expect("Failed to create commit")
826 };
827
828 commit_id.to_string()
829 }
830
831 fn init_test_repo(dir: &Path) -> git2::Repository {
833 git2::Repository::init(dir).expect("Failed to init test repo")
834 }
835
836 #[test]
837 fn test_session_watcher_creation() {
838 let watcher = SessionWatcher::new();
839 assert!(watcher.is_ok(), "Should create watcher successfully");
840
841 let _watcher = watcher.unwrap();
845 }
846
847 #[test]
848 fn test_watch_dirs_from_registry() {
849 use crate::capture::watchers::default_registry;
850
851 let registry = default_registry();
859 let all_watchers = registry.all_watchers();
860
861 let all_paths: Vec<_> = all_watchers.iter().flat_map(|w| w.watch_paths()).collect();
863
864 let has_claude = all_paths
865 .iter()
866 .any(|d| d.to_string_lossy().contains(".claude"));
867 let has_cursor = all_paths
868 .iter()
869 .any(|d| d.to_string_lossy().contains("Cursor"));
870
871 assert!(
873 has_claude || has_cursor,
874 "Registry should configure at least one known watcher path pattern \
875 (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
876 );
877 }
878
879 #[test]
880 fn test_tracked_file_count_initial() {
881 let watcher = SessionWatcher::new().unwrap();
882 assert_eq!(
883 watcher.tracked_file_count(),
884 0,
885 "Should start with no tracked files"
886 );
887 }
888
889 #[test]
890 fn test_db_config_creation() {
891 let config = DbConfig::default_config();
892 assert!(config.is_ok(), "Should create DbConfig successfully");
893 }
894
895 #[test]
896 fn test_file_position_tracking() {
897 let mut watcher = SessionWatcher::new().unwrap();
898
899 let path1 = PathBuf::from("/test/file1.jsonl");
900 let path2 = PathBuf::from("/test/file2.jsonl");
901
902 watcher.file_positions.insert(path1.clone(), 100);
903 watcher.file_positions.insert(path2.clone(), 200);
904
905 assert_eq!(watcher.tracked_file_count(), 2);
906 assert_eq!(watcher.file_positions.get(&path1), Some(&100));
907 assert_eq!(watcher.file_positions.get(&path2), Some(&200));
908 }
909
910 #[test]
913 fn test_find_owning_watcher_matches_path_under_watch_dir() {
914 use crate::capture::watchers::{Watcher, WatcherInfo};
915 use crate::storage::models::{Message, Session};
916
917 struct TestWatcher {
918 name: &'static str,
919 watch_path: PathBuf,
920 }
921
922 impl Watcher for TestWatcher {
923 fn info(&self) -> WatcherInfo {
924 WatcherInfo {
925 name: self.name,
926 description: "Test",
927 default_paths: vec![],
928 }
929 }
930 fn is_available(&self) -> bool {
931 true
932 }
933 fn find_sources(&self) -> Result<Vec<PathBuf>> {
934 Ok(vec![])
935 }
936 fn parse_source(&self, _: &Path) -> Result<Vec<(Session, Vec<Message>)>> {
937 Ok(vec![])
938 }
939 fn watch_paths(&self) -> Vec<PathBuf> {
940 vec![self.watch_path.clone()]
941 }
942 }
943
944 let watcher1 = TestWatcher {
945 name: "watcher-a",
946 watch_path: PathBuf::from("/home/user/.claude/projects"),
947 };
948 let watcher2 = TestWatcher {
949 name: "watcher-b",
950 watch_path: PathBuf::from("/home/user/.aider"),
951 };
952
953 let watchers: Vec<&dyn Watcher> = vec![&watcher1, &watcher2];
954
955 let claude_file = Path::new("/home/user/.claude/projects/myproject/session.jsonl");
957 let result = SessionWatcher::find_owning_watcher(claude_file, &watchers);
958 assert!(result.is_some());
959 assert_eq!(result.unwrap().info().name, "watcher-a");
960
961 let aider_file = Path::new("/home/user/.aider/history.md");
963 let result = SessionWatcher::find_owning_watcher(aider_file, &watchers);
964 assert!(result.is_some());
965 assert_eq!(result.unwrap().info().name, "watcher-b");
966
967 let other_file = Path::new("/home/user/projects/random.txt");
969 let result = SessionWatcher::find_owning_watcher(other_file, &watchers);
970 assert!(result.is_none());
971 }
972
973 #[test]
976 fn test_auto_link_creates_links_for_commits_in_time_range() {
977 let dir = tempdir().expect("Failed to create temp directory");
979 let repo_path = dir.path();
980
981 let repo = init_test_repo(repo_path);
983
984 let db = create_test_db(repo_path);
986
987 let now = Utc::now();
989 let session_start = now - Duration::hours(1);
990 let session_end = now;
991
992 let commit_time1 = session_start + Duration::minutes(10);
994 let commit_time2 = session_start + Duration::minutes(30);
995 let commit_time3 = session_start + Duration::minutes(50);
996
997 let sha1 = create_test_commit(&repo, "First commit in session", commit_time1);
998 let sha2 = create_test_commit(&repo, "Second commit in session", commit_time2);
999 let sha3 = create_test_commit(&repo, "Third commit in session", commit_time3);
1000
1001 let session = create_test_session_with_times(
1003 &repo_path.to_string_lossy(),
1004 session_start,
1005 session_end,
1006 );
1007 db.insert_session(&session)
1008 .expect("Failed to insert session");
1009
1010 let watcher = SessionWatcher {
1012 file_positions: HashMap::new(),
1013 watch_dirs: vec![],
1014 db_config: DbConfig {
1015 path: repo_path.join("test.db"),
1016 },
1017 };
1018
1019 let linked_count = watcher
1021 .auto_link_session_commits(
1022 &db,
1023 session.id,
1024 &repo_path.to_string_lossy(),
1025 session_start,
1026 session_end,
1027 )
1028 .expect("auto_link_session_commits should succeed");
1029
1030 assert_eq!(linked_count, 3, "Should have linked 3 commits");
1032
1033 assert!(
1035 db.link_exists(&session.id, &sha1)
1036 .expect("link_exists should succeed"),
1037 "First commit should be linked"
1038 );
1039 assert!(
1040 db.link_exists(&session.id, &sha2)
1041 .expect("link_exists should succeed"),
1042 "Second commit should be linked"
1043 );
1044 assert!(
1045 db.link_exists(&session.id, &sha3)
1046 .expect("link_exists should succeed"),
1047 "Third commit should be linked"
1048 );
1049 }
1050
1051 #[test]
1052 fn test_auto_link_skips_commits_outside_time_range() {
1053 let dir = tempdir().expect("Failed to create temp directory");
1055 let repo_path = dir.path();
1056
1057 let repo = init_test_repo(repo_path);
1059
1060 let db = create_test_db(repo_path);
1062
1063 let now = Utc::now();
1065 let session_start = now - Duration::minutes(30);
1066 let session_end = now - Duration::minutes(20);
1067
1068 let before_time = now - Duration::minutes(40);
1070 let sha_before = create_test_commit(&repo, "Commit before session", before_time);
1071
1072 let inside_time = now - Duration::minutes(25);
1074 let sha_inside = create_test_commit(&repo, "Commit inside session", inside_time);
1075
1076 let after_time = now - Duration::minutes(10);
1078 let sha_after = create_test_commit(&repo, "Commit after session", after_time);
1079
1080 let session = create_test_session_with_times(
1082 &repo_path.to_string_lossy(),
1083 session_start,
1084 session_end,
1085 );
1086 db.insert_session(&session)
1087 .expect("Failed to insert session");
1088
1089 let watcher = SessionWatcher {
1091 file_positions: HashMap::new(),
1092 watch_dirs: vec![],
1093 db_config: DbConfig {
1094 path: repo_path.join("test.db"),
1095 },
1096 };
1097
1098 let linked_count = watcher
1099 .auto_link_session_commits(
1100 &db,
1101 session.id,
1102 &repo_path.to_string_lossy(),
1103 session_start,
1104 session_end,
1105 )
1106 .expect("auto_link_session_commits should succeed");
1107
1108 assert_eq!(linked_count, 1, "Should have linked only 1 commit");
1110
1111 assert!(
1113 !db.link_exists(&session.id, &sha_before)
1114 .expect("link_exists should succeed"),
1115 "Commit before session should NOT be linked"
1116 );
1117
1118 assert!(
1120 db.link_exists(&session.id, &sha_inside)
1121 .expect("link_exists should succeed"),
1122 "Commit inside session should be linked"
1123 );
1124
1125 assert!(
1127 !db.link_exists(&session.id, &sha_after)
1128 .expect("link_exists should succeed"),
1129 "Commit after session should NOT be linked"
1130 );
1131 }
1132
1133 #[test]
1134 fn test_auto_link_skips_existing_links() {
1135 let dir = tempdir().expect("Failed to create temp directory");
1137 let repo_path = dir.path();
1138
1139 let repo = init_test_repo(repo_path);
1141
1142 let db = create_test_db(repo_path);
1144
1145 let now = Utc::now();
1147 let session_start = now - Duration::hours(1);
1148 let session_end = now;
1149
1150 let commit_time = session_start + Duration::minutes(30);
1152 let sha = create_test_commit(&repo, "Test commit", commit_time);
1153
1154 let session = create_test_session_with_times(
1156 &repo_path.to_string_lossy(),
1157 session_start,
1158 session_end,
1159 );
1160 db.insert_session(&session)
1161 .expect("Failed to insert session");
1162
1163 let existing_link = SessionLink {
1165 id: Uuid::new_v4(),
1166 session_id: session.id,
1167 link_type: LinkType::Commit,
1168 commit_sha: Some(sha.clone()),
1169 branch: Some("main".to_string()),
1170 remote: None,
1171 created_at: Utc::now(),
1172 created_by: LinkCreator::Auto,
1173 confidence: Some(1.0),
1174 };
1175 db.insert_link(&existing_link)
1176 .expect("Failed to insert existing link");
1177
1178 let watcher = SessionWatcher {
1180 file_positions: HashMap::new(),
1181 watch_dirs: vec![],
1182 db_config: DbConfig {
1183 path: repo_path.join("test.db"),
1184 },
1185 };
1186
1187 let linked_count = watcher
1188 .auto_link_session_commits(
1189 &db,
1190 session.id,
1191 &repo_path.to_string_lossy(),
1192 session_start,
1193 session_end,
1194 )
1195 .expect("auto_link_session_commits should succeed");
1196
1197 assert_eq!(
1199 linked_count, 0,
1200 "Should not create any new links when link already exists"
1201 );
1202
1203 assert!(
1205 db.link_exists(&session.id, &sha)
1206 .expect("link_exists should succeed"),
1207 "Link should still exist"
1208 );
1209 }
1210
1211 #[test]
1212 fn test_auto_link_handles_non_git_directory() {
1213 let dir = tempdir().expect("Failed to create temp directory");
1215 let non_repo_path = dir.path();
1216
1217 let db = create_test_db(non_repo_path);
1219
1220 let now = Utc::now();
1222 let session_start = now - Duration::hours(1);
1223 let session_end = now;
1224
1225 let session = create_test_session_with_times(
1227 &non_repo_path.to_string_lossy(),
1228 session_start,
1229 session_end,
1230 );
1231 db.insert_session(&session)
1232 .expect("Failed to insert session");
1233
1234 let watcher = SessionWatcher {
1236 file_positions: HashMap::new(),
1237 watch_dirs: vec![],
1238 db_config: DbConfig {
1239 path: non_repo_path.join("test.db"),
1240 },
1241 };
1242
1243 let result = watcher.auto_link_session_commits(
1244 &db,
1245 session.id,
1246 &non_repo_path.to_string_lossy(),
1247 session_start,
1248 session_end,
1249 );
1250
1251 assert!(
1253 result.is_ok(),
1254 "Should handle non-git directory gracefully: {:?}",
1255 result.err()
1256 );
1257 assert_eq!(result.unwrap(), 0, "Should return 0 for non-git directory");
1258 }
1259
1260 #[test]
1261 fn test_auto_link_finds_commits_on_multiple_branches() {
1262 let dir = tempdir().expect("Failed to create temp directory");
1264 let repo_path = dir.path();
1265
1266 let repo = init_test_repo(repo_path);
1268
1269 let db = create_test_db(repo_path);
1271
1272 let now = Utc::now();
1274 let session_start = now - Duration::hours(1);
1275 let session_end = now;
1276
1277 let main_commit_time = session_start + Duration::minutes(10);
1279 let sha_main = create_test_commit(&repo, "Commit on main", main_commit_time);
1280
1281 let head_ref = repo.head().expect("Should have HEAD after commit");
1283 let default_branch = head_ref
1284 .shorthand()
1285 .expect("HEAD should have a name")
1286 .to_string();
1287
1288 let main_commit = head_ref.peel_to_commit().unwrap();
1290 repo.branch("feature-branch", &main_commit, false)
1291 .expect("Failed to create branch");
1292 repo.set_head("refs/heads/feature-branch")
1293 .expect("Failed to switch branch");
1294
1295 let feature_commit_time = session_start + Duration::minutes(30);
1297 let sha_feature = create_test_commit(&repo, "Commit on feature", feature_commit_time);
1298
1299 repo.set_head(&format!("refs/heads/{}", default_branch))
1301 .expect("Failed to switch to default branch");
1302 let main_obj = repo
1304 .revparse_single(&default_branch)
1305 .expect("Should find default branch");
1306 repo.reset(&main_obj, git2::ResetType::Hard, None)
1307 .expect("Failed to reset to default branch");
1308
1309 let main_commit_time2 = session_start + Duration::minutes(50);
1310 let sha_main2 = create_test_commit(&repo, "Second commit on main", main_commit_time2);
1311
1312 let session = create_test_session_with_times(
1314 &repo_path.to_string_lossy(),
1315 session_start,
1316 session_end,
1317 );
1318 db.insert_session(&session)
1319 .expect("Failed to insert session");
1320
1321 let watcher = SessionWatcher {
1323 file_positions: HashMap::new(),
1324 watch_dirs: vec![],
1325 db_config: DbConfig {
1326 path: repo_path.join("test.db"),
1327 },
1328 };
1329
1330 let linked_count = watcher
1331 .auto_link_session_commits(
1332 &db,
1333 session.id,
1334 &repo_path.to_string_lossy(),
1335 session_start,
1336 session_end,
1337 )
1338 .expect("auto_link_session_commits should succeed");
1339
1340 assert_eq!(
1342 linked_count, 3,
1343 "Should have linked commits from both branches"
1344 );
1345
1346 assert!(
1348 db.link_exists(&session.id, &sha_main)
1349 .expect("link_exists should succeed"),
1350 "First main commit should be linked"
1351 );
1352 assert!(
1353 db.link_exists(&session.id, &sha_feature)
1354 .expect("link_exists should succeed"),
1355 "Feature branch commit should be linked"
1356 );
1357 assert!(
1358 db.link_exists(&session.id, &sha_main2)
1359 .expect("link_exists should succeed"),
1360 "Second main commit should be linked"
1361 );
1362 }
1363
1364 #[test]
1365 fn test_update_existing_session_triggers_auto_link() {
1366 let dir = tempdir().expect("Failed to create temp directory");
1371 let repo_path = dir.path();
1372
1373 let repo = init_test_repo(repo_path);
1375 let db = create_test_db(repo_path);
1376
1377 let now = Utc::now();
1379 let session_start = now - Duration::hours(1);
1380 let session_end = now;
1381
1382 let commit_time = session_start + Duration::minutes(30);
1384 let sha = create_test_commit(&repo, "Commit during session", commit_time);
1385
1386 let session_id = Uuid::new_v4();
1388 let ongoing_session = Session {
1389 id: session_id,
1390 tool: "test-tool".to_string(),
1391 tool_version: Some("1.0.0".to_string()),
1392 started_at: session_start,
1393 ended_at: None, model: Some("test-model".to_string()),
1395 working_directory: repo_path.to_string_lossy().to_string(),
1396 git_branch: Some("main".to_string()),
1397 source_path: Some("/test/session.jsonl".to_string()),
1398 message_count: 5,
1399 machine_id: Some("test-machine".to_string()),
1400 };
1401
1402 db.insert_session(&ongoing_session)
1403 .expect("Failed to insert session");
1404
1405 assert!(
1407 !db.link_exists(&session_id, &sha)
1408 .expect("link_exists should succeed"),
1409 "Commit should NOT be linked to ongoing session"
1410 );
1411
1412 let ended_session = Session {
1414 id: session_id,
1415 tool: "test-tool".to_string(),
1416 tool_version: Some("1.0.0".to_string()),
1417 started_at: session_start,
1418 ended_at: Some(session_end), model: Some("test-model".to_string()),
1420 working_directory: repo_path.to_string_lossy().to_string(),
1421 git_branch: Some("main".to_string()),
1422 source_path: Some("/test/session.jsonl".to_string()),
1423 message_count: 10,
1424 machine_id: Some("test-machine".to_string()),
1425 };
1426
1427 let watcher = SessionWatcher {
1429 file_positions: HashMap::new(),
1430 watch_dirs: vec![],
1431 db_config: DbConfig {
1432 path: repo_path.join("test.db"),
1433 },
1434 };
1435
1436 db.insert_session(&ended_session)
1439 .expect("Failed to update session");
1440
1441 let linked_count = watcher
1443 .auto_link_session_commits(
1444 &db,
1445 session_id,
1446 &repo_path.to_string_lossy(),
1447 session_start,
1448 session_end,
1449 )
1450 .expect("auto_link_session_commits should succeed");
1451
1452 assert_eq!(linked_count, 1, "Should have linked 1 commit");
1454 assert!(
1455 db.link_exists(&session_id, &sha)
1456 .expect("link_exists should succeed"),
1457 "Commit should be linked after session ended"
1458 );
1459 }
1460}