liboxen 0.50.0

Oxen is a fast version control tool for unstructured data, written in Rust, that helps version large machine learning datasets.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
use crate::config::UserConfig;
use crate::constants;
use crate::core::db;
use crate::core::db::merkle_node::MerkleNodeDB;
pub use crate::core::merge::entry_merge_conflict_db_reader::EntryMergeConflictDBReader;
pub use crate::core::merge::node_merge_conflict_db_reader::NodeMergeConflictDBReader;
use crate::core::merge::node_merge_conflict_reader::NodeMergeConflictReader;
use crate::core::merge::{db_path, node_merge_conflict_writer};
use crate::core::refs::with_ref_manager;
use crate::core::v_latest::add;
use crate::core::v_latest::branches::OnConflict;
use crate::core::v_latest::commits::{get_commit_or_head, list_between};
use crate::core::v_latest::merge_marker;
use crate::error::OxenError;
use crate::model::NewCommit;
use crate::model::StagedEntryStatus;
use crate::model::merge_conflict::NodeMergeConflict;
use crate::model::merkle_tree::node::CommitNode;
use crate::model::merkle_tree::node::StagedMerkleTreeNode;
use crate::model::merkle_tree::node::commit_node::CommitNodeOpts;
use crate::model::merkle_tree::node::file_node::FileNode;
use crate::model::merkle_tree::node::{EMerkleTreeNode, MerkleTreeNode};
use crate::model::{Branch, Commit, LocalRepository};
use crate::model::{MerkleHash, PartialNode};
use crate::repositories;
use crate::repositories::commits::commit_writer;
use crate::repositories::merge::MergeCommits;
use crate::util;

use rocksdb::DB;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::str;

use super::index::restore;
use super::index::restore::FileToRestore;

/// Accumulated outcome of walking one tree during a merge.
///
/// When produced by `walk_base_dir`, `entries_to_restore` actually holds the entries being
/// removed rather than restored.
struct MergeResult {
    /// Files that ought to be restored from the currently traversed tree. In
    /// `walk_merge_commit` these are merge commit files that are not present in, or have
    /// changed from, the base tree.
    pub entries_to_restore: Vec<FileToRestore>,
    /// Files that would be restored, but are modified from the from_tree, and thus would
    /// erase work if overwritten.
    pub cannot_overwrite_entries: Vec<PathBuf>,
}

impl MergeResult {
    /// Creates a result in which both lists start out empty.
    pub fn new() -> Self {
        Self {
            entries_to_restore: Vec::new(),
            cannot_overwrite_entries: Vec::new(),
        }
    }
}

/// Describes whether a merge has a local working tree attached to it.
#[derive(Debug, Clone, Copy)]
pub enum LocalCheckout {
    /// Server-side merge: operates on tree data only and does not touch the working tree.
    Absent,
    /// Client-side merge: restores files on disk and advances HEAD, bracketed by MERGE_IN_PROGRESS.
    ///
    /// `is_resume` is true when a prior attempt at this target was interrupted; the restore path
    /// then force-restores instead of conflict-checking.
    Present { is_resume: bool },
}

impl LocalCheckout {
    /// Returns `true` for any `Present` checkout, regardless of its `is_resume` flag.
    pub fn writes_to_disk(self) -> bool {
        match self {
            Self::Absent => false,
            Self::Present { .. } => true,
        }
    }

    /// Returns `true` only for a `Present` checkout whose `is_resume` flag is set.
    pub fn is_resume(self) -> bool {
        match self {
            Self::Absent => false,
            Self::Present { is_resume } => is_resume,
        }
    }
}

/// Result of a three-way merge conflict analysis.
///
/// Carries both the conflicts that were found and the files from the merge branch that should
/// be included in the merge result, so callers get both from one analysis.
pub struct MergeConflictAnalysis {
    /// Conflicts detected by the analysis; an empty list means the merge can proceed.
    pub conflicts: Vec<NodeMergeConflict>,
    /// Files added, modified, or deleted on the merge branch (relative to LCA) that should be
    /// applied to the base tree. Each entry is `(path, file node, status)`, where the
    /// `StagedEntryStatus` says how the file changed.
    pub entries: Vec<(PathBuf, FileNode, StagedEntryStatus)>,
}

/// Reports whether merging `merge_branch` into `base_branch` would produce conflicts.
///
/// Each branch is resolved to a commit through its `commit_id`; the answer is the negation of
/// [`can_merge_commits`] for those two commits.
pub async fn has_conflicts(
    repo: &LocalRepository,
    base_branch: &Branch,
    merge_branch: &Branch,
) -> Result<bool, OxenError> {
    let base_tip =
        repositories::commits::get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let merge_tip =
        repositories::commits::get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;

    can_merge_commits(repo, &base_tip, &merge_tip)
        .await
        .map(|mergeable| !mergeable)
}

/// Lists the merge conflicts currently recorded for `repo`.
///
/// If the conflict reader cannot be created, the failure is logged at debug level and an empty
/// list is returned instead of an error.
pub fn list_conflicts(repo: &LocalRepository) -> Result<Vec<NodeMergeConflict>, OxenError> {
    let reader = match NodeMergeConflictReader::new(repo) {
        Ok(reader) => reader,
        Err(e) => {
            log::debug!("Error creating NodeMergeConflictReader: {e}");
            return Ok(Vec::new());
        }
    };

    reader.list_conflicts()
}

/// Marks the merge conflict at `path` as resolved.
///
/// Delegates directly to `node_merge_conflict_writer::mark_conflict_as_resolved_in_db` and
/// returns whatever that call returns.
pub fn mark_conflict_as_resolved(repo: &LocalRepository, path: &Path) -> Result<(), OxenError> {
    node_merge_conflict_writer::mark_conflict_as_resolved_in_db(repo, path)
}

/// Checks whether `merge_commit` can be merged into `base_commit` without conflicts.
///
/// Returns `true` when there are no conflicts and `false` when at least one conflict is found.
/// A fast-forward merge is reported as conflict-free without running the conflict analysis.
pub async fn can_merge_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<bool, OxenError> {
    let commits = MergeCommits {
        lca: lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?,
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };

    // A fast-forward merge cannot have merge conflicts.
    if commits.is_fast_forward_merge() {
        return Ok(true);
    }

    // `find_merge_conflicts` needs a hash set to write into; its contents are not read here.
    let mut hashes = HashSet::new();
    find_merge_conflicts(repo, &commits, LocalCheckout::Absent, &mut hashes)
        .await
        .map(|analysis| analysis.conflicts.is_empty())
}

/// Lists the conflicting paths between two branches.
///
/// Resolves each branch to a commit through its `commit_id` and defers to
/// [`list_conflicts_between_commits`].
pub async fn list_conflicts_between_branches(
    repo: &LocalRepository,
    base_branch: &Branch,
    merge_branch: &Branch,
) -> Result<Vec<PathBuf>, OxenError> {
    let base_tip = get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let merge_tip = get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;
    list_conflicts_between_commits(repo, &base_tip, &merge_tip).await
}

/// Lists the commits between the lowest common ancestor of two branches and the head branch.
///
/// Both branches are resolved to commits through their `commit_id`; the result is
/// `list_between(lca, head_commit)`.
///
/// # Errors
/// Returns an error when the two commits have no common ancestor, or when any lookup fails.
pub fn list_commits_between_branches(
    repo: &LocalRepository,
    base_branch: &Branch,
    head_branch: &Branch,
) -> Result<Vec<Commit>, OxenError> {
    log::debug!("list_commits_between_branches() base: {base_branch:?} head: {head_branch:?}");
    let base_commit = get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let head_commit = get_commit_or_head(repo, Some(head_branch.commit_id.clone()))?;

    let lca = lowest_common_ancestor_from_commits(repo, &base_commit, &head_commit)?
        .ok_or_else(|| {
            OxenError::basic_str(format!(
                "Error: head commit {:?} and base commit {:?} have no common ancestor",
                head_commit.id, base_commit.id
            ))
        })?;

    log::debug!(
        "list_commits_between_branches {base_commit:?} -> {head_commit:?} found lca {lca:?}"
    );
    list_between(repo, &lca, &head_commit)
}

