1use crate::git::shell_fetch;
2use crate::git::shell_push::push_current_branch;
3use crate::git::utils::is_worktree_dirty;
4use anyhow::Context;
5use anyhow::Result;
6use chrono::DateTime;
7use chrono::Utc;
8use colored::*;
9use git2::IndexAddOption;
10use git2::Repository;
11use git2::Signature;
12use serde::Deserialize;
13use serde::Serialize;
14use std::collections::HashMap;
15use std::path::Path;
16
17#[derive(Debug, Deserialize, Serialize)]
20struct LogEntryForMerge {
21 call_id: String,
22 started_at: DateTime<Utc>,
23 #[serde(flatten)]
24 rest: serde_json::Value,
25}
26
/// Returns `true` when `path` looks like a tool-log file.
///
/// A path matches when it ends with `.jsonl` and contains a `logs` path
/// segment (preceded by at least one other segment, i.e. a literal `/logs/`)
/// whose next segment starts with `tool_logs_`.
///
/// Every `logs` segment is examined, not just the first one, so a tool log
/// stored below a nested `logs` directory
/// (`a/logs/archive/logs/tool_logs_x.jsonl`) is recognised as well.
fn is_tool_log_file(path: &str) -> bool {
    if !path.ends_with(".jsonl") {
        return false;
    }

    let segments: Vec<&str> = path.split('/').collect();
    segments
        .windows(2)
        // The pair starting at the first segment has no leading '/', which the
        // `/logs/` marker requires.
        .skip(1)
        .any(|pair| pair[0] == "logs" && pair[1].starts_with("tool_logs_"))
}
40
41fn merge_jsonl_logs(ours_content: &[u8], theirs_content: &[u8]) -> Vec<u8> {
47 let mut records: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
48 let mut unparseable_lines: Vec<String> = Vec::new();
49
50 for line in String::from_utf8_lossy(ours_content).lines() {
52 if line.trim().is_empty() {
53 continue;
54 }
55 match serde_json::from_str::<LogEntryForMerge>(line) {
56 Ok(entry) => {
57 records.insert(entry.call_id.clone(), (entry.started_at, line.to_string()));
58 }
59 Err(_) => {
60 unparseable_lines.push(line.to_string());
61 }
62 }
63 }
64
65 for line in String::from_utf8_lossy(theirs_content).lines() {
67 if line.trim().is_empty() {
68 continue;
69 }
70 match serde_json::from_str::<LogEntryForMerge>(line) {
71 Ok(entry) => {
72 records.insert(entry.call_id.clone(), (entry.started_at, line.to_string()));
74 }
75 Err(_) => {
76 if !unparseable_lines.contains(&line.to_string()) {
78 unparseable_lines.push(line.to_string());
79 }
80 }
81 }
82 }
83
84 let mut sorted: Vec<_> = records.into_values().collect();
86 sorted.sort_by_key(|(ts, _)| *ts);
87
88 let mut output = sorted
90 .into_iter()
91 .map(|(_, line)| line)
92 .collect::<Vec<_>>()
93 .join("\n");
94
95 if !unparseable_lines.is_empty() {
96 if !output.is_empty() {
97 output.push('\n');
98 }
99 output.push_str(&unparseable_lines.join("\n"));
100 }
101
102 if !output.is_empty() {
103 output.push('\n');
104 }
105
106 output.into_bytes()
107}
108
/// Relationship between the local branch tip and its `origin` counterpart.
// Within this file only `is_diverged` is read outside of tests, which is
// presumably why the dead-code lint is silenced.
#[allow(dead_code)]
pub(crate) struct DivergenceAnalysis {
    /// Both sides have commits the other one lacks.
    pub(crate) is_diverged: bool,
    /// The local tip has commits that the upstream ref lacks.
    pub(crate) is_ahead: bool,
    /// The upstream ref has commits that the local tip lacks.
    pub(crate) is_behind: bool,
}
119
120pub struct GitSync {
121 repo: Repository,
122 repo_path: std::path::PathBuf,
123 subpath: Option<String>,
124}
125
126impl GitSync {
127 pub fn new(repo_path: &Path, subpath: Option<String>) -> Result<Self> {
128 let repo = Repository::open(repo_path)?;
129 Ok(Self {
130 repo,
131 repo_path: repo_path.to_path_buf(),
132 subpath,
133 })
134 }
135
136 pub async fn sync(&self, mount_name: &str) -> Result<()> {
137 println!(" {} {}", "Syncing".cyan(), mount_name);
138
139 if let Err(e) = self.preflight_fetch() {
141 println!(" {} Pre-flight fetch failed: {}", "⚠".yellow(), e);
142 }
144
145 let changes_staged = self.stage_changes().await?;
147
148 if changes_staged {
150 self.commit(mount_name).await?;
151 println!(" {} Committed changes", "✓".green());
152 } else {
153 println!(" {} No changes to commit", "○".dimmed());
154 }
155
156 match self.pull_rebase().await {
158 Ok(pulled) => {
159 if pulled {
160 println!(" {} Pulled remote changes", "✓".green());
161 }
162 }
163 Err(e) => {
164 println!(" {} Pull failed: {}", "⚠".yellow(), e);
165 }
167 }
168
169 match self.push().await {
171 Ok(_) => println!(" {} Pushed to remote", "✓".green()),
172 Err(e) => {
173 println!(" {} Push failed: {}", "⚠".yellow(), e);
174 println!(" {} Changes saved locally only", "Info".dimmed());
175 }
176 }
177
178 Ok(())
179 }
180
181 fn preflight_fetch(&self) -> Result<()> {
185 if self.repo.find_remote("origin").is_err() {
187 return Ok(()); }
189
190 shell_fetch::fetch(&self.repo_path, "origin")?;
192
193 if let Ok(analysis) = self.check_divergence()
195 && analysis.is_diverged
196 {
197 println!(
198 " {} Detected divergence from remote - will attempt rebase",
199 "Info".dimmed()
200 );
201 }
202
203 Ok(())
204 }
205
206 pub(crate) fn check_divergence(&self) -> Result<DivergenceAnalysis> {
208 let head = self.repo.head()?;
209 let branch_name = head.shorthand().unwrap_or("HEAD");
210 let upstream_ref = format!("refs/remotes/origin/{}", branch_name);
211
212 let local_oid = head
213 .target()
214 .ok_or_else(|| anyhow::anyhow!("No HEAD target"))?;
215
216 let upstream_oid = match self.repo.refname_to_id(&upstream_ref) {
217 Ok(oid) => oid,
218 Err(_) => {
219 return Ok(DivergenceAnalysis {
221 is_diverged: false,
222 is_ahead: true,
223 is_behind: false,
224 });
225 }
226 };
227
228 let (ahead, behind) = self.repo.graph_ahead_behind(local_oid, upstream_oid)?;
231
232 Ok(DivergenceAnalysis {
233 is_diverged: ahead > 0 && behind > 0,
234 is_ahead: ahead > 0,
235 is_behind: behind > 0,
236 })
237 }
238
239 async fn stage_changes(&self) -> Result<bool> {
240 let mut index = self.repo.index()?;
241
242 let pathspecs: Vec<String> = if let Some(subpath) = &self.subpath {
244 vec![
247 format!("{}/*", subpath), format!("{}/**/*", subpath), ]
250 } else {
251 vec![".".to_string()]
253 };
254
255 let flags = IndexAddOption::DEFAULT;
257
258 let mut staged_files = 0;
260
261 let cb = &mut |_path: &std::path::Path, _matched_spec: &[u8]| -> i32 {
263 staged_files += 1;
264 0 };
266
267 index.add_all(
269 pathspecs.iter(),
270 flags,
271 Some(cb as &mut git2::IndexMatchedPath),
272 )?;
273
274 index.update_all(pathspecs.iter(), None)?;
276
277 index.write()?;
278
279 let diff = match self.repo.head() {
282 Ok(head) => {
283 let head_tree = self.repo.find_commit(head.target().unwrap())?.tree()?;
284 self.repo
285 .diff_tree_to_index(Some(&head_tree), Some(&index), None)?
286 }
287 Err(e) if e.code() == git2::ErrorCode::UnbornBranch => {
288 self.repo.diff_tree_to_index(None, Some(&index), None)?
290 }
291 Err(e) => return Err(e.into()),
292 };
293
294 Ok(diff.stats()?.files_changed() > 0)
295 }
296
297 async fn commit(&self, mount_name: &str) -> Result<()> {
298 let sig = Signature::now("thoughts-sync", "thoughts@sync.local")?;
299 let tree_id = self.repo.index()?.write_tree()?;
300 let tree = self.repo.find_tree(tree_id)?;
301
302 let message = if let Some(subpath) = &self.subpath {
304 format!("Auto-sync thoughts for {mount_name} (subpath: {subpath})")
305 } else {
306 format!("Auto-sync thoughts for {mount_name}")
307 };
308
309 match self.repo.head() {
311 Ok(head) => {
312 let parent = self.repo.find_commit(head.target().unwrap())?;
314 self.repo
315 .commit(Some("HEAD"), &sig, &sig, &message, &tree, &[&parent])?;
316 }
317 Err(e) if e.code() == git2::ErrorCode::UnbornBranch => {
318 self.repo.commit(
320 Some("HEAD"),
321 &sig,
322 &sig,
323 &message,
324 &tree,
325 &[], )?;
327 }
328 Err(e) => return Err(e.into()),
329 }
330
331 Ok(())
332 }
333
334 async fn pull_rebase(&self) -> Result<bool> {
335 if self.repo.find_remote("origin").is_err() {
337 println!(
338 " {} No remote 'origin' configured (local-only)",
339 "Info".dimmed()
340 );
341 return Ok(false);
342 }
343
344 shell_fetch::fetch(&self.repo_path, "origin").with_context(|| {
346 format!(
347 "Fetch from origin failed for repo '{}'",
348 self.repo_path.display()
349 )
350 })?;
351
352 let head = self.repo.head()?;
354 let branch_name = head.shorthand().unwrap_or("main");
355
356 let upstream_oid = match self
358 .repo
359 .refname_to_id(&format!("refs/remotes/origin/{branch_name}"))
360 {
361 Ok(oid) => oid,
362 Err(_) => {
363 return Ok(false);
365 }
366 };
367
368 let upstream_commit = self.repo.find_annotated_commit(upstream_oid)?;
369 let head_commit = self.repo.find_annotated_commit(head.target().unwrap())?;
370
371 let analysis = self.repo.merge_analysis(&[&upstream_commit])?;
373
374 if analysis.0.is_up_to_date() {
375 return Ok(false);
376 }
377
378 if analysis.0.is_fast_forward() {
379 if is_worktree_dirty(&self.repo)? {
381 anyhow::bail!(
382 "Cannot fast-forward: working tree has uncommitted changes. Please commit or stash before syncing."
383 );
384 }
385 let obj = self.repo.find_object(upstream_oid, None)?;
389 self.repo.reset(
390 &obj,
391 git2::ResetType::Hard,
392 Some(git2::build::CheckoutBuilder::default().force()),
393 )?;
394 return Ok(true);
395 }
396
397 let mut rebase =
399 self.repo
400 .rebase(Some(&head_commit), Some(&upstream_commit), None, None)?;
401
402 let rebase_result: Result<bool> = (|| {
403 while let Some(operation) = rebase.next() {
404 let _op =
406 operation.map_err(|e| anyhow::anyhow!("Rebase operation failed: {}", e))?;
407
408 if self.repo.index()?.has_conflicts() {
409 self.resolve_conflicts_prefer_remote()?;
411 }
412 rebase.commit(
413 None,
414 &Signature::now("thoughts-sync", "thoughts@sync.local")?,
415 None,
416 )?;
417 }
418 rebase.finish(None)?;
419 Ok(true)
420 })();
421
422 if rebase_result.is_err() {
424 let _ = rebase.abort(); }
426
427 rebase_result
428 }
429
430 async fn push(&self) -> Result<()> {
431 if self.repo.find_remote("origin").is_err() {
432 println!(
433 " {} No remote 'origin' configured (local-only)",
434 "Info".dimmed()
435 );
436 return Ok(());
437 }
438
439 let head = self.repo.head()?;
440 let branch = head.shorthand().unwrap_or("main");
441
442 push_current_branch(&self.repo_path, "origin", branch)?;
444 Ok(())
445 }
446
447 fn resolve_conflicts_prefer_remote(&self) -> Result<()> {
461 let mut index = self.repo.index()?;
462 let conflicts: Vec<_> = index.conflicts()?.collect::<Result<Vec<_>, _>>()?;
463
464 for conflict in conflicts {
465 let path = conflict
467 .our
468 .as_ref()
469 .or(conflict.their.as_ref())
470 .map(|e| String::from_utf8_lossy(&e.path).to_string());
471
472 let path_str = path.as_deref().unwrap_or("");
473
474 if is_tool_log_file(path_str)
476 && let (Some(our), Some(their)) = (&conflict.our, &conflict.their)
477 {
478 let our_blob = self.repo.find_blob(our.id)?;
479 let their_blob = self.repo.find_blob(their.id)?;
480
481 let merged = merge_jsonl_logs(our_blob.content(), their_blob.content());
482
483 let file_path = self.repo_path.join(path_str);
485 std::fs::write(&file_path, &merged)?;
486
487 index.add_path(std::path::Path::new(path_str))?;
489 continue;
490 }
491
492 if let Some(our) = conflict.our {
494 index.add(&our)?;
495 } else if let Some(their) = conflict.their {
496 index.add(&their)?;
498 }
499 }
501
502 index.write()?;
503 Ok(())
504 }
505}
506
507#[cfg(test)]
508mod tests {
509 use super::*;
510
511 #[test]
512 fn test_merge_jsonl_deduplicates_by_call_id() {
513 let ours = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}
514{"call_id":"def","started_at":"2025-01-01T11:00:00Z","tool":"bar"}"#;
515 let theirs = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo_updated"}
516{"call_id":"ghi","started_at":"2025-01-01T12:00:00Z","tool":"baz"}"#;
517
518 let merged = merge_jsonl_logs(ours, theirs);
519 let merged_str = String::from_utf8_lossy(&merged);
520
521 assert!(merged_str.contains("foo_updated"));
523 assert!(!merged_str.contains(r#""tool":"foo""#)); assert!(merged_str.contains("def"));
525 assert!(merged_str.contains("ghi"));
526 }
527
528 #[test]
529 fn test_merge_jsonl_preserves_unparseable() {
530 let ours = b"not valid json\n";
531 let theirs = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}"#;
532
533 let merged = merge_jsonl_logs(ours, theirs);
534 let merged_str = String::from_utf8_lossy(&merged);
535
536 assert!(merged_str.contains("not valid json"));
537 assert!(merged_str.contains("call_id"));
538 }
539
540 #[test]
541 fn test_merge_jsonl_sorts_by_timestamp() {
542 let ours = br#"{"call_id":"late","started_at":"2025-01-01T15:00:00Z","tool":"c"}"#;
543 let theirs = br#"{"call_id":"early","started_at":"2025-01-01T09:00:00Z","tool":"a"}
544{"call_id":"mid","started_at":"2025-01-01T12:00:00Z","tool":"b"}"#;
545
546 let merged = merge_jsonl_logs(ours, theirs);
547 let merged_str = String::from_utf8_lossy(&merged);
548 let lines: Vec<_> = merged_str.lines().collect();
549
550 assert!(lines[0].contains("early"));
551 assert!(lines[1].contains("mid"));
552 assert!(lines[2].contains("late"));
553 }
554
555 #[test]
556 fn test_merge_jsonl_empty_files() {
557 let merged = merge_jsonl_logs(b"", b"");
558 assert!(merged.is_empty());
559 }
560
561 #[test]
562 fn test_merge_jsonl_one_side_empty() {
563 let content = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}"#;
564
565 let merged_ours_empty = merge_jsonl_logs(b"", content);
566 assert!(String::from_utf8_lossy(&merged_ours_empty).contains("abc"));
567
568 let merged_theirs_empty = merge_jsonl_logs(content, b"");
569 assert!(String::from_utf8_lossy(&merged_theirs_empty).contains("abc"));
570 }
571
572 #[test]
573 fn test_is_tool_log_file() {
574 assert!(is_tool_log_file("branch/logs/tool_logs_2025-01-01.jsonl"));
576 assert!(is_tool_log_file(
577 "foo/logs/tool_logs_2025-01-01_abc123.jsonl"
578 ));
579 assert!(is_tool_log_file("a/b/c/logs/tool_logs_whatever.jsonl"));
580
581 assert!(!is_tool_log_file("branch/logs/other.jsonl"));
583
584 assert!(!is_tool_log_file(
586 "branch/research/tool_logs_2025-01-01.jsonl"
587 ));
588
589 assert!(!is_tool_log_file("branch/logs/tool_logs_2025-01-01.json"));
591
592 assert!(!is_tool_log_file("tool_logs_config/logs/readme.jsonl"));
594 assert!(!is_tool_log_file("tool_logs_foo/logs/bar.jsonl"));
595
596 assert!(!is_tool_log_file("tool_logs_2025-01-01.jsonl"));
598 }
599
600 fn git_ok(dir: &std::path::Path, args: &[&str]) {
607 let out = std::process::Command::new("git")
608 .current_dir(dir)
609 .args(args)
610 .output()
611 .expect("failed to spawn git");
612 assert!(
613 out.status.success(),
614 "git {:?} failed: {}",
615 args,
616 String::from_utf8_lossy(&out.stderr)
617 );
618 }
619
620 fn git_stdout(dir: &std::path::Path, args: &[&str]) -> String {
622 let out = std::process::Command::new("git")
623 .current_dir(dir)
624 .args(args)
625 .output()
626 .expect("failed to spawn git");
627 assert!(out.status.success());
628 String::from_utf8_lossy(&out.stdout).trim().to_string()
629 }
630
631 #[test]
634 fn divergence_no_upstream_ref() {
635 let repo = tempfile::TempDir::new().unwrap();
636 git_ok(repo.path(), &["init"]);
637 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
638 git_ok(repo.path(), &["add", "."]);
639 git_ok(
640 repo.path(),
641 &[
642 "-c",
643 "user.name=Test",
644 "-c",
645 "user.email=test@example.com",
646 "commit",
647 "-m",
648 "initial",
649 ],
650 );
651
652 let sync = GitSync::new(repo.path(), None).unwrap();
653 let analysis = sync.check_divergence().unwrap();
654
655 assert!(!analysis.is_diverged, "should not be diverged");
656 assert!(analysis.is_ahead, "should be ahead (no upstream)");
657 assert!(!analysis.is_behind, "should not be behind");
658 }
659
660 #[test]
663 fn divergence_up_to_date() {
664 let repo = tempfile::TempDir::new().unwrap();
665 git_ok(repo.path(), &["init"]);
666 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
667 git_ok(repo.path(), &["add", "."]);
668 git_ok(
669 repo.path(),
670 &[
671 "-c",
672 "user.name=Test",
673 "-c",
674 "user.email=test@example.com",
675 "commit",
676 "-m",
677 "initial",
678 ],
679 );
680 git_ok(repo.path(), &["branch", "-M", "main"]);
682
683 let head_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
684 git_ok(
685 repo.path(),
686 &["update-ref", "refs/remotes/origin/main", &head_oid],
687 );
688
689 let sync = GitSync::new(repo.path(), None).unwrap();
690 let analysis = sync.check_divergence().unwrap();
691
692 assert!(!analysis.is_diverged, "should not be diverged");
693 assert!(!analysis.is_ahead, "should not be ahead");
694 assert!(!analysis.is_behind, "should not be behind");
695 }
696
697 #[test]
700 fn divergence_local_ahead_only() {
701 let repo = tempfile::TempDir::new().unwrap();
702 git_ok(repo.path(), &["init"]);
703 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
704 git_ok(repo.path(), &["add", "."]);
705 git_ok(
706 repo.path(),
707 &[
708 "-c",
709 "user.name=Test",
710 "-c",
711 "user.email=test@example.com",
712 "commit",
713 "-m",
714 "C1",
715 ],
716 );
717 git_ok(repo.path(), &["branch", "-M", "main"]);
719
720 let c1_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
721 git_ok(
722 repo.path(),
723 &["update-ref", "refs/remotes/origin/main", &c1_oid],
724 );
725
726 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
727 git_ok(repo.path(), &["add", "."]);
728 git_ok(
729 repo.path(),
730 &[
731 "-c",
732 "user.name=Test",
733 "-c",
734 "user.email=test@example.com",
735 "commit",
736 "-m",
737 "C2",
738 ],
739 );
740
741 let sync = GitSync::new(repo.path(), None).unwrap();
742 let analysis = sync.check_divergence().unwrap();
743
744 assert!(!analysis.is_diverged, "should not be diverged");
745 assert!(analysis.is_ahead, "should be ahead");
746 assert!(!analysis.is_behind, "should not be behind");
747 }
748
749 #[test]
752 fn divergence_local_behind_only() {
753 let repo = tempfile::TempDir::new().unwrap();
754 git_ok(repo.path(), &["init"]);
755 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
756 git_ok(repo.path(), &["add", "."]);
757 git_ok(
758 repo.path(),
759 &[
760 "-c",
761 "user.name=Test",
762 "-c",
763 "user.email=test@example.com",
764 "commit",
765 "-m",
766 "C1",
767 ],
768 );
769 git_ok(repo.path(), &["branch", "-M", "main"]);
771
772 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
773 git_ok(repo.path(), &["add", "."]);
774 git_ok(
775 repo.path(),
776 &[
777 "-c",
778 "user.name=Test",
779 "-c",
780 "user.email=test@example.com",
781 "commit",
782 "-m",
783 "C2",
784 ],
785 );
786
787 let c2_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
788 git_ok(repo.path(), &["reset", "--hard", "HEAD~1"]);
789 git_ok(
790 repo.path(),
791 &["update-ref", "refs/remotes/origin/main", &c2_oid],
792 );
793
794 let sync = GitSync::new(repo.path(), None).unwrap();
795 let analysis = sync.check_divergence().unwrap();
796
797 assert!(!analysis.is_diverged, "should not be diverged");
798 assert!(!analysis.is_ahead, "should not be ahead");
799 assert!(analysis.is_behind, "should be behind");
800 }
801
802 #[test]
805 fn divergence_diverged() {
806 let repo = tempfile::TempDir::new().unwrap();
807 git_ok(repo.path(), &["init"]);
808 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
809 git_ok(repo.path(), &["add", "."]);
810 git_ok(
811 repo.path(),
812 &[
813 "-c",
814 "user.name=Test",
815 "-c",
816 "user.email=test@example.com",
817 "commit",
818 "-m",
819 "C1",
820 ],
821 );
822 git_ok(repo.path(), &["branch", "-M", "main"]);
824
825 let c1_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
826
827 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
828 git_ok(repo.path(), &["add", "."]);
829 git_ok(
830 repo.path(),
831 &[
832 "-c",
833 "user.name=Test",
834 "-c",
835 "user.email=test@example.com",
836 "commit",
837 "-m",
838 "C2-local",
839 ],
840 );
841
842 git_ok(repo.path(), &["branch", "remote-sim", &c1_oid]);
843 git_ok(repo.path(), &["checkout", "remote-sim"]);
844 std::fs::write(repo.path().join("c.txt"), "c").unwrap();
845 git_ok(repo.path(), &["add", "."]);
846 git_ok(
847 repo.path(),
848 &[
849 "-c",
850 "user.name=Test",
851 "-c",
852 "user.email=test@example.com",
853 "commit",
854 "-m",
855 "C3-remote",
856 ],
857 );
858
859 let c3_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
860 git_ok(repo.path(), &["checkout", "main"]);
861 git_ok(
862 repo.path(),
863 &["update-ref", "refs/remotes/origin/main", &c3_oid],
864 );
865
866 let sync = GitSync::new(repo.path(), None).unwrap();
867 let analysis = sync.check_divergence().unwrap();
868
869 assert!(analysis.is_diverged, "should be diverged");
870 assert!(analysis.is_ahead, "should be ahead");
871 assert!(analysis.is_behind, "should be behind");
872 }
873}