Skip to main content

nusy_arrow_git/
merge.rs

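//! Merge — 3-way merge with conflict detection.
//!
//! Finds the common ancestor of two commits, diffs it against both branches,
//! detects conflicts (both branches added a triple with the same subject,
//! predicate and namespace but a different object), and produces either a
//! clean merge commit or a conflict report. [`merge_with_strategy`] can resolve
//! such conflicts automatically instead of reporting them.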
6
7use crate::checkout;
8use crate::commit::{Commit, CommitError, CommitsTable, create_commit};
9use crate::diff::{self, DiffEntry};
10use crate::history::find_common_ancestor;
11use crate::object_store::GitObjectStore;
12use nusy_arrow_core::{Namespace, Triple, YLayer, col};
13use std::collections::{HashMap, HashSet};
14
/// A merge conflict: both branches added a triple with the same
/// (subject, predicate, namespace) but with different objects.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare conflict reports
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conflict {
    /// Subject shared by the two conflicting additions.
    pub subject: String,
    /// Predicate shared by the two conflicting additions.
    pub predicate: String,
    /// Namespace string (as carried by the diff entries) of both additions.
    pub namespace: String,
    /// The object value from branch A.
    pub object_a: String,
    /// The object value from branch B.
    pub object_b: String,
}
26
27/// Result of a merge operation.
28#[derive(Debug)]
29pub enum MergeResult {
30    /// Clean merge — all changes applied, new commit created.
31    Clean(Commit),
32    /// Conflicts detected — manual resolution needed.
33    Conflict(Vec<Conflict>),
34    /// No common ancestor found (disconnected histories).
35    NoCommonAncestor,
36}
37
38/// Errors from merge operations.
39#[derive(Debug, thiserror::Error)]
40pub enum MergeError {
41    #[error("Commit error: {0}")]
42    Commit(#[from] CommitError),
43
44    #[error("Store error: {0}")]
45    Store(#[from] nusy_arrow_core::StoreError),
46}
47
/// Decision applied to one conflicting (subject, predicate, namespace) key.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Resolution {
    /// Keep what branch A (the current HEAD) added.
    KeepOurs,
    /// Keep what branch B (the incoming side) added.
    KeepTheirs,
    /// Keep both branches' values as separate triples.
    KeepBoth,
    /// Keep neither value; both are dropped from the merge.
    Drop,
}
60
61/// Strategy for automatic conflict resolution during merge.
62pub enum MergeStrategy {
63    /// Default: return `MergeResult::Conflict` for manual resolution.
64    Manual,
65    /// Keep branch A's value for all conflicts.
66    Ours,
67    /// Keep branch B's value for all conflicts.
68    Theirs,
69    /// Compare timestamps, keep the newer value.
70    LastWriterWins,
71    /// Caller-defined logic: receives a `&Conflict` and returns a `Resolution`.
72    Custom(Box<dyn Fn(&Conflict) -> Resolution>),
73}
74
/// Hashable identity used to spot conflicts: two additions collide when they
/// share subject, predicate and namespace (the object is deliberately excluded).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ConflictKey {
    subject: String,
    predicate: String,
    namespace: String,
}
82
83/// Perform a 3-way merge between two commits.
84///
85/// 1. Find common ancestor
86/// 2. Diff ancestor→A and ancestor→B
87/// 3. Detect conflicts (same subject+predicate, different objects)
88/// 4. If no conflicts, apply both diffs and create merge commit
89pub fn merge(
90    obj_store: &mut GitObjectStore,
91    commits_table: &mut CommitsTable,
92    commit_a_id: &str,
93    commit_b_id: &str,
94    author: &str,
95) -> Result<MergeResult, MergeError> {
96    // Find common ancestor
97    let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
98        Some(a) => a.commit_id.clone(),
99        None => return Ok(MergeResult::NoCommonAncestor),
100    };
101
102    // Diff ancestor→A
103    let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
104    // Diff ancestor→B
105    let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
106
107    // Check for conflicts: additions in both with same (subject, predicate, namespace) but different object
108    let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
109        .added
110        .iter()
111        .map(|e| {
112            (
113                ConflictKey {
114                    subject: e.subject.clone(),
115                    predicate: e.predicate.clone(),
116                    namespace: e.namespace.clone(),
117                },
118                e,
119            )
120        })
121        .collect();
122
123    let mut conflicts = Vec::new();
124    for entry_b in &diff_b.added {
125        let key = ConflictKey {
126            subject: entry_b.subject.clone(),
127            predicate: entry_b.predicate.clone(),
128            namespace: entry_b.namespace.clone(),
129        };
130        if let Some(entry_a) = a_adds.get(&key)
131            && entry_a.object != entry_b.object
132        {
133            conflicts.push(Conflict {
134                subject: key.subject,
135                predicate: key.predicate,
136                namespace: key.namespace,
137                object_a: entry_a.object.clone(),
138                object_b: entry_b.object.clone(),
139            });
140        }
141    }
142
143    if !conflicts.is_empty() {
144        return Ok(MergeResult::Conflict(conflicts));
145    }
146
147    // No conflicts — delegate to shared clean merge path
148    apply_clean_merge(
149        obj_store,
150        commits_table,
151        &ancestor,
152        commit_a_id,
153        commit_b_id,
154        &diff_a,
155        &diff_b,
156        author,
157    )
158}
159
160/// Perform a 3-way merge with automatic conflict resolution.
161///
162/// Like [`merge()`], but applies a [`MergeStrategy`] when conflicts are detected.
163/// With `MergeStrategy::Manual`, this behaves identically to `merge()`.
164pub fn merge_with_strategy(
165    obj_store: &mut GitObjectStore,
166    commits_table: &mut CommitsTable,
167    commit_a_id: &str,
168    commit_b_id: &str,
169    author: &str,
170    strategy: &MergeStrategy,
171) -> Result<MergeResult, MergeError> {
172    // Find common ancestor
173    let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
174        Some(a) => a.commit_id.clone(),
175        None => return Ok(MergeResult::NoCommonAncestor),
176    };
177
178    // Diff ancestor→A and ancestor→B
179    let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
180    let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
181
182    // Detect conflicts
183    let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
184        .added
185        .iter()
186        .map(|e| {
187            (
188                ConflictKey {
189                    subject: e.subject.clone(),
190                    predicate: e.predicate.clone(),
191                    namespace: e.namespace.clone(),
192                },
193                e,
194            )
195        })
196        .collect();
197
198    let b_adds: HashMap<ConflictKey, &DiffEntry> = diff_b
199        .added
200        .iter()
201        .map(|e| {
202            (
203                ConflictKey {
204                    subject: e.subject.clone(),
205                    predicate: e.predicate.clone(),
206                    namespace: e.namespace.clone(),
207                },
208                e,
209            )
210        })
211        .collect();
212
213    let mut conflicts = Vec::new();
214    for (key, entry_b) in &b_adds {
215        if let Some(entry_a) = a_adds.get(key)
216            && entry_a.object != entry_b.object
217        {
218            conflicts.push(Conflict {
219                subject: key.subject.clone(),
220                predicate: key.predicate.clone(),
221                namespace: key.namespace.clone(),
222                object_a: entry_a.object.clone(),
223                object_b: entry_b.object.clone(),
224            });
225        }
226    }
227
228    // If no conflicts, delegate to the clean merge path
229    if conflicts.is_empty() {
230        return apply_clean_merge(
231            obj_store,
232            commits_table,
233            &ancestor,
234            commit_a_id,
235            commit_b_id,
236            &diff_a,
237            &diff_b,
238            author,
239        );
240    }
241
242    // Apply strategy
243    if matches!(strategy, MergeStrategy::Manual) {
244        return Ok(MergeResult::Conflict(conflicts));
245    }
246
247    // Resolve each conflict according to strategy
248    let mut resolved_keys: HashMap<ConflictKey, Resolution> = HashMap::new();
249    for conflict in &conflicts {
250        let resolution = match strategy {
251            MergeStrategy::Manual => unreachable!(),
252            MergeStrategy::Ours => Resolution::KeepOurs,
253            MergeStrategy::Theirs => Resolution::KeepTheirs,
254            MergeStrategy::LastWriterWins => {
255                // Compare consolidated_at timestamps from the diff entries
256                let key = ConflictKey {
257                    subject: conflict.subject.clone(),
258                    predicate: conflict.predicate.clone(),
259                    namespace: conflict.namespace.clone(),
260                };
261                let ts_a = a_adds
262                    .get(&key)
263                    .and_then(|e| e.consolidated_at)
264                    .unwrap_or(0);
265                let ts_b = b_adds
266                    .get(&key)
267                    .and_then(|e| e.consolidated_at)
268                    .unwrap_or(0);
269                if ts_a >= ts_b {
270                    Resolution::KeepOurs
271                } else {
272                    Resolution::KeepTheirs
273                }
274            }
275            MergeStrategy::Custom(f) => f(conflict),
276        };
277        resolved_keys.insert(
278            ConflictKey {
279                subject: conflict.subject.clone(),
280                predicate: conflict.predicate.clone(),
281                namespace: conflict.namespace.clone(),
282            },
283            resolution,
284        );
285    }
286
287    // Start from ancestor state
288    checkout::checkout(obj_store, commits_table, &ancestor)?;
289
290    // Build the set of additions, applying resolutions to conflicts
291    let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
292
293    // First add all non-conflicting additions from both branches
294    for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
295        let conflict_key = ConflictKey {
296            subject: entry.subject.clone(),
297            predicate: entry.predicate.clone(),
298            namespace: entry.namespace.clone(),
299        };
300
301        if let Some(resolution) = resolved_keys.get(&conflict_key) {
302            // This is a conflicted key — handle based on resolution
303            let spo_key = (
304                entry.subject.clone(),
305                entry.predicate.clone(),
306                entry.object.clone(),
307                entry.namespace.clone(),
308            );
309            match resolution {
310                Resolution::KeepOurs => {
311                    if a_adds.contains_key(&conflict_key)
312                        && a_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
313                    {
314                        all_adds.insert(spo_key, entry);
315                    }
316                }
317                Resolution::KeepTheirs => {
318                    if b_adds.contains_key(&conflict_key)
319                        && b_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
320                    {
321                        all_adds.insert(spo_key, entry);
322                    }
323                }
324                Resolution::KeepBoth => {
325                    all_adds.insert(spo_key, entry);
326                }
327                Resolution::Drop => {
328                    // Skip — don't add either
329                }
330            }
331        } else {
332            // Non-conflicting — add as normal (dedup by spo key, first wins)
333            let key = (
334                entry.subject.clone(),
335                entry.predicate.clone(),
336                entry.object.clone(),
337                entry.namespace.clone(),
338            );
339            all_adds.entry(key).or_insert(entry);
340        }
341    }
342
343    // Apply additions to store
344    for entry in all_adds.values() {
345        let ns = Namespace::from_str_loose(&entry.namespace).unwrap_or(Namespace::World);
346        let y_layer = YLayer::from_u8(entry.y_layer).unwrap_or(YLayer::Semantic);
347        let triple = Triple {
348            subject: entry.subject.clone(),
349            predicate: entry.predicate.clone(),
350            object: entry.object.clone(),
351            graph: entry.graph.clone(),
352            confidence: entry.confidence,
353            source_document: entry.source_document.clone(),
354            source_chunk_id: entry.source_chunk_id.clone(),
355            extracted_by: Some(format!("merge by {author}")),
356            caused_by: entry.caused_by.clone(),
357            derived_from: entry.derived_from.clone(),
358            consolidated_at: entry.consolidated_at,
359            certifiability_class: entry.certifiability_class.clone(),
360        };
361        obj_store.store.add_triple(&triple, ns, y_layer)?;
362    }
363
364    // Apply removals from both diffs
365    apply_removals(obj_store, &diff_a, &diff_b);
366
367    // Create merge commit
368    let merge_commit = create_commit(
369        obj_store,
370        commits_table,
371        vec![commit_a_id.to_string(), commit_b_id.to_string()],
372        &format!("Merge {} into {} (resolved)", commit_b_id, commit_a_id),
373        author,
374    )?;
375
376    Ok(MergeResult::Clean(merge_commit))
377}
378
379/// Apply the clean merge path (shared by `merge` and `merge_with_strategy`).
380#[allow(clippy::too_many_arguments)]
381fn apply_clean_merge(
382    obj_store: &mut GitObjectStore,
383    commits_table: &mut CommitsTable,
384    ancestor: &str,
385    commit_a_id: &str,
386    commit_b_id: &str,
387    diff_a: &diff::DiffResult,
388    diff_b: &diff::DiffResult,
389    author: &str,
390) -> Result<MergeResult, MergeError> {
391    checkout::checkout(obj_store, commits_table, ancestor)?;
392
393    let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
394    for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
395        let key = (
396            entry.subject.clone(),
397            entry.predicate.clone(),
398            entry.object.clone(),
399            entry.namespace.clone(),
400        );
401        all_adds.entry(key).or_insert(entry);
402    }
403
404    for entry in all_adds.values() {
405        let ns = Namespace::from_str_loose(&entry.namespace).unwrap_or(Namespace::World);
406        let y_layer = YLayer::from_u8(entry.y_layer).unwrap_or(YLayer::Semantic);
407        let triple = Triple {
408            subject: entry.subject.clone(),
409            predicate: entry.predicate.clone(),
410            object: entry.object.clone(),
411            graph: entry.graph.clone(),
412            confidence: entry.confidence,
413            source_document: entry.source_document.clone(),
414            source_chunk_id: entry.source_chunk_id.clone(),
415            extracted_by: Some(format!("merge by {author}")),
416            caused_by: entry.caused_by.clone(),
417            derived_from: entry.derived_from.clone(),
418            consolidated_at: entry.consolidated_at,
419            certifiability_class: entry.certifiability_class.clone(),
420        };
421        obj_store.store.add_triple(&triple, ns, y_layer)?;
422    }
423
424    apply_removals(obj_store, diff_a, diff_b);
425
426    let merge_commit = create_commit(
427        obj_store,
428        commits_table,
429        vec![commit_a_id.to_string(), commit_b_id.to_string()],
430        &format!("Merge {} into {}", commit_b_id, commit_a_id),
431        author,
432    )?;
433
434    Ok(MergeResult::Clean(merge_commit))
435}
436
437/// Apply removals from both diffs to the current store state.
438fn apply_removals(
439    obj_store: &mut GitObjectStore,
440    diff_a: &diff::DiffResult,
441    diff_b: &diff::DiffResult,
442) {
443    let all_removals: HashSet<(String, String, String, String)> = diff_a
444        .removed
445        .iter()
446        .chain(diff_b.removed.iter())
447        .map(|e| {
448            (
449                e.subject.clone(),
450                e.predicate.clone(),
451                e.object.clone(),
452                e.namespace.clone(),
453            )
454        })
455        .collect();
456
457    for ns in Namespace::ALL {
458        let batches = obj_store.store.get_namespace_batches(ns);
459        let ns_str = ns.as_str().to_string();
460
461        let mut ids_to_delete = Vec::new();
462        for batch in batches {
463            let id_col = batch
464                .column(col::TRIPLE_ID)
465                .as_any()
466                .downcast_ref::<arrow::array::StringArray>()
467                .expect("triple_id column");
468            let subj_col = batch
469                .column(col::SUBJECT)
470                .as_any()
471                .downcast_ref::<arrow::array::StringArray>()
472                .expect("subject column");
473            let pred_col = batch
474                .column(col::PREDICATE)
475                .as_any()
476                .downcast_ref::<arrow::array::StringArray>()
477                .expect("predicate column");
478            let obj_col = batch
479                .column(col::OBJECT)
480                .as_any()
481                .downcast_ref::<arrow::array::StringArray>()
482                .expect("object column");
483
484            for i in 0..batch.num_rows() {
485                let key = (
486                    subj_col.value(i).to_string(),
487                    pred_col.value(i).to_string(),
488                    obj_col.value(i).to_string(),
489                    ns_str.clone(),
490                );
491                if all_removals.contains(&key) {
492                    ids_to_delete.push(id_col.value(i).to_string());
493                }
494            }
495        }
496
497        for id in &ids_to_delete {
498            let _ = obj_store.store.delete(id);
499        }
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use crate::commit::create_commit;
507
508    fn sample_triple(subj: &str, obj: &str) -> Triple {
509        Triple {
510            subject: subj.to_string(),
511            predicate: "rdf:type".to_string(),
512            object: obj.to_string(),
513            graph: None,
514            confidence: Some(0.9),
515            source_document: None,
516            source_chunk_id: None,
517            extracted_by: None,
518            caused_by: None,
519            derived_from: None,
520            consolidated_at: None,
521            certifiability_class: None,
522        }
523    }
524
525    #[test]
526    fn test_non_conflicting_merge() {
527        let tmp = tempfile::tempdir().unwrap();
528        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
529        let mut commits = CommitsTable::new();
530
531        // Create base commit
532        obj.store
533            .add_triple(
534                &sample_triple("base", "Base"),
535                Namespace::World,
536                YLayer::Semantic,
537            )
538            .unwrap();
539        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
540
541        // Branch A: add triple A
542        obj.store
543            .add_triple(
544                &sample_triple("a-only", "A"),
545                Namespace::World,
546                YLayer::Semantic,
547            )
548            .unwrap();
549        let ca = create_commit(
550            &obj,
551            &mut commits,
552            vec![base.commit_id.clone()],
553            "branch-a",
554            "DGX",
555        )
556        .unwrap();
557
558        // Checkout base, add triple B
559        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
560        obj.store
561            .add_triple(
562                &sample_triple("b-only", "B"),
563                Namespace::World,
564                YLayer::Semantic,
565            )
566            .unwrap();
567        let cb = create_commit(
568            &obj,
569            &mut commits,
570            vec![base.commit_id.clone()],
571            "branch-b",
572            "DGX",
573        )
574        .unwrap();
575
576        // Merge
577        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "DGX").unwrap();
578
579        match result {
580            MergeResult::Clean(mc) => {
581                assert_eq!(mc.parent_ids.len(), 2);
582                // After merge, store should have base + a-only + b-only
583                assert!(obj.store.len() >= 3);
584            }
585            MergeResult::Conflict(_) => panic!("Expected clean merge"),
586            MergeResult::NoCommonAncestor => panic!("Expected common ancestor"),
587        }
588    }
589
590    #[test]
591    fn test_conflicting_merge() {
592        let tmp = tempfile::tempdir().unwrap();
593        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
594        let mut commits = CommitsTable::new();
595
596        // Base
597        obj.store
598            .add_triple(
599                &sample_triple("base", "Base"),
600                Namespace::World,
601                YLayer::Semantic,
602            )
603            .unwrap();
604        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
605
606        // Branch A: add (conflict-subj, rdf:type, TypeA)
607        obj.store
608            .add_triple(
609                &sample_triple("conflict-subj", "TypeA"),
610                Namespace::World,
611                YLayer::Semantic,
612            )
613            .unwrap();
614        let ca = create_commit(
615            &obj,
616            &mut commits,
617            vec![base.commit_id.clone()],
618            "branch-a",
619            "DGX",
620        )
621        .unwrap();
622
623        // Checkout base, Branch B: add (conflict-subj, rdf:type, TypeB)
624        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
625        obj.store
626            .add_triple(
627                &sample_triple("conflict-subj", "TypeB"),
628                Namespace::World,
629                YLayer::Semantic,
630            )
631            .unwrap();
632        let cb = create_commit(
633            &obj,
634            &mut commits,
635            vec![base.commit_id.clone()],
636            "branch-b",
637            "DGX",
638        )
639        .unwrap();
640
641        // Merge should detect conflict
642        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "DGX").unwrap();
643
644        match result {
645            MergeResult::Conflict(conflicts) => {
646                assert_eq!(conflicts.len(), 1);
647                assert_eq!(conflicts[0].subject, "conflict-subj");
648                assert_eq!(conflicts[0].object_a, "TypeA");
649                assert_eq!(conflicts[0].object_b, "TypeB");
650            }
651            _ => panic!("Expected conflict"),
652        }
653    }
654
655    #[test]
656    fn test_merge_commit_has_two_parents() {
657        let tmp = tempfile::tempdir().unwrap();
658        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
659        let mut commits = CommitsTable::new();
660
661        obj.store
662            .add_triple(
663                &sample_triple("base", "Base"),
664                Namespace::World,
665                YLayer::Semantic,
666            )
667            .unwrap();
668        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
669
670        // Two branches with non-conflicting changes
671        obj.store
672            .add_triple(&sample_triple("a", "A"), Namespace::World, YLayer::Semantic)
673            .unwrap();
674        let ca =
675            create_commit(&obj, &mut commits, vec![base.commit_id.clone()], "a", "DGX").unwrap();
676
677        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
678        obj.store
679            .add_triple(&sample_triple("b", "B"), Namespace::Work, YLayer::Semantic)
680            .unwrap();
681        let cb =
682            create_commit(&obj, &mut commits, vec![base.commit_id.clone()], "b", "DGX").unwrap();
683
684        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "DGX").unwrap();
685
686        match result {
687            MergeResult::Clean(mc) => {
688                assert_eq!(mc.parent_ids.len(), 2);
689                assert!(mc.parent_ids.contains(&ca.commit_id));
690                assert!(mc.parent_ids.contains(&cb.commit_id));
691            }
692            _ => panic!("Expected clean merge"),
693        }
694    }
695
696    /// Helper: create a conflicting scenario for strategy tests.
697    /// Returns (obj_store, commits_table, commit_a_id, commit_b_id).
698    fn setup_conflict_scenario() -> (GitObjectStore, CommitsTable, String, String) {
699        let tmp = tempfile::tempdir().unwrap();
700        // Leak the tempdir so it lives long enough
701        let tmp_path = tmp.path().to_owned();
702        std::mem::forget(tmp);
703        let mut obj = GitObjectStore::with_snapshot_dir(&tmp_path);
704        let mut commits = CommitsTable::new();
705
706        // Base
707        obj.store
708            .add_triple(
709                &sample_triple("base", "Base"),
710                Namespace::World,
711                YLayer::Semantic,
712            )
713            .unwrap();
714        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
715
716        // Branch A
717        obj.store
718            .add_triple(
719                &sample_triple("conflict-subj", "TypeA"),
720                Namespace::World,
721                YLayer::Semantic,
722            )
723            .unwrap();
724        let ca = create_commit(
725            &obj,
726            &mut commits,
727            vec![base.commit_id.clone()],
728            "branch-a",
729            "DGX",
730        )
731        .unwrap();
732
733        // Branch B
734        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
735        obj.store
736            .add_triple(
737                &sample_triple("conflict-subj", "TypeB"),
738                Namespace::World,
739                YLayer::Semantic,
740            )
741            .unwrap();
742        let cb = create_commit(
743            &obj,
744            &mut commits,
745            vec![base.commit_id.clone()],
746            "branch-b",
747            "DGX",
748        )
749        .unwrap();
750
751        (obj, commits, ca.commit_id, cb.commit_id)
752    }
753
754    #[test]
755    fn test_strategy_manual_returns_conflict() {
756        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
757        let result = merge_with_strategy(
758            &mut obj,
759            &mut commits,
760            &ca_id,
761            &cb_id,
762            "DGX",
763            &MergeStrategy::Manual,
764        )
765        .unwrap();
766        match result {
767            MergeResult::Conflict(conflicts) => {
768                assert_eq!(conflicts.len(), 1);
769                assert_eq!(conflicts[0].subject, "conflict-subj");
770            }
771            _ => panic!("Manual strategy should return Conflict"),
772        }
773    }
774
775    #[test]
776    fn test_strategy_ours() {
777        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
778        let result = merge_with_strategy(
779            &mut obj,
780            &mut commits,
781            &ca_id,
782            &cb_id,
783            "DGX",
784            &MergeStrategy::Ours,
785        )
786        .unwrap();
787        match result {
788            MergeResult::Clean(_) => {
789                // Store should have base + TypeA (ours), NOT TypeB
790                let batches = obj
791                    .store
792                    .query(&nusy_arrow_core::QuerySpec {
793                        subject: Some("conflict-subj".to_string()),
794                        ..Default::default()
795                    })
796                    .unwrap();
797                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
798                assert_eq!(total, 1, "Should have exactly one triple for conflict-subj");
799                // Verify it's TypeA
800                let batch = &batches[0];
801                let obj_col = batch
802                    .column(col::OBJECT)
803                    .as_any()
804                    .downcast_ref::<arrow::array::StringArray>()
805                    .unwrap();
806                assert_eq!(obj_col.value(0), "TypeA");
807            }
808            _ => panic!("Ours strategy should produce Clean merge"),
809        }
810    }
811
812    #[test]
813    fn test_strategy_theirs() {
814        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
815        let result = merge_with_strategy(
816            &mut obj,
817            &mut commits,
818            &ca_id,
819            &cb_id,
820            "DGX",
821            &MergeStrategy::Theirs,
822        )
823        .unwrap();
824        match result {
825            MergeResult::Clean(_) => {
826                let batches = obj
827                    .store
828                    .query(&nusy_arrow_core::QuerySpec {
829                        subject: Some("conflict-subj".to_string()),
830                        ..Default::default()
831                    })
832                    .unwrap();
833                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
834                assert_eq!(total, 1, "Should have exactly one triple for conflict-subj");
835                let batch = &batches[0];
836                let obj_col = batch
837                    .column(col::OBJECT)
838                    .as_any()
839                    .downcast_ref::<arrow::array::StringArray>()
840                    .unwrap();
841                assert_eq!(obj_col.value(0), "TypeB");
842            }
843            _ => panic!("Theirs strategy should produce Clean merge"),
844        }
845    }
846
847    #[test]
848    fn test_strategy_keep_both() {
849        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
850        let result = merge_with_strategy(
851            &mut obj,
852            &mut commits,
853            &ca_id,
854            &cb_id,
855            "DGX",
856            &MergeStrategy::Custom(Box::new(|_| Resolution::KeepBoth)),
857        )
858        .unwrap();
859        match result {
860            MergeResult::Clean(_) => {
861                let batches = obj
862                    .store
863                    .query(&nusy_arrow_core::QuerySpec {
864                        subject: Some("conflict-subj".to_string()),
865                        ..Default::default()
866                    })
867                    .unwrap();
868                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
869                assert_eq!(total, 2, "KeepBoth should preserve both triples");
870            }
871            _ => panic!("Custom(KeepBoth) strategy should produce Clean merge"),
872        }
873    }
874
875    #[test]
876    fn test_strategy_drop() {
877        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
878        let result = merge_with_strategy(
879            &mut obj,
880            &mut commits,
881            &ca_id,
882            &cb_id,
883            "DGX",
884            &MergeStrategy::Custom(Box::new(|_| Resolution::Drop)),
885        )
886        .unwrap();
887        match result {
888            MergeResult::Clean(_) => {
889                let batches = obj
890                    .store
891                    .query(&nusy_arrow_core::QuerySpec {
892                        subject: Some("conflict-subj".to_string()),
893                        ..Default::default()
894                    })
895                    .unwrap();
896                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
897                assert_eq!(total, 0, "Drop should remove both conflicting triples");
898            }
899            _ => panic!("Custom(Drop) strategy should produce Clean merge"),
900        }
901    }
902
903    #[test]
904    fn test_strategy_last_writer_wins() {
905        let tmp = tempfile::tempdir().unwrap();
906        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
907        let mut commits = CommitsTable::new();
908
909        // Base
910        obj.store
911            .add_triple(
912                &sample_triple("base", "Base"),
913                Namespace::World,
914                YLayer::Semantic,
915            )
916            .unwrap();
917        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
918
919        // Branch A: older timestamp
920        let triple_a = Triple {
921            subject: "ts-subj".to_string(),
922            predicate: "rdf:type".to_string(),
923            object: "OlderValue".to_string(),
924            graph: None,
925            confidence: Some(0.9),
926            source_document: None,
927            source_chunk_id: None,
928            extracted_by: None,
929            caused_by: None,
930            derived_from: None,
931            consolidated_at: Some(1000), // older
932            certifiability_class: None,
933        };
934        obj.store
935            .add_triple(&triple_a, Namespace::World, YLayer::Semantic)
936            .unwrap();
937        let ca = create_commit(
938            &obj,
939            &mut commits,
940            vec![base.commit_id.clone()],
941            "older",
942            "DGX",
943        )
944        .unwrap();
945
946        // Branch B: newer timestamp
947        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
948        let triple_b = Triple {
949            subject: "ts-subj".to_string(),
950            predicate: "rdf:type".to_string(),
951            object: "NewerValue".to_string(),
952            graph: None,
953            confidence: Some(0.9),
954            source_document: None,
955            source_chunk_id: None,
956            extracted_by: None,
957            caused_by: None,
958            derived_from: None,
959            consolidated_at: Some(2000), // newer
960            certifiability_class: None,
961        };
962        obj.store
963            .add_triple(&triple_b, Namespace::World, YLayer::Semantic)
964            .unwrap();
965        let cb = create_commit(
966            &obj,
967            &mut commits,
968            vec![base.commit_id.clone()],
969            "newer",
970            "DGX",
971        )
972        .unwrap();
973
974        let result = merge_with_strategy(
975            &mut obj,
976            &mut commits,
977            &ca.commit_id,
978            &cb.commit_id,
979            "DGX",
980            &MergeStrategy::LastWriterWins,
981        )
982        .unwrap();
983
984        match result {
985            MergeResult::Clean(_) => {
986                let batches = obj
987                    .store
988                    .query(&nusy_arrow_core::QuerySpec {
989                        subject: Some("ts-subj".to_string()),
990                        ..Default::default()
991                    })
992                    .unwrap();
993                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
994                assert_eq!(total, 1);
995                let batch = &batches[0];
996                let obj_col = batch
997                    .column(col::OBJECT)
998                    .as_any()
999                    .downcast_ref::<arrow::array::StringArray>()
1000                    .unwrap();
1001                assert_eq!(
1002                    obj_col.value(0),
1003                    "NewerValue",
1004                    "LastWriterWins should pick the newer timestamp"
1005                );
1006            }
1007            _ => panic!("LastWriterWins should produce Clean merge"),
1008        }
1009    }
1010
1011    #[test]
1012    fn test_strategy_custom_conditional() {
1013        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
1014        // Custom strategy: keep ours if subject starts with "conflict", theirs otherwise
1015        let result = merge_with_strategy(
1016            &mut obj,
1017            &mut commits,
1018            &ca_id,
1019            &cb_id,
1020            "DGX",
1021            &MergeStrategy::Custom(Box::new(|c| {
1022                if c.subject.starts_with("conflict") {
1023                    Resolution::KeepOurs
1024                } else {
1025                    Resolution::KeepTheirs
1026                }
1027            })),
1028        )
1029        .unwrap();
1030        match result {
1031            MergeResult::Clean(_) => {
1032                let batches = obj
1033                    .store
1034                    .query(&nusy_arrow_core::QuerySpec {
1035                        subject: Some("conflict-subj".to_string()),
1036                        ..Default::default()
1037                    })
1038                    .unwrap();
1039                let batch = &batches[0];
1040                let obj_col = batch
1041                    .column(col::OBJECT)
1042                    .as_any()
1043                    .downcast_ref::<arrow::array::StringArray>()
1044                    .unwrap();
1045                assert_eq!(
1046                    obj_col.value(0),
1047                    "TypeA",
1048                    "Custom should keep ours for conflict-* subjects"
1049                );
1050            }
1051            _ => panic!("Custom strategy should produce Clean merge"),
1052        }
1053    }
1054
1055    #[test]
1056    fn test_strategy_on_no_conflict_still_clean() {
1057        // When there are no conflicts, strategy doesn't matter — should still merge clean
1058        let tmp = tempfile::tempdir().unwrap();
1059        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
1060        let mut commits = CommitsTable::new();
1061
1062        obj.store
1063            .add_triple(
1064                &sample_triple("base", "Base"),
1065                Namespace::World,
1066                YLayer::Semantic,
1067            )
1068            .unwrap();
1069        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();
1070
1071        // Non-conflicting branches
1072        obj.store
1073            .add_triple(
1074                &sample_triple("a-only", "A"),
1075                Namespace::World,
1076                YLayer::Semantic,
1077            )
1078            .unwrap();
1079        let ca =
1080            create_commit(&obj, &mut commits, vec![base.commit_id.clone()], "a", "DGX").unwrap();
1081
1082        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
1083        obj.store
1084            .add_triple(
1085                &sample_triple("b-only", "B"),
1086                Namespace::Work,
1087                YLayer::Semantic,
1088            )
1089            .unwrap();
1090        let cb =
1091            create_commit(&obj, &mut commits, vec![base.commit_id.clone()], "b", "DGX").unwrap();
1092
1093        let result = merge_with_strategy(
1094            &mut obj,
1095            &mut commits,
1096            &ca.commit_id,
1097            &cb.commit_id,
1098            "DGX",
1099            &MergeStrategy::Ours,
1100        )
1101        .unwrap();
1102        match result {
1103            MergeResult::Clean(mc) => {
1104                assert_eq!(mc.parent_ids.len(), 2);
1105                assert!(obj.store.len() >= 3);
1106            }
1107            _ => panic!("Non-conflicting merge with any strategy should be Clean"),
1108        }
1109    }
1110}