/// Lists the commits between the lowest common ancestor of two commits and `head_commit`.
///
/// # Errors
/// Returns an error when the two commits have no common ancestor, or when any lookup fails.
pub fn list_commits_between_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    head_commit: &Commit,
) -> Result<Vec<Commit>, OxenError> {
    log::debug!("list_commits_between_commits()\nbase: {base_commit}\nhead: {head_commit}");

    let lca = match lowest_common_ancestor_from_commits(repo, base_commit, head_commit)? {
        Some(lca) => lca,
        None => {
            return Err(OxenError::basic_str(format!(
                "Error: head commit {:?} and base commit {:?} have no common ancestor",
                head_commit.id, base_commit.id
            )));
        }
    };

    log::debug!("For commits {base_commit:?} -> {head_commit:?} found lca {lca:?}");
    log::debug!("Reading history from lca to head");

    list_between(repo, &lca, head_commit)
}

/// Lists the paths that conflict when merging `merge_commit` into `base_commit`.
///
/// Runs the conflict analysis with no local checkout attached and returns, for every conflict,
/// the path stored in its `base_entry`.
pub async fn list_conflicts_between_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<Vec<PathBuf>, OxenError> {
    let commits = MergeCommits {
        lca: lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?,
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };

    // `find_merge_conflicts` needs a hash set to write into; its contents are not read here.
    let mut hashes = HashSet::new();
    let analysis =
        find_merge_conflicts(repo, &commits, LocalCheckout::Absent, &mut hashes).await?;

    let conflict_paths = analysis
        .conflicts
        .iter()
        .map(|conflict| conflict.base_entry.1.clone())
        .collect();
    Ok(conflict_paths)
}

/// Server-side merge of `merge_branch` into `base_branch`.
///
/// On success the base branch ref is updated to the resulting commit. The working directory and
/// HEAD are not modified.
///
/// See the docs for merge_commits for details on how three-way merging works, including
/// definitions of the terms used.
///
/// # Errors
/// - `OxenError::UpstreamMergeConflict` if the branches have conflicting changes.
/// - Other `OxenError` variants for internal failures (missing commits, tree corruption, etc.).
pub async fn merge_into_base(
    repo: &LocalRepository,
    merge_branch: &Branch,
    base_branch: &Branch,
) -> Result<Commit, OxenError> {
    log::debug!("merge_into_base merge {merge_branch} into {base_branch}");

    let base_commit = get_commit_or_head(repo, Some(base_branch.commit_id.clone()))?;
    let merge_commit = get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;

    let Some(lca) = lowest_common_ancestor_from_commits(repo, &base_commit, &merge_commit)? else {
        return Err(OxenError::basic_str(
            "Cannot merge branches with no common ancestor",
        ));
    };
    log::debug!("merge_into_base base: {base_commit:?} merge: {merge_commit:?} lca: {lca:?}");

    let commits = MergeCommits {
        lca: Some(lca),
        base: base_commit,
        merge: merge_commit,
    };

    let merged = if commits.is_already_up_to_date() {
        // The merge branch is an ancestor of base (or the tips are equal), so base already
        // contains everything from merge. Like `git merge`'s "Already up to date": no merge
        // commit is created and base stays where it is.
        commits.base
    } else if commits.is_fast_forward_merge() {
        commits.merge
    } else {
        server_three_way_merge(repo, &commits).await?
    };

    // Only reached when the merge itself succeeded; a failed merge leaves the ref untouched.
    repositories::branches::update(repo, &base_branch.name, &merged.id)?;

    Ok(merged)
}

/// Server-side three-way merge: creates a merge commit from tree data without touching the working
/// directory.
///
/// The new commit has two parents (base first, then merge) and is built on top of the base
/// commit's tree. When the analysis yields no entries to apply, an empty merge commit is created
/// instead so the merge is still recorded in history.
///
/// # Errors
/// - `OxenError::UpstreamMergeConflict` if the conflict analysis finds any conflicts.
/// - A basic `OxenError` if a merged entry's path is not valid UTF-8.
/// - Any error raised by the conflict analysis, user config loading, or the commit writer.
async fn server_three_way_merge(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
) -> Result<Commit, OxenError> {
    log::debug!(
        "server_three_way_merge: base commit {} -> merge commit {}",
        merge_commits.base.id,
        merge_commits.merge.id
    );

    // 1. Find conflicts and collect merge entries in a single tree traversal
    let mut shared_hashes = HashSet::new();
    let analysis = find_merge_conflicts(
        repo,
        merge_commits,
        LocalCheckout::Absent,
        &mut shared_hashes,
    )
    .await?;

    if !analysis.conflicts.is_empty() {
        return Err(OxenError::UpstreamMergeConflict(
            format!(
                "Unable to merge {} into {} due to {} conflicts.",
                merge_commits.merge.id,
                merge_commits.base.id,
                analysis.conflicts.len()
            )
            .into(),
        ));
    }

    if analysis.entries.is_empty() {
        // The merge branch contributes no new content on top of base — its changes are already
        // present in base via a separate path (e.g. base squash-replayed the merge branch). Build
        // an empty merge commit (two parents, base's tree) so the merge is preserved in commit
        // history. Mirrors `git merge`'s behavior on a 3-way merge with no content delta.
        log::info!(
            "server_three_way_merge: merge {} contributes no new content to base {}; creating empty merge commit",
            merge_commits.merge.id,
            merge_commits.base.id,
        );
        return create_empty_merge_commit(repo, merge_commits);
    }

    // 2. Build dir_entries HashMap (parent dir -> staged nodes) for the commit writer
    let mut dir_entries: HashMap<PathBuf, Vec<StagedMerkleTreeNode>> = HashMap::new();
    for (path, file_node, status) in &analysis.entries {
        // Node names are strings, so a non-UTF-8 path cannot be represented; report it as an
        // error rather than panicking in the middle of a server-side merge.
        let full_name = path.to_str().ok_or_else(|| {
            OxenError::basic_str(format!(
                "Cannot merge entry with non-UTF-8 path: {path:?}"
            ))
        })?;
        let parent = path.parent().unwrap_or_else(|| Path::new("")).to_path_buf();
        // The commit writer's existing children use full relative paths (e.g. "data/b.txt") as
        // their names, so our staged entries must match.
        let mut named_node = file_node.clone();
        named_node.set_name(full_name);
        dir_entries
            .entry(parent)
            .or_default()
            .push(StagedMerkleTreeNode {
                status: status.clone(),
                node: MerkleTreeNode::from_file(named_node),
            });
        // Ensure all ancestor directories are present in dir_entries. `ancestors()` yields the
        // path itself first, so skip it; for a relative path the walk ends at "".
        for ancestor in path.ancestors().skip(1) {
            dir_entries.entry(ancestor.to_path_buf()).or_default();
        }
    }

    // TODO: This is reading the server's local user config, but we should use the user/email
    // that initiated the merge request. If initiated from the client, the client should send its
    // local user. If initiated from the hub, the hub should send the user/email of the user who
    // initiated the merge request.
    let cfg = UserConfig::get()?;
    let new_commit = crate::model::NewCommitBody {
        message: merge_commits.commit_message(),
        author: cfg.name.clone(),
        email: cfg.email.clone(),
    };

    let parent_ids = vec![
        merge_commits.base.id.clone(),
        merge_commits.merge.id.clone(),
    ];

    // Pass the base commit ID as the target revision so the existing tree comes from the base
    // commit (revisions::get resolves commit IDs)
    let commit = commit_writer::commit_dir_entries_with_parents(
        repo,
        parent_ids,
        dir_entries,
        &new_commit,
        &merge_commits.base.id,
    )?;

    Ok(commit)
}

