1use crate::git::shell_fetch;
2use crate::git::shell_push::PushFailureKind;
3use crate::git::shell_push::push_current_branch_with_result;
4use crate::git::utils::ensure_repo_ready_for_sync;
5use crate::git::utils::get_sync_branch;
6use crate::git::utils::is_worktree_dirty;
7use anyhow::Context;
8use anyhow::Result;
9use anyhow::bail;
10use chrono::DateTime;
11use chrono::Utc;
12use colored::Colorize;
13use git2::Commit;
14use git2::ErrorCode;
15use git2::Index;
16use git2::IndexAddOption;
17use git2::Oid;
18use git2::Repository;
19use git2::Signature;
20use git2::Tree;
21use serde::Deserialize;
22use serde::Serialize;
23use std::collections::HashMap;
24use std::path::Path;
25use std::time::Duration;
26use tokio::time::sleep;
27
28#[derive(Debug, Deserialize, Serialize)]
31struct LogEntryForMerge {
32 call_id: String,
33 started_at: DateTime<Utc>,
34 #[serde(flatten)]
35 rest: serde_json::Value,
36}
37
38fn is_tool_log_file(path: &str) -> bool {
44 if let Some(logs_idx) = path.find("/logs/") {
45 let after_logs = &path[logs_idx + 6..]; after_logs.starts_with("tool_logs_")
47 && std::path::Path::new(path)
48 .extension()
49 .is_some_and(|ext| ext.eq_ignore_ascii_case("jsonl"))
50 } else {
51 false
52 }
53}
54
55fn merge_jsonl_logs(ours_content: &[u8], theirs_content: &[u8]) -> Vec<u8> {
61 let mut records: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
62 let mut unparseable_lines: Vec<String> = Vec::new();
63
64 for line in String::from_utf8_lossy(ours_content).lines() {
66 if line.trim().is_empty() {
67 continue;
68 }
69 match serde_json::from_str::<LogEntryForMerge>(line) {
70 Ok(entry) => {
71 records.insert(entry.call_id.clone(), (entry.started_at, line.to_string()));
72 }
73 Err(_) => {
74 unparseable_lines.push(line.to_string());
75 }
76 }
77 }
78
79 for line in String::from_utf8_lossy(theirs_content).lines() {
81 if line.trim().is_empty() {
82 continue;
83 }
84 match serde_json::from_str::<LogEntryForMerge>(line) {
85 Ok(entry) => {
86 records.insert(entry.call_id.clone(), (entry.started_at, line.to_string()));
88 }
89 Err(_) => {
90 if !unparseable_lines.contains(&line.to_string()) {
92 unparseable_lines.push(line.to_string());
93 }
94 }
95 }
96 }
97
98 let mut sorted: Vec<_> = records.into_values().collect();
100 sorted.sort_by_key(|(ts, _)| *ts);
101
102 let mut output = sorted
104 .into_iter()
105 .map(|(_, line)| line)
106 .collect::<Vec<_>>()
107 .join("\n");
108
109 if !unparseable_lines.is_empty() {
110 if !output.is_empty() {
111 output.push('\n');
112 }
113 output.push_str(&unparseable_lines.join("\n"));
114 }
115
116 if !output.is_empty() {
117 output.push('\n');
118 }
119
120 output.into_bytes()
121}
122
123pub(crate) struct DivergenceAnalysis {
125 pub(crate) is_diverged: bool,
127 pub(crate) is_ahead: bool,
129 pub(crate) is_behind: bool,
131}
132
133const MAX_PUSH_RETRIES: u32 = 3;
134const RETRY_BASE_MS: u64 = 500;
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq)]
137enum SyncRelation {
138 NoUpstream,
139 UpToDate,
140 AheadOnly,
141 BehindOnly,
142 Diverged,
143}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146enum SyncAttemptOutcome {
147 NoHeadChange,
148 FastForwarded,
149 Committed,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153enum PushRaceResetMode {
154 Mixed,
155 Hard,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159enum CommitParentPlan {
160 None,
161 HeadOnly,
162 UpstreamOnly,
163 HeadAndUpstream,
164}
165
166pub struct GitSync {
167 repo: Repository,
168 repo_path: std::path::PathBuf,
169 subpath: Option<String>,
170}
171
172impl GitSync {
173 pub fn new(repo_path: &Path, subpath: Option<String>) -> Result<Self> {
174 let repo = Repository::open(repo_path)?;
175 Ok(Self {
176 repo,
177 repo_path: repo_path.to_path_buf(),
178 subpath,
179 })
180 }
181
182 #[expect(
183 clippy::future_not_send,
184 reason = "git2::Repository is Send but not Sync; this is a known limitation"
185 )]
186 pub async fn sync(&self, mount_name: &str) -> Result<()> {
187 println!(" {} {}", "Syncing".cyan(), mount_name);
188
189 ensure_repo_ready_for_sync(&self.repo_path)?;
190
191 if self.repo.find_remote("origin").is_err() {
194 println!(
195 " {} No remote 'origin' configured (local-only)",
196 "Info".dimmed()
197 );
198 self.sync_without_remote(mount_name)?;
199 return Ok(());
200 }
201
202 let branch_name = get_sync_branch(&self.repo_path)?;
205
206 for attempt in 0..MAX_PUSH_RETRIES {
207 let attempt_head = self.head_commit_oid()?;
208 let sync_outcome = self.sync_once(mount_name, &branch_name)?;
209
210 let push_result =
211 push_current_branch_with_result(&self.repo_path, "origin", &branch_name)?;
212 if push_result.success {
213 println!(" {} Pushed to remote", "✓".green());
214 return Ok(());
215 }
216
217 let failure_kind = push_result.failure_kind.unwrap_or(PushFailureKind::Other);
218 if failure_kind == PushFailureKind::Race && attempt < MAX_PUSH_RETRIES - 1 {
219 println!(
220 " {} Push race detected; retrying after re-fetch",
221 "Info".dimmed()
222 );
223 let reset_mode = match sync_outcome {
224 SyncAttemptOutcome::FastForwarded => PushRaceResetMode::Hard,
225 SyncAttemptOutcome::NoHeadChange | SyncAttemptOutcome::Committed => {
226 PushRaceResetMode::Mixed
227 }
228 };
229 self.reset_after_push_race(attempt_head, reset_mode)?;
230 sleep(Duration::from_millis(RETRY_BASE_MS * 2u64.pow(attempt))).await;
231 continue;
232 }
233
234 let stderr = push_result.stderr.trim();
235 if stderr.is_empty() {
236 bail!("git push failed ({failure_kind:?})");
237 }
238 bail!("git push failed ({failure_kind:?}): {stderr}");
239 }
240
241 bail!("git push race retry budget exhausted after {MAX_PUSH_RETRIES} attempts")
242 }
243
244 fn sync_without_remote(&self, mount_name: &str) -> Result<()> {
245 let changes_staged = self.stage_changes()?;
246 if !changes_staged {
247 println!(" {} No changes to commit", "○".dimmed());
248 return Ok(());
249 }
250
251 let head_commit = self.head_commit()?;
252 let local_tree = self.local_tree_from_index()?;
253 let commit_oid = self.create_commit_from_relation(
254 mount_name,
255 &local_tree,
256 head_commit.as_ref(),
257 None,
258 SyncRelation::NoUpstream,
259 )?;
260 self.refresh_worktree_after_commit(commit_oid)?;
261 println!(" {} Committed changes", "✓".green());
262 Ok(())
263 }
264
265 fn sync_once(&self, mount_name: &str, branch_name: &str) -> Result<SyncAttemptOutcome> {
266 shell_fetch::fetch(&self.repo_path, "origin").with_context(|| {
267 format!(
268 "Fetch from origin failed for repo '{}'",
269 self.repo_path.display()
270 )
271 })?;
272
273 let head_commit = self.head_commit()?;
274 let upstream_commit = self.find_upstream_commit(branch_name)?;
275 let relation =
276 self.sync_relation(head_commit.as_ref(), upstream_commit.as_ref(), branch_name)?;
277
278 if let Some(upstream_commit) = upstream_commit.as_ref()
279 && self.should_premerge_before_staging(relation)?
280 {
281 self.premerge_jsonl_files(&upstream_commit.tree()?)?;
282 }
283
284 let changes_staged = self.stage_changes()?;
285 let local_tree = self.local_tree_from_index()?;
286
287 match relation {
288 SyncRelation::NoUpstream => {
289 if changes_staged {
290 let commit_oid = self.create_commit_from_relation(
291 mount_name,
292 &local_tree,
293 head_commit.as_ref(),
294 None,
295 relation,
296 )?;
297 self.refresh_worktree_after_commit(commit_oid)?;
298 println!(" {} Committed changes", "✓".green());
299 return Ok(SyncAttemptOutcome::Committed);
300 }
301 return Ok(SyncAttemptOutcome::NoHeadChange);
302 }
303 SyncRelation::UpToDate | SyncRelation::AheadOnly => {
304 if changes_staged {
305 let commit_oid = self.create_commit_from_relation(
306 mount_name,
307 &local_tree,
308 head_commit.as_ref(),
309 upstream_commit.as_ref(),
310 relation,
311 )?;
312 self.refresh_worktree_after_commit(commit_oid)?;
313 println!(" {} Committed changes", "✓".green());
314 return Ok(SyncAttemptOutcome::Committed);
315 }
316 println!(" {} No changes to commit", "○".dimmed());
317 return Ok(SyncAttemptOutcome::NoHeadChange);
318 }
319 SyncRelation::BehindOnly => {
320 let upstream_commit = upstream_commit.as_ref().ok_or_else(|| {
321 anyhow::anyhow!("Missing upstream commit for behind-only sync")
322 })?;
323 if !changes_staged {
324 self.fast_forward_to_commit(branch_name, upstream_commit)?;
325 println!(" {} Pulled remote changes", "✓".green());
326 return Ok(SyncAttemptOutcome::FastForwarded);
327 }
328 }
329 SyncRelation::Diverged => {
330 println!(
331 " {} Detected divergence from remote - merging before commit",
332 "Info".dimmed()
333 );
334 }
335 }
336
337 let upstream_commit = upstream_commit
338 .as_ref()
339 .ok_or_else(|| anyhow::anyhow!("Missing upstream commit for merge integration"))?;
340 let merged_tree = self.integrate_local_tree(
341 head_commit.as_ref(),
342 &local_tree,
343 upstream_commit,
344 relation,
345 )?;
346 let commit_oid = self.create_commit_from_relation(
347 mount_name,
348 &merged_tree,
349 head_commit.as_ref(),
350 Some(upstream_commit),
351 relation,
352 )?;
353 self.refresh_worktree_after_commit(commit_oid)?;
354 println!(" {} Integrated remote changes", "✓".green());
355
356 Ok(SyncAttemptOutcome::Committed)
357 }
358
359 fn should_premerge_before_staging(&self, relation: SyncRelation) -> Result<bool> {
360 Ok(match relation {
361 SyncRelation::Diverged => true,
362 SyncRelation::BehindOnly => is_worktree_dirty(&self.repo)?,
363 SyncRelation::NoUpstream | SyncRelation::UpToDate | SyncRelation::AheadOnly => false,
364 })
365 }
366
367 pub(crate) fn check_divergence(&self, branch_name: &str) -> Result<DivergenceAnalysis> {
369 let head = self.repo.head()?;
370 let upstream_ref = format!("refs/remotes/origin/{branch_name}");
371
372 let local_oid = head
373 .target()
374 .ok_or_else(|| anyhow::anyhow!("No HEAD target"))?;
375
376 let Ok(upstream_oid) = self.repo.refname_to_id(&upstream_ref) else {
377 return Ok(DivergenceAnalysis {
379 is_diverged: false,
380 is_ahead: true,
381 is_behind: false,
382 });
383 };
384
385 let (ahead, behind) = self.repo.graph_ahead_behind(local_oid, upstream_oid)?;
388
389 Ok(DivergenceAnalysis {
390 is_diverged: ahead > 0 && behind > 0,
391 is_ahead: ahead > 0,
392 is_behind: behind > 0,
393 })
394 }
395
396 fn sync_relation(
397 &self,
398 head_commit: Option<&Commit<'_>>,
399 upstream_commit: Option<&Commit<'_>>,
400 branch_name: &str,
401 ) -> Result<SyncRelation> {
402 match (head_commit, upstream_commit) {
403 (_, None) => Ok(SyncRelation::NoUpstream),
404 (None, Some(_)) => Ok(SyncRelation::BehindOnly),
405 (Some(_), Some(_)) => {
406 let analysis = self.check_divergence(branch_name)?;
407 Ok(
408 match (analysis.is_diverged, analysis.is_ahead, analysis.is_behind) {
409 (false, true, false) => SyncRelation::AheadOnly,
410 (false, false, true) => SyncRelation::BehindOnly,
411 (false, false, false) => SyncRelation::UpToDate,
412 _ => SyncRelation::Diverged,
414 },
415 )
416 }
417 }
418 }
419
420 fn head_commit(&self) -> Result<Option<Commit<'_>>> {
421 match self.repo.head() {
422 Ok(head) => {
423 let target = head
424 .target()
425 .ok_or_else(|| anyhow::anyhow!("No HEAD target"))?;
426 Ok(Some(self.repo.find_commit(target)?))
427 }
428 Err(e) if e.code() == ErrorCode::UnbornBranch => Ok(None),
429 Err(e) => Err(e.into()),
430 }
431 }
432
433 fn head_commit_oid(&self) -> Result<Option<Oid>> {
434 Ok(self.head_commit()?.map(|commit| commit.id()))
435 }
436
437 fn find_upstream_commit(&self, branch_name: &str) -> Result<Option<Commit<'_>>> {
438 match self
439 .repo
440 .refname_to_id(&format!("refs/remotes/origin/{branch_name}"))
441 {
442 Ok(oid) => Ok(Some(self.repo.find_commit(oid)?)),
443 Err(_) => Ok(None),
444 }
445 }
446
447 fn local_tree_from_index(&self) -> Result<Tree<'_>> {
448 let mut index = self.repo.index()?;
449 let tree_id = index.write_tree()?;
450 self.repo.find_tree(tree_id).map_err(Into::into)
451 }
452
453 fn integrate_local_tree(
454 &self,
455 head_commit: Option<&Commit<'_>>,
456 local_tree: &Tree<'_>,
457 upstream_commit: &Commit<'_>,
458 relation: SyncRelation,
459 ) -> Result<Tree<'_>> {
460 let ancestor_tree_id =
461 self.ancestor_tree_for_merge(head_commit, upstream_commit, relation)?;
462 let ancestor_tree = self.repo.find_tree(ancestor_tree_id)?;
463 let upstream_tree = upstream_commit.tree()?;
464 let mut merged_index =
465 self.repo
466 .merge_trees(&ancestor_tree, local_tree, &upstream_tree, None)?;
467
468 if merged_index.has_conflicts() {
469 self.resolve_merge_conflicts(&mut merged_index)?;
470 }
471 if merged_index.has_conflicts() {
472 bail!("Failed to resolve merge conflicts before final commit");
473 }
474
475 let tree_id = merged_index.write_tree_to(&self.repo)?;
476 self.repo.find_tree(tree_id).map_err(Into::into)
477 }
478
479 fn ancestor_tree_for_merge(
480 &self,
481 head_commit: Option<&Commit<'_>>,
482 upstream_commit: &Commit<'_>,
483 relation: SyncRelation,
484 ) -> Result<Oid> {
485 match relation {
486 SyncRelation::BehindOnly => match head_commit {
487 Some(head_commit) => Ok(head_commit.tree_id()),
488 None => self.empty_tree().map(|tree| tree.id()),
489 },
490 SyncRelation::Diverged => {
491 let head_commit = head_commit
492 .ok_or_else(|| anyhow::anyhow!("Missing HEAD commit for diverged merge"))?;
493 match self.repo.merge_base(head_commit.id(), upstream_commit.id()) {
494 Ok(merge_base_oid) => Ok(self.repo.find_commit(merge_base_oid)?.tree_id()),
495 Err(_) => self.empty_tree().map(|tree| tree.id()),
496 }
497 }
498 _ => self.empty_tree().map(|tree| tree.id()),
499 }
500 }
501
502 fn empty_tree(&self) -> Result<Tree<'_>> {
503 let mut index = Index::new()?;
504 let tree_id = index.write_tree_to(&self.repo)?;
505 self.repo.find_tree(tree_id).map_err(Into::into)
506 }
507
508 fn resolve_merge_conflicts(&self, index: &mut Index) -> Result<()> {
509 const GIT_INDEX_ENTRY_STAGEMASK: u16 = 0x3000;
511
512 let conflicts: Vec<_> = index
513 .conflicts()?
514 .collect::<std::result::Result<Vec<_>, _>>()?;
515
516 for conflict in conflicts {
517 let path = conflict
518 .our
519 .as_ref()
520 .or(conflict.their.as_ref())
521 .or(conflict.ancestor.as_ref())
522 .map(|entry| String::from_utf8_lossy(&entry.path).to_string())
523 .unwrap_or_default();
524
525 if is_tool_log_file(&path)
526 && let (Some(local), Some(remote)) = (&conflict.our, &conflict.their)
527 {
528 let local_blob = self.repo.find_blob(local.id)?;
529 let remote_blob = self.repo.find_blob(remote.id)?;
530 let merged = merge_jsonl_logs(remote_blob.content(), local_blob.content());
531
532 let file_path = self.repo_path.join(&path);
534 if let Some(parent) = file_path.parent() {
535 std::fs::create_dir_all(parent)?;
536 }
537 std::fs::write(&file_path, &merged)?;
538
539 let blob_oid = self.repo.blob(&merged)?;
543
544 index.conflict_remove(Path::new(&path))?;
548
549 let entry = git2::IndexEntry {
550 id: blob_oid,
551 file_size: u32::try_from(merged.len()).unwrap_or(u32::MAX),
552 ctime: local.ctime,
554 mtime: local.mtime,
555 dev: local.dev,
556 ino: local.ino,
557 mode: local.mode,
558 uid: local.uid,
559 gid: local.gid,
560 flags: local.flags & !GIT_INDEX_ENTRY_STAGEMASK,
561 flags_extended: local.flags_extended,
562 path: local.path.clone(),
563 };
564 index.add(&entry)?;
565 continue;
566 }
567
568 match (&conflict.our, &conflict.their) {
570 (_, Some(remote)) => {
571 index.conflict_remove(Path::new(&path))?;
573 let resolved = git2::IndexEntry {
574 ctime: remote.ctime,
575 mtime: remote.mtime,
576 dev: remote.dev,
577 ino: remote.ino,
578 mode: remote.mode,
579 uid: remote.uid,
580 gid: remote.gid,
581 file_size: remote.file_size,
582 id: remote.id,
583 flags: remote.flags & !GIT_INDEX_ENTRY_STAGEMASK,
584 flags_extended: remote.flags_extended,
585 path: remote.path.clone(),
586 };
587 index.add(&resolved)?;
588 }
589 (Some(_), None) => {
590 index.conflict_remove(Path::new(&path))?;
592 }
593 (None, None) => {}
594 }
595 }
596
597 Ok(())
600 }
601
602 fn create_commit_from_relation(
603 &self,
604 mount_name: &str,
605 tree: &Tree<'_>,
606 head_commit: Option<&Commit<'_>>,
607 upstream_commit: Option<&Commit<'_>>,
608 relation: SyncRelation,
609 ) -> Result<Oid> {
610 match commit_parent_plan(relation, head_commit.is_some(), upstream_commit.is_some())? {
611 CommitParentPlan::None => self.create_commit_for_tree(mount_name, tree, &[]),
612 CommitParentPlan::HeadOnly => {
613 let parents = head_commit.map(|commit| vec![commit]).unwrap_or_default();
614 self.create_commit_for_tree(mount_name, tree, &parents)
615 }
616 CommitParentPlan::UpstreamOnly => {
617 let upstream_commit = upstream_commit.ok_or_else(|| {
618 anyhow::anyhow!("Missing upstream commit for behind-only commit")
619 })?;
620 self.create_commit_for_tree(mount_name, tree, &[upstream_commit])
621 }
622 CommitParentPlan::HeadAndUpstream => {
623 let head_commit = head_commit
624 .ok_or_else(|| anyhow::anyhow!("Missing HEAD commit for diverged commit"))?;
625 let upstream_commit = upstream_commit.ok_or_else(|| {
626 anyhow::anyhow!("Missing upstream commit for diverged commit")
627 })?;
628 self.create_commit_for_tree(mount_name, tree, &[head_commit, upstream_commit])
629 }
630 }
631 }
632
633 fn create_commit_for_tree(
634 &self,
635 mount_name: &str,
636 tree: &Tree<'_>,
637 parents: &[&Commit<'_>],
638 ) -> Result<Oid> {
639 let sig = Signature::now("thoughts-sync", "thoughts@sync.local")?;
640 let message = if let Some(subpath) = &self.subpath {
641 format!("Auto-sync thoughts for {mount_name} (subpath: {subpath})")
642 } else {
643 format!("Auto-sync thoughts for {mount_name}")
644 };
645
646 let commit_oid = self
650 .repo
651 .commit(None, &sig, &sig, &message, tree, parents)?;
652
653 let (refname, is_branch) = match self.repo.head() {
657 Ok(head_ref) => {
658 let name = head_ref
659 .name()
660 .ok_or_else(|| anyhow::anyhow!("HEAD has no name"))?;
661 (name.to_string(), head_ref.is_branch())
662 }
663 Err(e) if e.code() == git2::ErrorCode::UnbornBranch => {
664 let head_ref = self.repo.find_reference("HEAD")?;
667 let symbolic_target = head_ref
668 .symbolic_target()
669 .ok_or_else(|| anyhow::anyhow!("HEAD has no symbolic target"))?;
670 (symbolic_target.to_string(), true)
671 }
672 Err(e) => return Err(e.into()),
673 };
674
675 if is_branch {
678 self.repo.reference(
679 &refname,
680 commit_oid,
681 true, &format!("thoughts-sync: {message}"),
683 )?;
684 } else {
685 self.repo.set_head_detached(commit_oid)?;
686 }
687
688 Ok(commit_oid)
689 }
690
691 fn refresh_worktree_after_commit(&self, commit_oid: Oid) -> Result<()> {
692 if self.subpath.is_some() {
693 let commit = self.repo.find_commit(commit_oid)?;
694 self.refresh_subpath_after_commit(&commit)?;
695 return Ok(());
696 }
697
698 let obj = self.repo.find_object(commit_oid, None)?;
699 self.repo.reset(
700 &obj,
701 git2::ResetType::Hard,
702 Some(git2::build::CheckoutBuilder::default().force()),
703 )?;
704 Ok(())
705 }
706
707 fn refresh_subpath_after_commit(&self, commit: &Commit<'_>) -> Result<()> {
708 let subpath = self
709 .subpath
710 .as_deref()
711 .ok_or_else(|| anyhow::anyhow!("Missing subpath for subpath refresh"))?;
712 let tree = commit.tree()?;
713 let mut checkout = git2::build::CheckoutBuilder::default();
714 checkout.force().path(subpath);
715 self.repo
716 .checkout_tree(tree.as_object(), Some(&mut checkout))?;
717 self.refresh_index_in_scope()
718 }
719
720 fn fast_forward_to_commit(
721 &self,
722 branch_name: &str,
723 upstream_commit: &Commit<'_>,
724 ) -> Result<()> {
725 if is_worktree_dirty(&self.repo)? {
726 bail!(
727 "Cannot fast-forward: working tree has uncommitted changes. Please commit or stash before syncing."
728 );
729 }
730
731 self.repo.set_head(&format!("refs/heads/{branch_name}"))?;
732 let obj = self.repo.find_object(upstream_commit.id(), None)?;
733 self.repo.reset(
734 &obj,
735 git2::ResetType::Hard,
736 Some(git2::build::CheckoutBuilder::default().force()),
737 )?;
738 Ok(())
739 }
740
741 fn reset_after_push_race(
742 &self,
743 original_head: Option<Oid>,
744 reset_mode: PushRaceResetMode,
745 ) -> Result<()> {
746 if let Some(original_head) = original_head {
747 let obj = self.repo.find_object(original_head, None)?;
748 match reset_mode {
749 PushRaceResetMode::Mixed => {
750 self.repo.reset(&obj, git2::ResetType::Mixed, None)?;
751 }
752 PushRaceResetMode::Hard => {
753 self.repo.reset(
754 &obj,
755 git2::ResetType::Hard,
756 Some(git2::build::CheckoutBuilder::default().force()),
757 )?;
758 }
759 }
760 } else {
761 let branch_name = get_sync_branch(&self.repo_path)?;
762 self.repo.set_head(&format!("refs/heads/{branch_name}"))?;
763 self.repo.cleanup_state()?;
764 }
765 Ok(())
766 }
767
768 fn premerge_jsonl_files(&self, upstream_tree: &Tree<'_>) -> Result<()> {
769 for rel_path in self.tool_log_files_in_scope()? {
770 let Some(upstream_content) = self.read_tree_blob(upstream_tree, &rel_path)? else {
771 continue;
772 };
773
774 let local_path = self.repo_path.join(&rel_path);
775 let local_content = std::fs::read(&local_path)?;
776 let merged = merge_jsonl_logs(&upstream_content, &local_content);
777 if merged != local_content {
778 std::fs::write(local_path, merged)?;
779 }
780 }
781 Ok(())
782 }
783
784 fn tool_log_files_in_scope(&self) -> Result<Vec<String>> {
785 let root = self.subpath.as_ref().map_or_else(
786 || self.repo_path.clone(),
787 |subpath| self.repo_path.join(subpath),
788 );
789 let mut files = Vec::new();
790 self.collect_tool_log_files(&root, &mut files)?;
791 files.sort();
792 Ok(files)
793 }
794
795 fn collect_tool_log_files(&self, dir: &Path, files: &mut Vec<String>) -> Result<()> {
796 if !dir.exists() {
797 return Ok(());
798 }
799
800 for entry in std::fs::read_dir(dir)? {
801 let entry = entry?;
802 let path = entry.path();
803 if path.file_name().is_some_and(|name| name == ".git") {
804 continue;
805 }
806
807 if path.is_dir() {
808 self.collect_tool_log_files(&path, files)?;
809 continue;
810 }
811
812 let rel_path = path
813 .strip_prefix(&self.repo_path)
814 .with_context(|| format!("Failed to strip repo prefix from {}", path.display()))?;
815 let rel_path = rel_path.to_string_lossy().replace('\\', "/");
816 if is_tool_log_file(&rel_path) {
817 files.push(rel_path);
818 }
819 }
820
821 Ok(())
822 }
823
824 fn read_tree_blob(&self, tree: &Tree<'_>, rel_path: &str) -> Result<Option<Vec<u8>>> {
825 let entry = match tree.get_path(Path::new(rel_path)) {
826 Ok(entry) => entry,
827 Err(err) if err.code() == ErrorCode::NotFound => return Ok(None),
828 Err(err) => return Err(err.into()),
829 };
830
831 let blob = self.repo.find_blob(entry.id())?;
832 Ok(Some(blob.content().to_vec()))
833 }
834
835 fn stage_changes(&self) -> Result<bool> {
836 self.refresh_index_in_scope()?;
837
838 let index = self.repo.index()?;
839
840 let diff = match self.repo.head() {
843 Ok(head) => {
844 let head_oid = head
845 .target()
846 .ok_or_else(|| anyhow::anyhow!("HEAD reference has no target"))?;
847 let head_tree = self.repo.find_commit(head_oid)?.tree()?;
848 self.repo
849 .diff_tree_to_index(Some(&head_tree), Some(&index), None)?
850 }
851 Err(e) if e.code() == git2::ErrorCode::UnbornBranch => {
852 self.repo.diff_tree_to_index(None, Some(&index), None)?
854 }
855 Err(e) => return Err(e.into()),
856 };
857
858 Ok(diff.stats()?.files_changed() > 0)
859 }
860
861 fn refresh_index_in_scope(&self) -> Result<()> {
862 let mut index = self.repo.index()?;
863 let pathspecs = self.scoped_pathspecs();
864
865 index.add_all(pathspecs.iter(), IndexAddOption::DEFAULT, None)?;
866
867 index.update_all(pathspecs.iter(), None)?;
869
870 index.write()?;
871
872 Ok(())
873 }
874
875 fn scoped_pathspecs(&self) -> Vec<String> {
876 if let Some(subpath) = &self.subpath {
877 vec![format!("{}/*", subpath), format!("{}/**/*", subpath)]
878 } else {
879 vec![".".to_string()]
880 }
881 }
882}
883
884fn commit_parent_plan(
885 relation: SyncRelation,
886 has_head: bool,
887 has_upstream: bool,
888) -> Result<CommitParentPlan> {
889 Ok(match relation {
890 SyncRelation::NoUpstream | SyncRelation::UpToDate | SyncRelation::AheadOnly => {
891 if has_head {
892 CommitParentPlan::HeadOnly
893 } else {
894 CommitParentPlan::None
895 }
896 }
897 SyncRelation::BehindOnly => {
898 if !has_upstream {
899 bail!("Missing upstream commit for behind-only commit");
900 }
901 CommitParentPlan::UpstreamOnly
902 }
903 SyncRelation::Diverged => {
904 if !has_head || !has_upstream {
905 bail!("Missing head or upstream commit for diverged commit");
906 }
907 CommitParentPlan::HeadAndUpstream
908 }
909 })
910}
911
912#[cfg(test)]
913mod tests {
914 use super::*;
915
916 #[test]
917 fn test_merge_jsonl_deduplicates_by_call_id() {
918 let ours = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}
919{"call_id":"def","started_at":"2025-01-01T11:00:00Z","tool":"bar"}"#;
920 let theirs = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo_updated"}
921{"call_id":"ghi","started_at":"2025-01-01T12:00:00Z","tool":"baz"}"#;
922
923 let merged = merge_jsonl_logs(ours, theirs);
924 let merged_str = String::from_utf8_lossy(&merged);
925
926 assert!(merged_str.contains("foo_updated"));
928 assert!(!merged_str.contains(r#""tool":"foo""#)); assert!(merged_str.contains("def"));
930 assert!(merged_str.contains("ghi"));
931 }
932
933 #[test]
934 fn test_merge_jsonl_preserves_unparseable() {
935 let ours = b"not valid json\n";
936 let theirs = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}"#;
937
938 let merged = merge_jsonl_logs(ours, theirs);
939 let merged_str = String::from_utf8_lossy(&merged);
940
941 assert!(merged_str.contains("not valid json"));
942 assert!(merged_str.contains("call_id"));
943 }
944
945 #[test]
946 fn test_merge_jsonl_sorts_by_timestamp() {
947 let ours = br#"{"call_id":"late","started_at":"2025-01-01T15:00:00Z","tool":"c"}"#;
948 let theirs = br#"{"call_id":"early","started_at":"2025-01-01T09:00:00Z","tool":"a"}
949{"call_id":"mid","started_at":"2025-01-01T12:00:00Z","tool":"b"}"#;
950
951 let merged = merge_jsonl_logs(ours, theirs);
952 let merged_str = String::from_utf8_lossy(&merged);
953 let lines: Vec<_> = merged_str.lines().collect();
954
955 assert!(lines[0].contains("early"));
956 assert!(lines[1].contains("mid"));
957 assert!(lines[2].contains("late"));
958 }
959
960 #[test]
961 fn test_merge_jsonl_empty_files() {
962 let merged = merge_jsonl_logs(b"", b"");
963 assert!(merged.is_empty());
964 }
965
966 #[test]
967 fn test_merge_jsonl_one_side_empty() {
968 let content = br#"{"call_id":"abc","started_at":"2025-01-01T10:00:00Z","tool":"foo"}"#;
969
970 let merged_ours_empty = merge_jsonl_logs(b"", content);
971 assert!(String::from_utf8_lossy(&merged_ours_empty).contains("abc"));
972
973 let merged_theirs_empty = merge_jsonl_logs(content, b"");
974 assert!(String::from_utf8_lossy(&merged_theirs_empty).contains("abc"));
975 }
976
977 #[test]
978 fn test_merge_context_jsonl_keeps_local_on_collision() {
979 let remote = br#"{"call_id":"same","started_at":"2025-01-01T10:00:00Z","tool":"remote"}"#;
980 let local = br#"{"call_id":"same","started_at":"2025-01-01T10:00:00Z","tool":"local"}"#;
981
982 let merged = merge_jsonl_logs(remote, local);
983 let merged_str = String::from_utf8_lossy(&merged);
984
985 assert!(merged_str.contains("local"));
986 assert!(!merged_str.contains("remote"));
987 }
988
989 #[test]
990 fn test_is_tool_log_file() {
991 assert!(is_tool_log_file("branch/logs/tool_logs_2025-01-01.jsonl"));
993 assert!(is_tool_log_file(
994 "foo/logs/tool_logs_2025-01-01_abc123.jsonl"
995 ));
996 assert!(is_tool_log_file("a/b/c/logs/tool_logs_whatever.jsonl"));
997
998 assert!(!is_tool_log_file("branch/logs/other.jsonl"));
1000
1001 assert!(!is_tool_log_file(
1003 "branch/research/tool_logs_2025-01-01.jsonl"
1004 ));
1005
1006 assert!(!is_tool_log_file("branch/logs/tool_logs_2025-01-01.json"));
1008
1009 assert!(!is_tool_log_file("tool_logs_config/logs/readme.jsonl"));
1011 assert!(!is_tool_log_file("tool_logs_foo/logs/bar.jsonl"));
1012
1013 assert!(!is_tool_log_file("tool_logs_2025-01-01.jsonl"));
1015 }
1016
1017 #[test]
1018 fn commit_parent_plan_selects_expected_parents() {
1019 assert_eq!(
1020 commit_parent_plan(SyncRelation::NoUpstream, false, false).unwrap(),
1021 CommitParentPlan::None
1022 );
1023 assert_eq!(
1024 commit_parent_plan(SyncRelation::UpToDate, true, true).unwrap(),
1025 CommitParentPlan::HeadOnly
1026 );
1027 assert_eq!(
1028 commit_parent_plan(SyncRelation::AheadOnly, true, false).unwrap(),
1029 CommitParentPlan::HeadOnly
1030 );
1031 assert_eq!(
1032 commit_parent_plan(SyncRelation::BehindOnly, true, true).unwrap(),
1033 CommitParentPlan::UpstreamOnly
1034 );
1035 assert_eq!(
1036 commit_parent_plan(SyncRelation::Diverged, true, true).unwrap(),
1037 CommitParentPlan::HeadAndUpstream
1038 );
1039 }
1040
1041 fn git_ok(dir: &std::path::Path, args: &[&str]) {
1048 let out = std::process::Command::new("git")
1049 .current_dir(dir)
1050 .args(args)
1051 .output()
1052 .expect("failed to spawn git");
1053 assert!(
1054 out.status.success(),
1055 "git {:?} failed: {}",
1056 args,
1057 String::from_utf8_lossy(&out.stderr)
1058 );
1059 }
1060
1061 fn git_stdout(dir: &std::path::Path, args: &[&str]) -> String {
1063 let out = std::process::Command::new("git")
1064 .current_dir(dir)
1065 .args(args)
1066 .output()
1067 .expect("failed to spawn git");
1068 assert!(out.status.success());
1069 String::from_utf8_lossy(&out.stdout).trim().to_string()
1070 }
1071
1072 #[test]
1075 fn divergence_no_upstream_ref() {
1076 let repo = tempfile::TempDir::new().unwrap();
1077 git_ok(repo.path(), &["init"]);
1078 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
1079 git_ok(repo.path(), &["add", "."]);
1080 git_ok(
1081 repo.path(),
1082 &[
1083 "-c",
1084 "user.name=Test",
1085 "-c",
1086 "user.email=test@example.com",
1087 "commit",
1088 "-m",
1089 "initial",
1090 ],
1091 );
1092
1093 let sync = GitSync::new(repo.path(), None).unwrap();
1094 let analysis = sync.check_divergence("main").unwrap();
1095
1096 assert!(!analysis.is_diverged, "should not be diverged");
1097 assert!(analysis.is_ahead, "should be ahead (no upstream)");
1098 assert!(!analysis.is_behind, "should not be behind");
1099 }
1100
1101 #[test]
1104 fn divergence_up_to_date() {
1105 let repo = tempfile::TempDir::new().unwrap();
1106 git_ok(repo.path(), &["init"]);
1107 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
1108 git_ok(repo.path(), &["add", "."]);
1109 git_ok(
1110 repo.path(),
1111 &[
1112 "-c",
1113 "user.name=Test",
1114 "-c",
1115 "user.email=test@example.com",
1116 "commit",
1117 "-m",
1118 "initial",
1119 ],
1120 );
1121 git_ok(repo.path(), &["branch", "-M", "main"]);
1123
1124 let head_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1125 git_ok(
1126 repo.path(),
1127 &["update-ref", "refs/remotes/origin/main", &head_oid],
1128 );
1129
1130 let sync = GitSync::new(repo.path(), None).unwrap();
1131 let analysis = sync.check_divergence("main").unwrap();
1132
1133 assert!(!analysis.is_diverged, "should not be diverged");
1134 assert!(!analysis.is_ahead, "should not be ahead");
1135 assert!(!analysis.is_behind, "should not be behind");
1136 }
1137
1138 #[test]
1141 fn divergence_local_ahead_only() {
1142 let repo = tempfile::TempDir::new().unwrap();
1143 git_ok(repo.path(), &["init"]);
1144 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
1145 git_ok(repo.path(), &["add", "."]);
1146 git_ok(
1147 repo.path(),
1148 &[
1149 "-c",
1150 "user.name=Test",
1151 "-c",
1152 "user.email=test@example.com",
1153 "commit",
1154 "-m",
1155 "C1",
1156 ],
1157 );
1158 git_ok(repo.path(), &["branch", "-M", "main"]);
1160
1161 let c1_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1162 git_ok(
1163 repo.path(),
1164 &["update-ref", "refs/remotes/origin/main", &c1_oid],
1165 );
1166
1167 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
1168 git_ok(repo.path(), &["add", "."]);
1169 git_ok(
1170 repo.path(),
1171 &[
1172 "-c",
1173 "user.name=Test",
1174 "-c",
1175 "user.email=test@example.com",
1176 "commit",
1177 "-m",
1178 "C2",
1179 ],
1180 );
1181
1182 let sync = GitSync::new(repo.path(), None).unwrap();
1183 let analysis = sync.check_divergence("main").unwrap();
1184
1185 assert!(!analysis.is_diverged, "should not be diverged");
1186 assert!(analysis.is_ahead, "should be ahead");
1187 assert!(!analysis.is_behind, "should not be behind");
1188 }
1189
1190 #[test]
1193 fn divergence_local_behind_only() {
1194 let repo = tempfile::TempDir::new().unwrap();
1195 git_ok(repo.path(), &["init"]);
1196 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
1197 git_ok(repo.path(), &["add", "."]);
1198 git_ok(
1199 repo.path(),
1200 &[
1201 "-c",
1202 "user.name=Test",
1203 "-c",
1204 "user.email=test@example.com",
1205 "commit",
1206 "-m",
1207 "C1",
1208 ],
1209 );
1210 git_ok(repo.path(), &["branch", "-M", "main"]);
1212
1213 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
1214 git_ok(repo.path(), &["add", "."]);
1215 git_ok(
1216 repo.path(),
1217 &[
1218 "-c",
1219 "user.name=Test",
1220 "-c",
1221 "user.email=test@example.com",
1222 "commit",
1223 "-m",
1224 "C2",
1225 ],
1226 );
1227
1228 let c2_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1229 git_ok(repo.path(), &["reset", "--hard", "HEAD~1"]);
1230 git_ok(
1231 repo.path(),
1232 &["update-ref", "refs/remotes/origin/main", &c2_oid],
1233 );
1234
1235 let sync = GitSync::new(repo.path(), None).unwrap();
1236 let analysis = sync.check_divergence("main").unwrap();
1237
1238 assert!(!analysis.is_diverged, "should not be diverged");
1239 assert!(!analysis.is_ahead, "should not be ahead");
1240 assert!(analysis.is_behind, "should be behind");
1241 }
1242
1243 #[test]
1246 fn divergence_diverged() {
1247 let repo = tempfile::TempDir::new().unwrap();
1248 git_ok(repo.path(), &["init"]);
1249 std::fs::write(repo.path().join("a.txt"), "a").unwrap();
1250 git_ok(repo.path(), &["add", "."]);
1251 git_ok(
1252 repo.path(),
1253 &[
1254 "-c",
1255 "user.name=Test",
1256 "-c",
1257 "user.email=test@example.com",
1258 "commit",
1259 "-m",
1260 "C1",
1261 ],
1262 );
1263 git_ok(repo.path(), &["branch", "-M", "main"]);
1265
1266 let c1_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1267
1268 std::fs::write(repo.path().join("b.txt"), "b").unwrap();
1269 git_ok(repo.path(), &["add", "."]);
1270 git_ok(
1271 repo.path(),
1272 &[
1273 "-c",
1274 "user.name=Test",
1275 "-c",
1276 "user.email=test@example.com",
1277 "commit",
1278 "-m",
1279 "C2-local",
1280 ],
1281 );
1282
1283 git_ok(repo.path(), &["branch", "remote-sim", &c1_oid]);
1284 git_ok(repo.path(), &["checkout", "remote-sim"]);
1285 std::fs::write(repo.path().join("c.txt"), "c").unwrap();
1286 git_ok(repo.path(), &["add", "."]);
1287 git_ok(
1288 repo.path(),
1289 &[
1290 "-c",
1291 "user.name=Test",
1292 "-c",
1293 "user.email=test@example.com",
1294 "commit",
1295 "-m",
1296 "C3-remote",
1297 ],
1298 );
1299
1300 let c3_oid = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1301 git_ok(repo.path(), &["checkout", "main"]);
1302 git_ok(
1303 repo.path(),
1304 &["update-ref", "refs/remotes/origin/main", &c3_oid],
1305 );
1306
1307 let sync = GitSync::new(repo.path(), None).unwrap();
1308 let analysis = sync.check_divergence("main").unwrap();
1309
1310 assert!(analysis.is_diverged, "should be diverged");
1311 assert!(analysis.is_ahead, "should be ahead");
1312 assert!(analysis.is_behind, "should be behind");
1313 }
1314
1315 #[test]
1316 fn refresh_worktree_after_commit_refreshes_only_subpath() {
1317 let repo = tempfile::TempDir::new().unwrap();
1318 git_ok(repo.path(), &["init"]);
1319 std::fs::create_dir_all(repo.path().join("branch")).unwrap();
1320 std::fs::write(repo.path().join("branch/data.txt"), "committed\n").unwrap();
1321 std::fs::write(repo.path().join("outside.txt"), "outside\n").unwrap();
1322 git_ok(repo.path(), &["add", "."]);
1323 git_ok(
1324 repo.path(),
1325 &[
1326 "-c",
1327 "user.name=Test",
1328 "-c",
1329 "user.email=test@example.com",
1330 "commit",
1331 "-m",
1332 "initial",
1333 ],
1334 );
1335 git_ok(repo.path(), &["branch", "-M", "main"]);
1336
1337 std::fs::write(repo.path().join("branch/data.txt"), "stale branch\n").unwrap();
1338 std::fs::write(repo.path().join("outside.txt"), "stale outside\n").unwrap();
1339 git_ok(repo.path(), &["add", "branch/data.txt", "outside.txt"]);
1340
1341 let sync = GitSync::new(repo.path(), Some("branch".to_string())).unwrap();
1342 let head_oid = Oid::from_str(&git_stdout(repo.path(), &["rev-parse", "HEAD"])).unwrap();
1343
1344 sync.refresh_worktree_after_commit(head_oid).unwrap();
1345
1346 assert_eq!(
1347 std::fs::read_to_string(repo.path().join("branch/data.txt")).unwrap(),
1348 "committed\n"
1349 );
1350 assert_eq!(
1351 std::fs::read_to_string(repo.path().join("outside.txt")).unwrap(),
1352 "stale outside\n"
1353 );
1354
1355 let status = git_stdout(repo.path(), &["status", "--short"]);
1356 assert!(!status.contains("branch/data.txt"), "status was: {status}");
1357 assert!(status.contains("outside.txt"), "status was: {status}");
1358 }
1359
1360 #[test]
1361 fn reset_after_push_race_hard_restores_fast_forwarded_worktree() {
1362 let repo = tempfile::TempDir::new().unwrap();
1363 git_ok(repo.path(), &["init"]);
1364 std::fs::write(repo.path().join("base.txt"), "one\n").unwrap();
1365 git_ok(repo.path(), &["add", "."]);
1366 git_ok(
1367 repo.path(),
1368 &[
1369 "-c",
1370 "user.name=Test",
1371 "-c",
1372 "user.email=test@example.com",
1373 "commit",
1374 "-m",
1375 "c1",
1376 ],
1377 );
1378 git_ok(repo.path(), &["branch", "-M", "main"]);
1379 let c1 = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1380
1381 std::fs::write(repo.path().join("base.txt"), "two\n").unwrap();
1382 git_ok(repo.path(), &["add", "."]);
1383 git_ok(
1384 repo.path(),
1385 &[
1386 "-c",
1387 "user.name=Test",
1388 "-c",
1389 "user.email=test@example.com",
1390 "commit",
1391 "-m",
1392 "c2",
1393 ],
1394 );
1395 let c2 = git_stdout(repo.path(), &["rev-parse", "HEAD"]);
1396 git_ok(repo.path(), &["reset", "--hard", &c1]);
1397
1398 let sync = GitSync::new(repo.path(), None).unwrap();
1399 let c2_commit = sync.repo.find_commit(Oid::from_str(&c2).unwrap()).unwrap();
1400
1401 sync.fast_forward_to_commit("main", &c2_commit).unwrap();
1402 assert_eq!(
1403 std::fs::read_to_string(repo.path().join("base.txt")).unwrap(),
1404 "two\n"
1405 );
1406
1407 sync.reset_after_push_race(Some(Oid::from_str(&c1).unwrap()), PushRaceResetMode::Hard)
1408 .unwrap();
1409
1410 assert_eq!(git_stdout(repo.path(), &["rev-parse", "HEAD"]), c1);
1411 assert_eq!(
1412 std::fs::read_to_string(repo.path().join("base.txt")).unwrap(),
1413 "one\n"
1414 );
1415 assert!(git_stdout(repo.path(), &["status", "--short"]).is_empty());
1416 }
1417}