/// Client-side merge of the branch named `branch_name` into the current HEAD commit. Alters the
/// local checkout.
///
/// Returns the merge commit if successful, and None if there are conflicts.
///
/// # Errors
/// Fails when the branch or HEAD commit cannot be resolved, when the two commits share no common
/// ancestor, or when the underlying merge fails.
pub async fn merge(
    repo: &LocalRepository,
    branch_name: impl AsRef<str>,
) -> Result<Option<Commit>, OxenError> {
    let branch_name: &str = branch_name.as_ref();
    let merge_branch = repositories::branches::get_by_name(repo, branch_name)?;

    let base = repositories::commits::head_commit(repo)?;
    let merge = get_commit_or_head(repo, Some(merge_branch.commit_id.clone()))?;

    let Some(lca) = lowest_common_ancestor_from_commits(repo, &base, &merge)? else {
        return Err(OxenError::basic_str(
            "Cannot merge branches with no common ancestor",
        ));
    };

    let commits = MergeCommits {
        lca: Some(lca),
        base,
        merge,
    };
    merge_commits(repo, &commits, LocalCheckout::Present { is_resume: false }).await
}

/// Server-safe merge of `merge_commit` into `base_commit`.
///
/// Runs the merge with [`LocalCheckout::Absent`], so the working directory and HEAD are left
/// alone — the caller is responsible for updating the branch ref.
///
/// Use this variant from server code paths that must not mutate on-disk files. For the
/// client-side equivalent that updates the checkout and HEAD, see [`merge_commit_into_base`].
pub async fn merge_commit_into_base_server_safe(
    repo: &LocalRepository,
    merge_commit: &Commit,
    base_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    let maybe_lca = lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?;
    log::debug!(
        "merge_commit_into_base_server_safe has lca {maybe_lca:?} for merge commit {merge_commit:?} and base {base_commit:?}"
    );

    let commits = MergeCommits {
        lca: maybe_lca,
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };
    let checkout = LocalCheckout::Absent;

    merge_commits(repo, &commits, checkout).await
}

/// Client-side merge of `merge_commit` into `base_commit`. Updates files on disk and advances
/// HEAD.
///
/// Runs the merge with a present, non-resuming local checkout. For the server-side equivalent
/// that never touches the working directory, see [`merge_commit_into_base_server_safe`].
pub async fn merge_commit_into_base(
    repo: &LocalRepository,
    merge_commit: &Commit,
    base_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    let maybe_lca = lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)?;
    log::debug!(
        "merge_commit_into_base has lca {maybe_lca:?} for merge commit {merge_commit:?} and base {base_commit:?}"
    );

    let commits = MergeCommits {
        lca: maybe_lca,
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };
    let checkout = LocalCheckout::Present { is_resume: false };

    merge_commits(repo, &commits, checkout).await
}

/// Server-side merge of `merge_commit` into `base_commit` on a specific branch.
///
/// On success the ref of `branch` is updated to the resulting commit. The working directory and
/// HEAD are not modified.
///
/// # Errors
/// - `OxenError::UpstreamMergeConflict` if the branches have conflicting changes.
/// - Other `OxenError` variants for internal failures (missing commits, tree corruption, etc.).
pub async fn merge_commit_into_base_on_branch(
    repo: &LocalRepository,
    merge_commit: &Commit,
    base_commit: &Commit,
    branch: &Branch,
) -> Result<Commit, OxenError> {
    let Some(lca) = lowest_common_ancestor_from_commits(repo, base_commit, merge_commit)? else {
        return Err(OxenError::basic_str(
            "Cannot merge commits with no common ancestor",
        ));
    };

    log::debug!(
        "merge_commit_into_branch has lca {lca:?} for merge commit {merge_commit:?} and base {base_commit:?}"
    );

    let commits = MergeCommits {
        lca: Some(lca),
        base: base_commit.clone(),
        merge: merge_commit.clone(),
    };

    let merged = if commits.is_already_up_to_date() {
        // Merge is an ancestor of base (or the tips are equal), so base already contains
        // everything from merge. Hand base back unchanged instead of fabricating an empty merge
        // commit via `server_three_way_merge`. Mirrors `git merge`'s "Already up to date".
        commits.base
    } else if commits.is_fast_forward_merge() {
        commits.merge
    } else {
        server_three_way_merge(repo, &commits).await?
    };

    // Only reached when the merge itself succeeded; a failed merge leaves the ref untouched.
    repositories::branches::update(repo, &branch.name, &merged.id)?;

    Ok(merged)
}

/// Checks whether the merge conflict database of `repo` has an entry for `path`.
///
/// Opens the database at the repository's conflict DB path for the duration of the call and
/// asks `NodeMergeConflictDBReader::has_file` for the answer.
pub fn has_file(repo: &LocalRepository, path: &Path) -> Result<bool, OxenError> {
    let db_path = db_path(repo);
    log::debug!("Merger::new() DB {db_path:?}");

    let merge_db = DB::open(
        &db::key_val::opts::default(),
        dunce::simplified(&db_path),
    )?;
    NodeMergeConflictDBReader::has_file(&merge_db, path)
}

pub fn remove_conflict_path(repo: &LocalRepository, path: &Path) -> Result<(), OxenError> {
    let db_path = db_path(repo);
    log::debug!("Merger::new() DB {db_path:?}");
    let opts = db::key_val::opts::default();
    let merge_db = DB::open(&opts, dunce::simplified(&db_path))?;

    let path_str = path.to_str().unwrap();
    let key = path_str.as_bytes();
    merge_db.delete(key)?;
    Ok(())
}

/// Abandons an interrupted or in-conflict merge: checks the working tree back out to HEAD and
/// removes every piece of merge state on disk (the in-progress marker, MERGE_HEAD, ORIG_HEAD
/// and the conflicts database directory).
///
/// Returns `OxenError::NoMergeInProgress` if there is nothing to abort.
pub async fn abort_merge(repo: &LocalRepository) -> Result<(), OxenError> {
    let hidden_dir = util::fs::oxen_hidden_dir(&repo.path);
    let merge_head_path = hidden_dir.join(constants::MERGE_HEAD_FILE);
    let orig_head_path = hidden_dir.join(constants::ORIG_HEAD_FILE);
    let conflict_db_path = db_path(repo);

    // Work out which target the in-progress merge was restoring toward. The MERGE_IN_PROGRESS
    // marker (fast-forward mid-restore) takes precedence; otherwise MERGE_HEAD (3-way conflict
    // state) is consulted. With neither present there is no merge to abort. The conflicts DB
    // directory is deliberately not used as a signal: it can linger as an empty rocksdb dir
    // from a previous merge.
    let target_id = match merge_marker::read(repo).await? {
        Some(marker_id) => marker_id,
        None if merge_head_path.exists() => util::fs::read_from_path(&merge_head_path)?
            .trim()
            .to_string(),
        None => return Err(OxenError::NoMergeInProgress),
    };

    let head_commit = repositories::commits::head_commit(repo)?;

    // Use the merge target as the "from" commit so the checkout performs the reverse restore.
    // A target that fails to load or is missing simply yields no "from" commit.
    let from_commit = repositories::commits::get_by_id(repo, target_id.as_str())
        .ok()
        .flatten();

    repositories::branches::checkout_commit_from_commit(
        repo,
        &head_commit,
        &from_commit,
        OnConflict::Overwrite,
    )
    .await?;

    // The working tree is back at HEAD; now drop the leftover merge state.
    merge_marker::clear(repo).await?;
    for stale_file in [&merge_head_path, &orig_head_path] {
        if stale_file.exists() {
            tokio::fs::remove_file(stale_file).await?;
        }
    }
    if conflict_db_path.exists() {
        tokio::fs::remove_dir_all(&conflict_db_path).await?;
    }

    println!("Merge aborted. HEAD is {}", head_commit.id);
    Ok(())
}

/// Gathers the three commits involved in merging `branch_name` into the current branch.
///
/// `base` is the commit the current branch resolves to, `merge` is the commit `branch_name`
/// resolves to, and `lca` is their lowest common ancestor (which may be `None`).
///
/// # Errors
/// Fails with "No current branch" when the repository has no current branch, or when a commit
/// lookup fails.
pub fn find_merge_commits<S: AsRef<str>>(
    repo: &LocalRepository,
    branch_name: S,
) -> Result<MergeCommits, OxenError> {
    let branch_name: &str = branch_name.as_ref();

    let Some(current_branch) = repositories::branches::current_branch(repo)? else {
        return Err(OxenError::basic_str("No current branch"));
    };

    let base =
        repositories::commits::get_commit_or_head(repo, Some(current_branch.name.clone()))?;
    let merge = get_commit_or_head(repo, Some(branch_name))?;
    let lca = lowest_common_ancestor_from_commits(repo, &base, &merge)?;

    Ok(MergeCommits { lca, base, merge })
}

/// Finds the lowest common ancestor of the current branch and the branch named `branch_name`.
///
/// Both names are resolved to commits and handed to `lowest_common_ancestor_from_commits`,
/// whose result is returned as-is. Callers use the result to decide whether a merge can simply
/// fast forward.
///
/// # Errors
/// Fails with "No current branch" when the repository has no current branch, or when a commit
/// lookup fails.
pub fn lowest_common_ancestor(
    repo: &LocalRepository,
    branch_name: impl AsRef<str>,
) -> Result<Option<Commit>, OxenError> {
    let branch_name: &str = branch_name.as_ref();

    let Some(current_branch) = repositories::branches::current_branch(repo)? else {
        return Err(OxenError::basic_str("No current branch"));
    };

    let base_tip =
        repositories::commits::get_commit_or_head(repo, Some(current_branch.name.clone()))?;
    let merge_tip = repositories::commits::get_commit_or_head(repo, Some(branch_name))?;

    lowest_common_ancestor_from_commits(repo, &base_tip, &merge_tip)
}

/// Fast-forward merge.
///
/// Returns `Ok(None)` when `base_commit` and `merge_commit` are the same commit, since there is
/// nothing to merge.
///
/// When `checkout` is [`LocalCheckout::Absent`] (server-side), no working-directory or HEAD
/// operations are performed — only the merge commit is returned so the caller can update the
/// branch ref.
///
/// When `checkout` is [`LocalCheckout::Present`] (client-side), the function checks for files
/// that cannot be safely overwritten, restores/removes files on disk, and advances HEAD. The
/// on-disk mutation is bracketed by the merge marker so an interrupted restore can be resumed;
/// the `is_resume` flag is forwarded to the tree walkers.
///
/// # Errors
/// - A "cannot overwrite files" error if either tree walk reports entries that must not be
///   overwritten; in that case no file has been modified.
/// - A basic `OxenError` if the root node of the merge or base commit cannot be loaded.
/// - Any error raised while restoring or removing files, or updating HEAD.
async fn fast_forward_merge(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
    checkout: LocalCheckout,
) -> Result<Option<Commit>, OxenError> {
    log::debug!("FF merge!");

    if base_commit == merge_commit {
        // If the base commit is the same as the merge commit, there is nothing to merge
        return Ok(None);
    }

    if !checkout.writes_to_disk() {
        // Server-side: no checkout to update, just return the merge commit.
        return Ok(Some(merge_commit.clone()));
    }

    let is_resume = checkout.is_resume();

    // Collect all dir and vnode hashes while loading the merge tree
    // This is done to identify shared dirs/vnodes between the merge and base trees while loading the base tree
    let mut merge_hashes = HashSet::new();
    let Some(merge_tree) = repositories::tree::get_root_with_children_and_node_hashes(
        repo,
        merge_commit,
        None,
        Some(&mut merge_hashes),
        None,
    )?
    else {
        return Err(OxenError::basic_str(
            "Cannot get root node for merge commit",
        ));
    };

    // Collect every shared dir/vnode hash between the trees, load the base tree's unique nodes and collect them as 'partial nodes'
    // These are done to skip checks for shared dirs/vnodes and avoid slow tree traversals when comparing files in the recursive functions respectively
    let mut shared_hashes = HashSet::new();
    let mut partial_nodes = HashMap::new();
    let Some(base_tree) = repositories::tree::get_root_with_children_and_partial_nodes(
        repo,
        base_commit,
        Some(&merge_hashes),
        None,
        Some(&mut shared_hashes),
        &mut partial_nodes,
    )?
    else {
        return Err(OxenError::basic_str("Cannot get root node for base commit"));
    };

    // Stop early if there are conflicts
    let (merge_tree_results, seen_files) =
        walk_merge_commit(repo, &merge_tree, &partial_nodes, &shared_hashes, is_resume).await?;

    if !merge_tree_results.cannot_overwrite_entries.is_empty() {
        return Err(OxenError::cannot_overwrite_files(
            &merge_tree_results.cannot_overwrite_entries,
        ));
    }

    let base_tree_results =
        walk_base_dir(repo, &base_tree, &shared_hashes, &seen_files, is_resume).await?;

    // If there are conflicts, return an error without restoring or removing anything.
    // Running both walks before any mutation ensures no files are modified if the merge
    // cannot complete.
    if !base_tree_results.cannot_overwrite_entries.is_empty() {
        return Err(OxenError::cannot_overwrite_files(
            &base_tree_results.cannot_overwrite_entries,
        ));
    }

    // All conflict checks have passed; the next lines mutate the working
    // tree. Write the resume marker *now* so a SIGTERM mid-restore is
    // recoverable, but no marker is left behind when the merge errors out
    // earlier for an unrelated reason.
    merge_marker::write(repo, &merge_commit.id).await?;

    let version_store = repo.version_store()?;
    for entry in merge_tree_results.entries_to_restore.iter() {
        restore::restore_file(repo, &entry.file_node, &entry.path, &version_store).await?;
    }

    // TODO: Make a new struct called 'BaseResults' that's exactly like MergeResults, but with 'entries_to_remove' instead
    // For the base walk, `entries_to_restore` holds the files to delete from disk.
    for entry in base_tree_results.entries_to_restore.iter() {
        util::fs::remove_file(&entry.path)?;
    }

    // Move the HEAD forward to this commit
    with_ref_manager(repo, |manager| manager.set_head_commit_id(&merge_commit.id))?;

    // Mutation complete; the marker is no longer load-bearing.
    merge_marker::clear(repo).await?;

    Ok(Some(merge_commit.clone()))
}

/// Traverse the merge-target tree depth-first (explicit stack, no recursion) and classify
/// every file reached.
///
/// Each visited file path is recorded in the returned set, which `walk_base_dir` later uses to
/// spot paths present in HEAD but absent from the merge target. The file is then classified:
///
/// - `is_resume`: always queued in `entries_to_restore`, regardless of working-tree state.
/// - Path present in `base_files` with a different hash: `restore::should_restore_partial_node`
///   decides between `entries_to_restore` and `cannot_overwrite_entries`.
/// - Path present in `base_files` with the same hash: nothing is queued.
/// - Path absent from `base_files`: `restore::should_restore_file` decides.
///
/// Directories and vnodes whose hash is in `shared_hashes` are not descended into, so files
/// beneath them appear in neither the result nor the returned path set.
///
/// # Errors
///
/// Propagates failures from the restore checks and from the root-directory lookup, and returns
/// an error when a node is neither a file, a directory, nor a commit.
async fn walk_merge_commit<'a>(
    repo: &LocalRepository,
    root: &'a MerkleTreeNode,
    base_files: &HashMap<PathBuf, PartialNode>,
    shared_hashes: &HashSet<MerkleHash>,
    is_resume: bool,
) -> Result<(MergeResult, HashSet<PathBuf>), OxenError> {
    let mut results = MergeResult::new();
    let mut seen_files: HashSet<PathBuf> = HashSet::new();
    let mut pending: Vec<(PathBuf, &'a MerkleTreeNode)> = vec![(PathBuf::new(), root)];

    while let Some((parent, node)) = pending.pop() {
        match &node.node {
            EMerkleTreeNode::File(merge_file_node) => {
                let file_path = parent.join(merge_file_node.name());
                seen_files.insert(file_path.clone());

                // Tri-state decision:
                //   Some(true)  -> safe to restore from the version store
                //   Some(false) -> working copy must not be overwritten
                //   None        -> merge target equals base for this path; nothing to apply
                let verdict = if is_resume {
                    // Resuming an interrupted merge for this same target: trust the target and
                    // force-restore without inspecting the working tree.
                    Some(true)
                } else if let Some(base_file_node) = base_files.get(&file_path) {
                    // `base_files` is keyed by base-tree path, so this lookup only matches when
                    // the file lives at the same path on both sides.
                    if node.hash == base_file_node.hash {
                        // A locally modified working copy is not an overwrite conflict here,
                        // because the merge has no change to apply at this path.
                        log::debug!("Merge entry has not changed: {file_path:?}");
                        None
                    } else {
                        let should_restore = restore::should_restore_partial_node(
                            repo,
                            Some(base_file_node.clone()),
                            merge_file_node,
                            &file_path,
                        )
                        .await?;
                        Some(should_restore)
                    }
                } else {
                    let should_restore =
                        restore::should_restore_file(repo, None, merge_file_node, &file_path)
                            .await?;
                    Some(should_restore)
                };

                match verdict {
                    Some(true) => results.entries_to_restore.push(FileToRestore {
                        file_node: merge_file_node.clone(),
                        path: file_path,
                    }),
                    Some(false) => results.cannot_overwrite_entries.push(file_path),
                    None => {}
                }
            }
            EMerkleTreeNode::Directory(dir_node) => {
                // A directory shared between the two trees is identical on both sides, so
                // there is nothing underneath it to visit.
                if !shared_hashes.contains(&node.hash) {
                    let dir_path = parent.join(dir_node.name());
                    // Only descend through vnodes that are not shared between the trees.
                    let changed_vnodes = node
                        .children
                        .iter()
                        .filter(|vnode| !shared_hashes.contains(&vnode.hash));
                    for child in changed_vnodes.flat_map(|vnode| vnode.children.iter()) {
                        log::debug!("walk_merge_commit child_path {child}");
                        pending.push((dir_path.clone(), child));
                    }
                }
            }
            EMerkleTreeNode::Commit(_) => {
                // Step through the commit wrapper to its root directory, keeping the path.
                let root_dir = repositories::tree::get_root_dir(node)?;
                pending.push((parent, root_dir));
            }
            _ => {
                return Err(OxenError::basic_str(
                    "Got an unexpected node type during checkout",
                ));
            }
        }
    }

    Ok((results, seen_files))
}

/// Traverse the base (HEAD) tree depth-first and collect files that HEAD has but the merge
/// walker never visited. These are the deletions a fast-forward merge applies on disk.
///
/// `merge_files` is the set of repo-relative paths produced by `walk_merge_commit`. A base file
/// is considered only when its path is missing from that set and the file currently exists
/// under `repo.path`. It then goes into `entries_to_restore` (holding the full on-disk path)
/// when `is_resume` is set or `restore::should_restore_file` approves; otherwise its
/// repo-relative path goes into `cannot_overwrite_entries`.
///
/// Directories and vnodes whose hash is in `shared_hashes` are skipped. Node types other than
/// file, directory, and commit are logged at debug level and ignored.
///
/// # Errors
///
/// Propagates failures from `restore::should_restore_file` and from the root-directory lookup.
async fn walk_base_dir<'a>(
    repo: &LocalRepository,
    root: &'a MerkleTreeNode,
    shared_hashes: &HashSet<MerkleHash>,
    merge_files: &HashSet<PathBuf>,
    is_resume: bool,
) -> Result<MergeResult, OxenError> {
    let mut results = MergeResult::new();
    let mut pending: Vec<(PathBuf, &'a MerkleTreeNode)> = vec![(PathBuf::new(), root)];

    while let Some((parent, node)) = pending.pop() {
        match &node.node {
            EMerkleTreeNode::File(base_file_node) => {
                let file_path = parent.join(base_file_node.name());

                // Paths the merge walker visited are handled there, not here.
                if merge_files.contains(&file_path) {
                    continue;
                }

                // Nothing to delete if the file is already gone from the working tree.
                let full_path = repo.path.join(&file_path);
                if !full_path.exists() {
                    continue;
                }

                // On resume the check is skipped entirely (short-circuit).
                let removable = is_resume
                    || restore::should_restore_file(repo, None, base_file_node, &file_path)
                        .await?;
                if removable {
                    results.entries_to_restore.push(FileToRestore {
                        file_node: base_file_node.clone(),
                        path: full_path,
                    });
                } else {
                    results.cannot_overwrite_entries.push(file_path);
                }
            }
            EMerkleTreeNode::Directory(dir_node) => {
                if !shared_hashes.contains(&node.hash) {
                    let dir_path = parent.join(dir_node.name());
                    let changed_vnodes = node
                        .children
                        .iter()
                        .filter(|vnode| !shared_hashes.contains(&vnode.hash));
                    for child in changed_vnodes.flat_map(|vnode| vnode.children.iter()) {
                        pending.push((dir_path.clone(), child));
                    }
                }
            }
            EMerkleTreeNode::Commit(_) => {
                // Step through the commit wrapper to its root directory, keeping the path.
                let root_dir = repositories::tree::get_root_dir(node)?;
                pending.push((parent, root_dir));
            }
            _ => {
                log::debug!("walk_base_dir unknown node type");
            }
        }
    }

    Ok(results)
}

/// Merge `merge_commits.merge` into `merge_commits.base`, choosing between the
/// "already up to date", fast-forward, and three-way strategies.
///
/// With a checkout that writes to disk (`LocalCheckout::Present { .. }`, client-side) the
/// resume marker is consulted first, and the merge proceeds through the working-directory
/// aware paths.
///
/// With `LocalCheckout::Absent` (server-side) the marker is not touched, and a three-way merge
/// is delegated to `server_three_way_merge`.
///
/// # Returns
///
/// - `Ok(Some(base))` when the merge is already up to date.
/// - Whatever `fast_forward_merge` returns for a fast-forward.
/// - `Ok(Some(commit))` for a conflict-free three-way merge.
/// - `Ok(None)` when a three-way merge found conflicts; they are written to the merge DB.
///
/// # Errors
///
/// Returns `OxenError::MergeInProgressMismatch` when an existing resume marker names a
/// different target commit, and propagates errors from the underlying merge steps.
async fn merge_commits(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    checkout: LocalCheckout,
) -> Result<Option<Commit>, OxenError> {
    let base = &merge_commits.base;
    let merge = &merge_commits.merge;
    let lca = merge_commits.lca.as_ref();

    // User output
    println!("Merge commits {} -> {}", base.id, merge.id);

    log::debug!(
        "FOUND MERGE COMMITS:\nLCA: {} -> {}\nBASE: {} -> {}\nMerge: {} -> {}",
        lca.map_or("None", |c| c.id.as_str()),
        lca.map_or("None", |c| c.message.as_str()),
        base.id,
        base.message,
        merge.id,
        merge.message,
    );

    // Consult the resume marker, but only for checkouts that write to disk. A marker for this
    // same target means a prior mutation was interrupted, so we resume and force-restore. A
    // marker for a different target aborts the merge. This function only reads the marker (and
    // clears it in the up-to-date case below); per the original notes, writing happens inside
    // the restore paths so that a merge failing its conflict check leaves no stale marker.
    let checkout = if checkout.writes_to_disk() {
        let is_resume = match merge_marker::read(repo).await? {
            None => false,
            Some(existing) => {
                if existing != merge.id {
                    return Err(OxenError::MergeInProgressMismatch {
                        expected: existing,
                        found: merge.id.clone(),
                    });
                }
                log::info!("Resuming interrupted merge for target commit {}", merge.id);
                true
            }
        };
        LocalCheckout::Present { is_resume }
    } else {
        LocalCheckout::Absent
    };
    let touches_disk = checkout.writes_to_disk();

    // "Already up to date" must be tested before fast-forward: when base == merge both
    // predicates hold, and the desired outcome is the no-op that returns base unchanged.
    if merge_commits.is_already_up_to_date() {
        // Any marker still on disk names this very target (a different target was rejected
        // above), so it is stale now that the merge is declared done.
        if touches_disk {
            merge_marker::clear(repo).await?;
        }
        // Mirrors `git merge`'s "Already up to date": no merge commit, HEAD unchanged.
        println!("Already up to date.");
        return Ok(Some(base.clone()));
    }

    if merge_commits.is_fast_forward_merge() {
        let commit = fast_forward_merge(repo, base, merge, checkout).await?;
        return Ok(commit);
    }

    log::debug!("Three way merge! {} -> {}", base.id, merge.id);

    if !touches_disk {
        // Server-safe: use the server three-way merge path which operates only on tree data
        // and never touches the working directory.
        return server_three_way_merge(repo, merge_commits).await.map(Some);
    }

    let mut shared_hashes = HashSet::new();
    let analysis = find_merge_conflicts(repo, merge_commits, checkout, &mut shared_hashes).await?;
    let conflict_count = analysis.conflicts.len();
    let has_conflicts = conflict_count > 0;

    if has_conflicts {
        println!(
            r"
Found {} conflicts, please resolve them before merging.

  oxen checkout --theirs path/to/file_1.txt
  oxen checkout --ours path/to/file_2.txt
  oxen add path/to/file_1.txt path/to/file_2.txt
  oxen commit -m 'Merge conflict resolution'

",
            conflict_count
        );
    }

    log::debug!("Got {} conflicts", conflict_count);

    if has_conflicts {
        // Persist the conflicts so the user can resolve them; no commit is produced.
        let db_path = db_path(repo);
        log::debug!("Merger::new() DB {db_path:?}");
        let opts = db::key_val::opts::default();
        let merge_db = DB::open(&opts, dunce::simplified(&db_path))?;

        node_merge_conflict_writer::write_conflicts_to_disk(
            repo,
            &merge_db,
            merge,
            base,
            &analysis.conflicts,
        )?;
        return Ok(None);
    }

    let commit = if analysis.entries.is_empty() {
        // The merge branch contributes no new content on top of base: build an empty merge
        // commit that aliases base's tree (matching `git merge`'s three-way no-delta
        // behavior). HEAD is advanced here because the empty-merge-commit helper leaves ref
        // updates to its caller.
        let empty_commit = create_empty_merge_commit(repo, merge_commits)?;
        with_ref_manager(repo, |manager| manager.set_head_commit_id(&empty_commit.id))?;
        empty_commit
    } else {
        create_merge_commit(repo, merge_commits, shared_hashes).await?
    };
    Ok(Some(commit))
}

/// Stage the working directory against the base commit (passing `shared_hashes` through to
/// `add::add_dir_except`) and write a commit whose parents are the base and merge commits,
/// in that order.
///
/// # Errors
///
/// Propagates failures from staging, from loading the user config, and from writing the commit.
async fn create_merge_commit(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    shared_hashes: HashSet<MerkleHash>,
) -> Result<Commit, OxenError> {
    let base = Some(merge_commits.base.clone());
    add::add_dir_except(repo, &base, repo.path.clone(), shared_hashes).await?;

    let commit_msg = merge_commits.commit_message();
    log::debug!("create_merge_commit {commit_msg}");

    // Parent order matters: base first, merge second.
    let parent_ids: Vec<String> = [&merge_commits.base, &merge_commits.merge]
        .iter()
        .map(|parent| parent.id.clone())
        .collect();

    commit_writer::commit_with_cfg(
        repo,
        commit_msg,
        &UserConfig::get()?,
        Some(parent_ids),
        &commit_writer::default_commit_progress_bar(),
    )
}

/// Create an empty merge commit: a commit with two parents whose tree is identical to base's
/// tree. Used when a 3-way merge analysis produces no entries (the merge branch contributes no
/// new content on top of base) but the merge isn't a fast-forward / "already up to date" case
/// either. Mirrors `git merge`'s empty-merge-commit behavior on a 3-way merge with no content
/// delta.
///
/// This aliases base's existing root DirNode by hash — Merkle storage is content-addressed, so
/// the new commit and base share the underlying tree storage. Only a new CommitNode and a copy
/// of base's `dir_hash_db` are written. The caller is responsible for advancing the appropriate
/// ref (server-side: the base branch ref via `merge_into_base`; client-side: HEAD via the
/// `with_ref_manager` helper).
///
/// # Errors
///
/// Returns an error when the user config cannot be loaded, a commit id cannot be parsed as a
/// `MerkleHash`, the base commit's tree node is missing, the base commit node has no root
/// directory child, or any of the underlying tree writes fail.
fn create_empty_merge_commit(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
) -> Result<Commit, OxenError> {
    let cfg = UserConfig::get()?;
    let timestamp = time::OffsetDateTime::now_utc();
    // Compute the message once so the value hashed into the commit id is exactly the value
    // stored on the commit node.
    let message = merge_commits.commit_message();
    let new_commit_data = NewCommit {
        parent_ids: vec![
            merge_commits.base.id.clone(),
            merge_commits.merge.id.clone(),
        ],
        message: message.clone(),
        author: cfg.name.clone(),
        email: cfg.email.clone(),
        timestamp,
    };
    let commit_id = commit_writer::compute_commit_id(&new_commit_data)?;

    let base_commit_hash: MerkleHash = merge_commits.base.id.parse()?;
    let merge_commit_hash: MerkleHash = merge_commits.merge.id.parse()?;

    let commit_node = CommitNode::new(
        repo,
        CommitNodeOpts {
            hash: commit_id,
            // CommitNode.parent_ids holds parent commit hashes (matches `create_empty_commit`
            // in core/v_latest/commits.rs).
            parent_ids: vec![base_commit_hash, merge_commit_hash],
            email: cfg.email.clone(),
            author: cfg.name.clone(),
            message,
            timestamp,
        },
    )?;

    // Resolve and validate base's root DirNode *before* opening the new commit's node DB for
    // writing, so a malformed base tree yields an error instead of a panic and leaves no
    // half-initialized commit DB behind.
    let base_node = repositories::tree::get_node_by_id_with_children(repo, &base_commit_hash)?
        .ok_or_else(|| {
            OxenError::resource_not_found(format!(
                "Merkle tree node not found for base commit '{}'",
                merge_commits.base.id
            ))
        })?;
    let root_dir = base_node
        .children
        .first()
        .ok_or_else(|| {
            OxenError::resource_not_found(format!(
                "Root directory not found for base commit '{}'",
                merge_commits.base.id
            ))
        })?
        .dir()?;

    // Open the new commit's MerkleNodeDB and add base's existing root DirNode as the only child.
    // Tree is content-addressed, so the new commit's tree equals base's tree without any tree
    // rebuild or copy.
    let mut commit_db =
        MerkleNodeDB::open_read_write(&repo.path, &commit_node, Some(base_node.hash))?;
    commit_db.add_child(&root_dir)?;

    // Copy base's path -> dir hash mapping so path-based tree lookups work for the new commit.
    repositories::tree::cp_dir_hashes_to(repo, &base_commit_hash, commit_node.hash())?;

    Ok(commit_node.to_commit())
}

/// Find the lowest common ancestor (LCA) of `base_commit` and `merge_commit`.
///
/// Both histories are loaded with `repositories::commits::list_from_with_depth`. Among the
/// commits reachable from `merge_commit` that also appear in `base_commit`'s history, the one
/// with the smallest depth measured from `base_commit` is returned.
///
/// # Returns
///
/// `Ok(Some(commit))` for the LCA, or `Ok(None)` when the two histories share no commit.
///
/// # Errors
///
/// Propagates failures from loading either commit history.
pub fn lowest_common_ancestor_from_commits(
    repo: &LocalRepository,
    base_commit: &Commit,
    merge_commit: &Commit,
) -> Result<Option<Commit>, OxenError> {
    log::debug!(
        "lowest_common_ancestor_from_commits: base: {} merge: {}",
        base_commit.id,
        merge_commit.id
    );
    // Traverse the base commit back to start, keeping map of Commit -> Depth(int)
    let commit_depths_from_head =
        repositories::commits::list_from_with_depth(repo, base_commit.id.as_str())?;

    // Traverse the merge commit back
    //   check at each step if ID is in the HEAD commit history
    //   The lowest Depth Commit in HEAD should be the LCA
    let commit_depths_from_merge =
        repositories::commits::list_from_with_depth(repo, merge_commit.id.as_str())?;

    // Track the best candidate by reference. `None` doubles as "no common ancestor yet", so no
    // placeholder commit has to be pulled out of the base history (which would panic on an
    // empty map), and the winning commit is cloned exactly once at the end.
    let mut lca: Option<(&Commit, usize)> = None;
    for commit in commit_depths_from_merge.keys() {
        if let Some(&depth) = commit_depths_from_head.get(commit) {
            // Strict comparison: on equal depth the first candidate encountered is kept.
            if lca.map_or(true, |(_, best_depth)| depth < best_depth) {
                log::debug!("setting new lca, {commit:?}");
                lca = Some((commit, depth));
            }
        }
    }

    // If the commits have no common ancestor (new repository), this is None.
    Ok(lca.map(|(commit, _)| commit.clone()))
}

/// Analyze a three-way merge between commits. Always returns conflicts and files changed on the
/// merge branch. When `checkout.writes_to_disk()` is true (client-side merges), also restores
/// non-conflicting merge files to the working directory.
pub async fn find_merge_conflicts(
    repo: &LocalRepository,
    merge_commits: &MergeCommits,
    checkout: LocalCheckout,
    shared_hashes: &mut HashSet<MerkleHash>,
) -> Result<MergeConflictAnalysis, OxenError> {
    log::debug!("finding merge conflicts");
    let write_to_disk = checkout.writes_to_disk();
    let is_resume = checkout.is_resume();
    /*
    https://en.wikipedia.org/wiki/Merge_(version_control)#Three-way_merge

    C = LCA
    A = Base
    B = Merge
    D = Resulting merge commit

    C - A - D
      \   /
        B

    The three-way merge looks for sections which are the same in only two of the three files.
    In this case, there are two versions of the section,
        and the version which is in the common ancestor "C" is discarded,
        while the version that differs is preserved in the output.
    If "A" and "B" agree, that is what appears in the output.
    A section that is the same in "A" and "C" outputs the changed version in "B",
    and likewise a section that is the same in "B" and "C" outputs the version in "A".

    Sections that are different in all three files are marked as a conflict situation and left for the user to resolve.
    */

    use StagedEntryStatus::*;

    let mut conflicts: Vec<NodeMergeConflict> = vec![];
    let mut entries: Vec<(PathBuf, FileNode, StagedEntryStatus)> = vec![];
    let mut entries_to_restore: Vec<FileToRestore> = vec![];
    let mut cannot_overwrite_entries: Vec<PathBuf> = vec![];

    // Read all the entries from each commit into sets we can compare to one another
    let mut lca_hashes = HashSet::new();
    let mut base_hashes = HashSet::new();

    // Load in every node from the LCA tree (if there is one)
    let lca_commit_tree = if let Some(lca) = &merge_commits.lca {
        repositories::tree::get_root_with_children_and_node_hashes(
            repo,
            lca,
            None,
            Some(&mut lca_hashes),
            None,
        )?
    } else {
        None
    };

    // Then, load in only the nodes of the base commit tree that weren't in the LCA tree
    let base_commit_tree = repositories::tree::get_root_with_children_and_node_hashes(
        repo,
        &merge_commits.base,
        Some(&lca_hashes),
        Some(&mut base_hashes),
        Some(shared_hashes),
    )?
    .unwrap();

    // Then, we load in only the nodes of the merge tree that weren't in the Base tree (or the LCA tree)
    // After this, 'shared hashes' will have all the dir/vnode hashes shared between all 3 trees
    // This lets us skip checking these nodes, and also lets us skip adding them when creating the merge commit later

    let merge_commit_tree = repositories::tree::get_root_with_children_and_node_hashes(
        repo,
        &merge_commits.merge,
        Some(&base_hashes),
        None,
        Some(shared_hashes),
    )?
    .unwrap();

    // Note: Remove this unless debugging
    // log::debug!("lca_hashes: {lca_hashes:?}");
    //lca_commit_tree.print();
    // log::debug!("base_hashes: {base_hashes:?}");
    //base_commit_tree.print();
    // log::debug!("merge_hashes: {merge_hashes:?}");

    let starting_path = PathBuf::from("");

    // Walk the full LCA tree without the shared_hashes filter. The LCA tree is fully loaded
    // (no exclusions), and pruning it would hide files in directories shared between LCA and
    // base — causing missed deletions and incorrect "not in LCA" classifications.
    let no_filter = HashSet::new();
    let lca_entries = if let Some(lca_tree) = &lca_commit_tree {
        repositories::tree::unique_dir_entries(&starting_path, lca_tree, &no_filter)?
    } else {
        HashMap::new()
    };
    let base_entries =
        repositories::tree::unique_dir_entries(&starting_path, &base_commit_tree, shared_hashes)?;
    let merge_tree_entries =
        repositories::tree::unique_dir_entries(&starting_path, &merge_commit_tree, shared_hashes)?;

    log::debug!("lca_entries.len() {}", lca_entries.len());
    log::debug!("base_entries.len() {}", base_entries.len());
    log::debug!("merge_tree_entries.len() {}", merge_tree_entries.len());

    // Check all the entries in the candidate merge
    for (entry_path, merge_file_node) in &merge_tree_entries {
        // log::debug!("Considering entry {}", entry_path.to_string_lossy());
        // Check if the entry exists in all 3 commits
        if let Some(base_file_node) = base_entries.get(entry_path) {
            if let Some(lca_file_node) = lca_entries.get(entry_path) {
                // File exists in all three trees
                let base_eq_lca = base_file_node.combined_hash() == lca_file_node.combined_hash();
                let base_eq_merge =
                    base_file_node.combined_hash() == merge_file_node.combined_hash();

                if base_eq_lca && !base_eq_merge {
                    // Changed only on merge branch — include in merge result
                    entries.push((entry_path.clone(), merge_file_node.clone(), Modified));
                    if write_to_disk {
                        if is_resume
                            || restore::should_restore_file(
                                repo,
                                Some(base_file_node.clone()),
                                merge_file_node,
                                entry_path,
                            )
                            .await?
                        {
                            entries_to_restore.push(FileToRestore {
                                file_node: merge_file_node.clone(),
                                path: entry_path.clone(),
                            });
                        } else {
                            cannot_overwrite_entries.push(entry_path.clone());
                        }
                    }
                }

                // If all three are different, mark as conflict
                let lca_eq_merge = lca_file_node.combined_hash() == merge_file_node.combined_hash();
                if !base_eq_lca && !lca_eq_merge && !base_eq_merge {
                    conflicts.push(NodeMergeConflict {
                        lca_entry: (lca_file_node.to_owned(), entry_path.to_path_buf()),
                        base_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
                        merge_entry: (merge_file_node.to_owned(), entry_path.to_path_buf()),
                    });
                }
            } else {
                // File in base and merge but not LCA — conflict if different
                if base_file_node.combined_hash() != merge_file_node.combined_hash() {
                    conflicts.push(NodeMergeConflict {
                        lca_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
                        base_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
                        merge_entry: (merge_file_node.to_owned(), entry_path.to_path_buf()),
                    });
                }
            }
        } else {
            // File in merge_tree_entries but not in base_entries. This could mean:
            // (a) base actually deleted it, or
            // (b) the file's parent dir was shared between LCA and base, so
            //     unique_dir_entries skipped it — the file still exists in base.
            // Look up the file in the full base commit tree (not the optimized one which
            // skips shared directories).
            let base_file_node =
                repositories::tree::get_file_by_path(repo, &merge_commits.base, entry_path)?;

            if base_file_node.is_some() {
                // Case (b): file exists in base via a shared directory.
                // It's changed on merge but not on base — include merge version.
                entries.push((entry_path.clone(), merge_file_node.clone(), Modified));
                if write_to_disk {
                    // Remove the parent dir from shared_hashes so add_dir_except will
                    // scan this directory when creating the merge commit.
                    if let Some(parent) = entry_path.parent()
                        && let Some(lca_tree) = &lca_commit_tree
                        && let Some(dir_node) = lca_tree.get_by_path(parent)?
                    {
                        shared_hashes.remove(&dir_node.hash);
                    }

                    if is_resume
                        || restore::should_restore_file(
                            repo,
                            base_file_node,
                            merge_file_node,
                            entry_path,
                        )
                        .await?
                    {
                        entries_to_restore.push(FileToRestore {
                            file_node: merge_file_node.clone(),
                            path: entry_path.clone(),
                        });
                    } else {
                        cannot_overwrite_entries.push(entry_path.clone());
                    }
                }
            } else if let Some(lca_node) = lca_entries.get(entry_path) {
                // Case (a): file was in LCA, base deleted it.
                if lca_node.combined_hash() != merge_file_node.combined_hash() {
                    // Merge modified it and base deleted it — conflict
                    conflicts.push(NodeMergeConflict {
                        lca_entry: (lca_node.to_owned(), entry_path.to_path_buf()),
                        base_entry: (lca_node.to_owned(), entry_path.to_path_buf()),
                        merge_entry: (merge_file_node.to_owned(), entry_path.to_path_buf()),
                    });
                }
                // If merge didn't change it from LCA, base's deletion wins
            } else {
                // Truly new file on merge branch
                entries.push((entry_path.clone(), merge_file_node.clone(), Added));
                if write_to_disk {
                    if is_resume
                        || restore::should_restore_file(repo, None, merge_file_node, entry_path)
                            .await?
                    {
                        entries_to_restore.push(FileToRestore {
                            file_node: merge_file_node.clone(),
                            path: entry_path.to_path_buf(),
                        });
                    } else {
                        cannot_overwrite_entries.push(entry_path.clone());
                    }
                }
            }
        }
    }

    // Detect deletions: files in the LCA that are absent from the merge tree. `base_entries` is
    // pruned (shared dirs skipped), so we use `get_file_by_path` for base lookups.
    for (entry_path, lca_file_node) in &lca_entries {
        if merge_tree_entries.contains_key(entry_path)
            || repositories::tree::get_file_by_path(repo, &merge_commits.merge, entry_path)?
                .is_some()
        {
            continue;
        }
        // File is in LCA but absent from merge — check base to decide delete vs conflict.
        let base_file_node =
            repositories::tree::get_file_by_path(repo, &merge_commits.base, entry_path)?;
        if let Some(base_file_node) = base_file_node {
            if base_file_node.combined_hash() == lca_file_node.combined_hash() {
                // Unchanged on base, deleted on merge — delete it
                entries.push((entry_path.clone(), lca_file_node.clone(), Removed));
            } else {
                // Base modified it, merge deleted it — conflict
                conflicts.push(NodeMergeConflict {
                    lca_entry: (lca_file_node.to_owned(), entry_path.to_path_buf()),
                    base_entry: (base_file_node.to_owned(), entry_path.to_path_buf()),
                    merge_entry: (lca_file_node.to_owned(), entry_path.to_path_buf()),
                });
            }
        }
    }

    log::debug!("three_way_merge conflicts.len() {}", conflicts.len());

    // If there are no conflicts, restore the entries
    if cannot_overwrite_entries.is_empty() {
        // Working-tree mutation starts here. Bracket it with the resume marker
        // so a SIGTERM mid-restore can be recovered by a subsequent merge
        // against the same target. Only when write_to_disk=true — the
        // server-safe paths leave the working tree untouched.
        if write_to_disk {
            merge_marker::write(repo, &merge_commits.merge.id).await?;
        }

        let version_store = repo.version_store()?;
        for entry in entries_to_restore.iter() {
            restore::restore_file(repo, &entry.file_node, &entry.path, &version_store).await?;

            // If it's a tabular file, we also need to stage the schema
            if util::fs::is_tabular(&entry.path)
                && let Some(schema) = repositories::data_frames::schemas::get_by_path(
                    repo,
                    &merge_commits.merge,
                    &entry.path,
                )?
            {
                for field in schema.fields {
                    if let Some(metadata) = field.metadata {
                        let _ = repositories::data_frames::schemas::add_column_metadata(
                            repo,
                            &entry.path,
                            &field.name,
                            &metadata,
                        )?;
                    }
                }

                if let Some(metadata) = schema.metadata {
                    let _ = repositories::data_frames::schemas::add_schema_metadata(
                        repo,
                        &entry.path,
                        &metadata,
                    )?;
                }
            }
        }

        if write_to_disk {
            merge_marker::clear(repo).await?;
        }
    } else {
        // If there are conflicts, return an error without restoring anything
        return Err(OxenError::cannot_overwrite_files(&cannot_overwrite_entries));
    }

    Ok(MergeConflictAnalysis { conflicts, entries })
}