Skip to main content

arrow_graph_git/
merge.rs

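//! Merge — 3-way merge with conflict detection.
//!
//! Finds the common ancestor of two commits, computes the diffs from it to both
//! branches, detects conflicts (additions with the same subject, predicate and
//! namespace but different objects), and produces either a clean merge or a
//! conflict report — or, via a `MergeStrategy`, resolves the conflicts automatically.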
6
7use crate::checkout;
8use crate::commit::{Commit, CommitError, CommitsTable, create_commit};
9use crate::diff::{self, DiffEntry};
10use crate::history::find_common_ancestor;
11use crate::object_store::GitObjectStore;
12use arrow_graph_core::{Triple, col};
13use std::collections::{HashMap, HashSet};
14
/// A merge conflict: same (subject, predicate, namespace) with different objects.
///
/// Equality and hashing are derived so conflict reports can be compared, asserted on and
/// deduplicated directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Conflict {
    /// Subject shared by both conflicting triples.
    pub subject: String,
    /// Predicate shared by both conflicting triples.
    pub predicate: String,
    /// Namespace both conflicting triples were added to.
    pub namespace: String,
    /// The object value from branch A.
    pub object_a: String,
    /// The object value from branch B.
    pub object_b: String,
}
26
27/// Result of a merge operation.
28#[derive(Debug)]
29pub enum MergeResult {
30    /// Clean merge — all changes applied, new commit created.
31    Clean(Commit),
32    /// Conflicts detected — manual resolution needed.
33    Conflict(Vec<Conflict>),
34    /// No common ancestor found (disconnected histories).
35    NoCommonAncestor,
36}
37
38/// Errors from merge operations.
39#[derive(Debug, thiserror::Error)]
40pub enum MergeError {
41    #[error("Commit error: {0}")]
42    Commit(#[from] CommitError),
43
44    #[error("Store error: {0}")]
45    Store(#[from] arrow_graph_core::StoreError),
46}
47
/// How a single conflict is settled.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Resolution {
    /// Keep branch A's value (the first commit passed to the merge).
    KeepOurs,
    /// Keep branch B's value (the incoming commit).
    KeepTheirs,
    /// Keep both values, each as its own triple.
    KeepBoth,
    /// Keep neither value; both are left out of the merge.
    Drop,
}
60
61/// Strategy for automatic conflict resolution during merge.
62pub enum MergeStrategy {
63    /// Default: return `MergeResult::Conflict` for manual resolution.
64    Manual,
65    /// Keep branch A's value for all conflicts.
66    Ours,
67    /// Keep branch B's value for all conflicts.
68    Theirs,
69    /// Compare timestamps, keep the newer value.
70    LastWriterWins,
71    /// Caller-defined logic: receives a `&Conflict` and returns a `Resolution`.
72    Custom(Box<dyn Fn(&Conflict) -> Resolution>),
73}
74
75/// Key for conflict detection: (subject, predicate, namespace).
76#[derive(Debug, Clone, PartialEq, Eq, Hash)]
77struct ConflictKey {
78    subject: String,
79    predicate: String,
80    namespace: String,
81}
82
83/// Perform a 3-way merge between two commits.
84///
85/// 1. Find common ancestor
86/// 2. Diff ancestor->A and ancestor->B
87/// 3. Detect conflicts (same subject+predicate, different objects)
88/// 4. If no conflicts, apply both diffs and create merge commit
89pub fn merge(
90    obj_store: &mut GitObjectStore,
91    commits_table: &mut CommitsTable,
92    commit_a_id: &str,
93    commit_b_id: &str,
94    author: &str,
95) -> Result<MergeResult, MergeError> {
96    // Find common ancestor
97    let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
98        Some(a) => a.commit_id.clone(),
99        None => return Ok(MergeResult::NoCommonAncestor),
100    };
101
102    // Diff ancestor->A
103    let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
104    // Diff ancestor->B
105    let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
106
107    // Check for conflicts: additions in both with same (subject, predicate, namespace) but different object
108    let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
109        .added
110        .iter()
111        .map(|e| {
112            (
113                ConflictKey {
114                    subject: e.subject.clone(),
115                    predicate: e.predicate.clone(),
116                    namespace: e.namespace.clone(),
117                },
118                e,
119            )
120        })
121        .collect();
122
123    let mut conflicts = Vec::new();
124    for entry_b in &diff_b.added {
125        let key = ConflictKey {
126            subject: entry_b.subject.clone(),
127            predicate: entry_b.predicate.clone(),
128            namespace: entry_b.namespace.clone(),
129        };
130        if let Some(entry_a) = a_adds.get(&key)
131            && entry_a.object != entry_b.object
132        {
133            conflicts.push(Conflict {
134                subject: key.subject,
135                predicate: key.predicate,
136                namespace: key.namespace,
137                object_a: entry_a.object.clone(),
138                object_b: entry_b.object.clone(),
139            });
140        }
141    }
142
143    if !conflicts.is_empty() {
144        return Ok(MergeResult::Conflict(conflicts));
145    }
146
147    // No conflicts — delegate to shared clean merge path
148    apply_clean_merge(
149        obj_store,
150        commits_table,
151        &ancestor,
152        commit_a_id,
153        commit_b_id,
154        &diff_a,
155        &diff_b,
156        author,
157    )
158}
159
160/// Perform a 3-way merge with automatic conflict resolution.
161///
162/// Like [`merge()`], but applies a [`MergeStrategy`] when conflicts are detected.
163/// With `MergeStrategy::Manual`, this behaves identically to `merge()`.
164pub fn merge_with_strategy(
165    obj_store: &mut GitObjectStore,
166    commits_table: &mut CommitsTable,
167    commit_a_id: &str,
168    commit_b_id: &str,
169    author: &str,
170    strategy: &MergeStrategy,
171) -> Result<MergeResult, MergeError> {
172    // Find common ancestor
173    let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
174        Some(a) => a.commit_id.clone(),
175        None => return Ok(MergeResult::NoCommonAncestor),
176    };
177
178    // Diff ancestor->A and ancestor->B
179    let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
180    let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
181
182    // Detect conflicts
183    let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
184        .added
185        .iter()
186        .map(|e| {
187            (
188                ConflictKey {
189                    subject: e.subject.clone(),
190                    predicate: e.predicate.clone(),
191                    namespace: e.namespace.clone(),
192                },
193                e,
194            )
195        })
196        .collect();
197
198    let b_adds: HashMap<ConflictKey, &DiffEntry> = diff_b
199        .added
200        .iter()
201        .map(|e| {
202            (
203                ConflictKey {
204                    subject: e.subject.clone(),
205                    predicate: e.predicate.clone(),
206                    namespace: e.namespace.clone(),
207                },
208                e,
209            )
210        })
211        .collect();
212
213    let mut conflicts = Vec::new();
214    for (key, entry_b) in &b_adds {
215        if let Some(entry_a) = a_adds.get(key)
216            && entry_a.object != entry_b.object
217        {
218            conflicts.push(Conflict {
219                subject: key.subject.clone(),
220                predicate: key.predicate.clone(),
221                namespace: key.namespace.clone(),
222                object_a: entry_a.object.clone(),
223                object_b: entry_b.object.clone(),
224            });
225        }
226    }
227
228    // If no conflicts, delegate to the clean merge path
229    if conflicts.is_empty() {
230        return apply_clean_merge(
231            obj_store,
232            commits_table,
233            &ancestor,
234            commit_a_id,
235            commit_b_id,
236            &diff_a,
237            &diff_b,
238            author,
239        );
240    }
241
242    // Apply strategy
243    if matches!(strategy, MergeStrategy::Manual) {
244        return Ok(MergeResult::Conflict(conflicts));
245    }
246
247    // Resolve each conflict according to strategy
248    let mut resolved_keys: HashMap<ConflictKey, Resolution> = HashMap::new();
249    for conflict in &conflicts {
250        let resolution = match strategy {
251            MergeStrategy::Manual => unreachable!(),
252            MergeStrategy::Ours => Resolution::KeepOurs,
253            MergeStrategy::Theirs => Resolution::KeepTheirs,
254            MergeStrategy::LastWriterWins => {
255                // Compare consolidated_at timestamps from the diff entries
256                let key = ConflictKey {
257                    subject: conflict.subject.clone(),
258                    predicate: conflict.predicate.clone(),
259                    namespace: conflict.namespace.clone(),
260                };
261                let ts_a = a_adds
262                    .get(&key)
263                    .and_then(|e| e.consolidated_at)
264                    .unwrap_or(0);
265                let ts_b = b_adds
266                    .get(&key)
267                    .and_then(|e| e.consolidated_at)
268                    .unwrap_or(0);
269                if ts_a >= ts_b {
270                    Resolution::KeepOurs
271                } else {
272                    Resolution::KeepTheirs
273                }
274            }
275            MergeStrategy::Custom(f) => f(conflict),
276        };
277        resolved_keys.insert(
278            ConflictKey {
279                subject: conflict.subject.clone(),
280                predicate: conflict.predicate.clone(),
281                namespace: conflict.namespace.clone(),
282            },
283            resolution,
284        );
285    }
286
287    // Start from ancestor state
288    checkout::checkout(obj_store, commits_table, &ancestor)?;
289
290    // Build the set of additions, applying resolutions to conflicts
291    let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
292
293    // First add all non-conflicting additions from both branches
294    for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
295        let conflict_key = ConflictKey {
296            subject: entry.subject.clone(),
297            predicate: entry.predicate.clone(),
298            namespace: entry.namespace.clone(),
299        };
300
301        if let Some(resolution) = resolved_keys.get(&conflict_key) {
302            // This is a conflicted key — handle based on resolution
303            let spo_key = (
304                entry.subject.clone(),
305                entry.predicate.clone(),
306                entry.object.clone(),
307                entry.namespace.clone(),
308            );
309            match resolution {
310                Resolution::KeepOurs => {
311                    if a_adds.contains_key(&conflict_key)
312                        && a_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
313                    {
314                        all_adds.insert(spo_key, entry);
315                    }
316                }
317                Resolution::KeepTheirs => {
318                    if b_adds.contains_key(&conflict_key)
319                        && b_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
320                    {
321                        all_adds.insert(spo_key, entry);
322                    }
323                }
324                Resolution::KeepBoth => {
325                    all_adds.insert(spo_key, entry);
326                }
327                Resolution::Drop => {
328                    // Skip — don't add either
329                }
330            }
331        } else {
332            // Non-conflicting — add as normal (dedup by spo key, first wins)
333            let key = (
334                entry.subject.clone(),
335                entry.predicate.clone(),
336                entry.object.clone(),
337                entry.namespace.clone(),
338            );
339            all_adds.entry(key).or_insert(entry);
340        }
341    }
342
343    // Apply additions to store
344    for entry in all_adds.values() {
345        let triple = Triple {
346            subject: entry.subject.clone(),
347            predicate: entry.predicate.clone(),
348            object: entry.object.clone(),
349            graph: entry.graph.clone(),
350            confidence: entry.confidence,
351            source_document: entry.source_document.clone(),
352            source_chunk_id: entry.source_chunk_id.clone(),
353            extracted_by: Some(format!("merge by {author}")),
354            caused_by: entry.caused_by.clone(),
355            derived_from: entry.derived_from.clone(),
356            consolidated_at: entry.consolidated_at,
357        };
358        obj_store
359            .store
360            .add_triple(&triple, &entry.namespace, Some(entry.y_layer))?;
361    }
362
363    // Apply removals from both diffs
364    apply_removals(obj_store, &diff_a, &diff_b);
365
366    // Create merge commit
367    let merge_commit = create_commit(
368        obj_store,
369        commits_table,
370        vec![commit_a_id.to_string(), commit_b_id.to_string()],
371        &format!("Merge {} into {} (resolved)", commit_b_id, commit_a_id),
372        author,
373    )?;
374
375    Ok(MergeResult::Clean(merge_commit))
376}
377
378/// Apply the clean merge path (shared by `merge` and `merge_with_strategy`).
379#[allow(clippy::too_many_arguments)]
380fn apply_clean_merge(
381    obj_store: &mut GitObjectStore,
382    commits_table: &mut CommitsTable,
383    ancestor: &str,
384    commit_a_id: &str,
385    commit_b_id: &str,
386    diff_a: &diff::DiffResult,
387    diff_b: &diff::DiffResult,
388    author: &str,
389) -> Result<MergeResult, MergeError> {
390    checkout::checkout(obj_store, commits_table, ancestor)?;
391
392    let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
393    for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
394        let key = (
395            entry.subject.clone(),
396            entry.predicate.clone(),
397            entry.object.clone(),
398            entry.namespace.clone(),
399        );
400        all_adds.entry(key).or_insert(entry);
401    }
402
403    for entry in all_adds.values() {
404        let triple = Triple {
405            subject: entry.subject.clone(),
406            predicate: entry.predicate.clone(),
407            object: entry.object.clone(),
408            graph: entry.graph.clone(),
409            confidence: entry.confidence,
410            source_document: entry.source_document.clone(),
411            source_chunk_id: entry.source_chunk_id.clone(),
412            extracted_by: Some(format!("merge by {author}")),
413            caused_by: entry.caused_by.clone(),
414            derived_from: entry.derived_from.clone(),
415            consolidated_at: entry.consolidated_at,
416        };
417        obj_store
418            .store
419            .add_triple(&triple, &entry.namespace, Some(entry.y_layer))?;
420    }
421
422    apply_removals(obj_store, diff_a, diff_b);
423
424    let merge_commit = create_commit(
425        obj_store,
426        commits_table,
427        vec![commit_a_id.to_string(), commit_b_id.to_string()],
428        &format!("Merge {} into {}", commit_b_id, commit_a_id),
429        author,
430    )?;
431
432    Ok(MergeResult::Clean(merge_commit))
433}
434
435/// Apply removals from both diffs to the current store state.
436fn apply_removals(
437    obj_store: &mut GitObjectStore,
438    diff_a: &diff::DiffResult,
439    diff_b: &diff::DiffResult,
440) {
441    let all_removals: HashSet<(String, String, String, String)> = diff_a
442        .removed
443        .iter()
444        .chain(diff_b.removed.iter())
445        .map(|e| {
446            (
447                e.subject.clone(),
448                e.predicate.clone(),
449                e.object.clone(),
450                e.namespace.clone(),
451            )
452        })
453        .collect();
454
455    for ns in obj_store.store.namespaces().to_vec() {
456        let batches = obj_store.store.get_namespace_batches(&ns);
457        let ns_str = ns.clone();
458
459        let mut ids_to_delete = Vec::new();
460        for batch in batches {
461            let id_col = batch
462                .column(col::TRIPLE_ID)
463                .as_any()
464                .downcast_ref::<arrow::array::StringArray>()
465                .expect("triple_id column");
466            let subj_col = batch
467                .column(col::SUBJECT)
468                .as_any()
469                .downcast_ref::<arrow::array::StringArray>()
470                .expect("subject column");
471            let pred_col = batch
472                .column(col::PREDICATE)
473                .as_any()
474                .downcast_ref::<arrow::array::StringArray>()
475                .expect("predicate column");
476            let obj_col = batch
477                .column(col::OBJECT)
478                .as_any()
479                .downcast_ref::<arrow::array::StringArray>()
480                .expect("object column");
481
482            for i in 0..batch.num_rows() {
483                let key = (
484                    subj_col.value(i).to_string(),
485                    pred_col.value(i).to_string(),
486                    obj_col.value(i).to_string(),
487                    ns_str.clone(),
488                );
489                if all_removals.contains(&key) {
490                    ids_to_delete.push(id_col.value(i).to_string());
491                }
492            }
493        }
494
495        for id in &ids_to_delete {
496            let _ = obj_store.store.delete(id);
497        }
498    }
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use crate::commit::create_commit;
505
506    fn sample_triple(subj: &str, obj: &str) -> Triple {
507        Triple {
508            subject: subj.to_string(),
509            predicate: "rdf:type".to_string(),
510            object: obj.to_string(),
511            graph: None,
512            confidence: Some(0.9),
513            source_document: None,
514            source_chunk_id: None,
515            extracted_by: None,
516            caused_by: None,
517            derived_from: None,
518            consolidated_at: None,
519        }
520    }
521
522    #[test]
523    fn test_non_conflicting_merge() {
524        let tmp = tempfile::tempdir().unwrap();
525        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
526        let mut commits = CommitsTable::new();
527
528        // Create base commit
529        obj.store
530            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
531            .unwrap();
532        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
533
534        // Branch A: add triple A
535        obj.store
536            .add_triple(&sample_triple("a-only", "A"), "world", Some(1u8))
537            .unwrap();
538        let ca = create_commit(
539            &obj,
540            &mut commits,
541            vec![base.commit_id.clone()],
542            "branch-a",
543            "test",
544        )
545        .unwrap();
546
547        // Checkout base, add triple B
548        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
549        obj.store
550            .add_triple(&sample_triple("b-only", "B"), "world", Some(1u8))
551            .unwrap();
552        let cb = create_commit(
553            &obj,
554            &mut commits,
555            vec![base.commit_id.clone()],
556            "branch-b",
557            "test",
558        )
559        .unwrap();
560
561        // Merge
562        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "test").unwrap();
563
564        match result {
565            MergeResult::Clean(mc) => {
566                assert_eq!(mc.parent_ids.len(), 2);
567                // After merge, store should have base + a-only + b-only
568                assert!(obj.store.len() >= 3);
569            }
570            MergeResult::Conflict(_) => panic!("Expected clean merge"),
571            MergeResult::NoCommonAncestor => panic!("Expected common ancestor"),
572        }
573    }
574
575    #[test]
576    fn test_conflicting_merge() {
577        let tmp = tempfile::tempdir().unwrap();
578        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
579        let mut commits = CommitsTable::new();
580
581        // Base
582        obj.store
583            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
584            .unwrap();
585        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
586
587        // Branch A: add (conflict-subj, rdf:type, TypeA)
588        obj.store
589            .add_triple(&sample_triple("conflict-subj", "TypeA"), "world", Some(1u8))
590            .unwrap();
591        let ca = create_commit(
592            &obj,
593            &mut commits,
594            vec![base.commit_id.clone()],
595            "branch-a",
596            "test",
597        )
598        .unwrap();
599
600        // Checkout base, Branch B: add (conflict-subj, rdf:type, TypeB)
601        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
602        obj.store
603            .add_triple(&sample_triple("conflict-subj", "TypeB"), "world", Some(1u8))
604            .unwrap();
605        let cb = create_commit(
606            &obj,
607            &mut commits,
608            vec![base.commit_id.clone()],
609            "branch-b",
610            "test",
611        )
612        .unwrap();
613
614        // Merge should detect conflict
615        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "test").unwrap();
616
617        match result {
618            MergeResult::Conflict(conflicts) => {
619                assert_eq!(conflicts.len(), 1);
620                assert_eq!(conflicts[0].subject, "conflict-subj");
621                assert_eq!(conflicts[0].object_a, "TypeA");
622                assert_eq!(conflicts[0].object_b, "TypeB");
623            }
624            _ => panic!("Expected conflict"),
625        }
626    }
627
628    #[test]
629    fn test_merge_commit_has_two_parents() {
630        let tmp = tempfile::tempdir().unwrap();
631        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
632        let mut commits = CommitsTable::new();
633
634        obj.store
635            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
636            .unwrap();
637        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
638
639        // Two branches with non-conflicting changes
640        obj.store
641            .add_triple(&sample_triple("a", "A"), "world", Some(1u8))
642            .unwrap();
643        let ca = create_commit(
644            &obj,
645            &mut commits,
646            vec![base.commit_id.clone()],
647            "a",
648            "test",
649        )
650        .unwrap();
651
652        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
653        obj.store
654            .add_triple(&sample_triple("b", "B"), "work", Some(1u8))
655            .unwrap();
656        let cb = create_commit(
657            &obj,
658            &mut commits,
659            vec![base.commit_id.clone()],
660            "b",
661            "test",
662        )
663        .unwrap();
664
665        let result = merge(&mut obj, &mut commits, &ca.commit_id, &cb.commit_id, "test").unwrap();
666
667        match result {
668            MergeResult::Clean(mc) => {
669                assert_eq!(mc.parent_ids.len(), 2);
670                assert!(mc.parent_ids.contains(&ca.commit_id));
671                assert!(mc.parent_ids.contains(&cb.commit_id));
672            }
673            _ => panic!("Expected clean merge"),
674        }
675    }
676
677    /// Helper: create a conflicting scenario for strategy tests.
678    /// Returns (obj_store, commits_table, commit_a_id, commit_b_id).
679    fn setup_conflict_scenario() -> (GitObjectStore, CommitsTable, String, String) {
680        let tmp = tempfile::tempdir().unwrap();
681        // Leak the tempdir so it lives long enough
682        let tmp_path = tmp.path().to_owned();
683        std::mem::forget(tmp);
684        let mut obj = GitObjectStore::with_snapshot_dir(&tmp_path);
685        let mut commits = CommitsTable::new();
686
687        // Base
688        obj.store
689            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
690            .unwrap();
691        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
692
693        // Branch A
694        obj.store
695            .add_triple(&sample_triple("conflict-subj", "TypeA"), "world", Some(1u8))
696            .unwrap();
697        let ca = create_commit(
698            &obj,
699            &mut commits,
700            vec![base.commit_id.clone()],
701            "branch-a",
702            "test",
703        )
704        .unwrap();
705
706        // Branch B
707        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
708        obj.store
709            .add_triple(&sample_triple("conflict-subj", "TypeB"), "world", Some(1u8))
710            .unwrap();
711        let cb = create_commit(
712            &obj,
713            &mut commits,
714            vec![base.commit_id.clone()],
715            "branch-b",
716            "test",
717        )
718        .unwrap();
719
720        (obj, commits, ca.commit_id, cb.commit_id)
721    }
722
723    #[test]
724    fn test_strategy_manual_returns_conflict() {
725        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
726        let result = merge_with_strategy(
727            &mut obj,
728            &mut commits,
729            &ca_id,
730            &cb_id,
731            "test",
732            &MergeStrategy::Manual,
733        )
734        .unwrap();
735        match result {
736            MergeResult::Conflict(conflicts) => {
737                assert_eq!(conflicts.len(), 1);
738                assert_eq!(conflicts[0].subject, "conflict-subj");
739            }
740            _ => panic!("Manual strategy should return Conflict"),
741        }
742    }
743
744    #[test]
745    fn test_strategy_ours() {
746        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
747        let result = merge_with_strategy(
748            &mut obj,
749            &mut commits,
750            &ca_id,
751            &cb_id,
752            "test",
753            &MergeStrategy::Ours,
754        )
755        .unwrap();
756        match result {
757            MergeResult::Clean(_) => {
758                // Store should have base + TypeA (ours), NOT TypeB
759                let batches = obj
760                    .store
761                    .query(&arrow_graph_core::QuerySpec {
762                        subject: Some("conflict-subj".to_string()),
763                        ..Default::default()
764                    })
765                    .unwrap();
766                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
767                assert_eq!(total, 1, "Should have exactly one triple for conflict-subj");
768                // Verify it's TypeA
769                let batch = &batches[0];
770                let obj_col = batch
771                    .column(col::OBJECT)
772                    .as_any()
773                    .downcast_ref::<arrow::array::StringArray>()
774                    .unwrap();
775                assert_eq!(obj_col.value(0), "TypeA");
776            }
777            _ => panic!("Ours strategy should produce Clean merge"),
778        }
779    }
780
781    #[test]
782    fn test_strategy_theirs() {
783        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
784        let result = merge_with_strategy(
785            &mut obj,
786            &mut commits,
787            &ca_id,
788            &cb_id,
789            "test",
790            &MergeStrategy::Theirs,
791        )
792        .unwrap();
793        match result {
794            MergeResult::Clean(_) => {
795                let batches = obj
796                    .store
797                    .query(&arrow_graph_core::QuerySpec {
798                        subject: Some("conflict-subj".to_string()),
799                        ..Default::default()
800                    })
801                    .unwrap();
802                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
803                assert_eq!(total, 1, "Should have exactly one triple for conflict-subj");
804                let batch = &batches[0];
805                let obj_col = batch
806                    .column(col::OBJECT)
807                    .as_any()
808                    .downcast_ref::<arrow::array::StringArray>()
809                    .unwrap();
810                assert_eq!(obj_col.value(0), "TypeB");
811            }
812            _ => panic!("Theirs strategy should produce Clean merge"),
813        }
814    }
815
816    #[test]
817    fn test_strategy_keep_both() {
818        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
819        let result = merge_with_strategy(
820            &mut obj,
821            &mut commits,
822            &ca_id,
823            &cb_id,
824            "test",
825            &MergeStrategy::Custom(Box::new(|_| Resolution::KeepBoth)),
826        )
827        .unwrap();
828        match result {
829            MergeResult::Clean(_) => {
830                let batches = obj
831                    .store
832                    .query(&arrow_graph_core::QuerySpec {
833                        subject: Some("conflict-subj".to_string()),
834                        ..Default::default()
835                    })
836                    .unwrap();
837                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
838                assert_eq!(total, 2, "KeepBoth should preserve both triples");
839            }
840            _ => panic!("Custom(KeepBoth) strategy should produce Clean merge"),
841        }
842    }
843
844    #[test]
845    fn test_strategy_drop() {
846        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
847        let result = merge_with_strategy(
848            &mut obj,
849            &mut commits,
850            &ca_id,
851            &cb_id,
852            "test",
853            &MergeStrategy::Custom(Box::new(|_| Resolution::Drop)),
854        )
855        .unwrap();
856        match result {
857            MergeResult::Clean(_) => {
858                let batches = obj
859                    .store
860                    .query(&arrow_graph_core::QuerySpec {
861                        subject: Some("conflict-subj".to_string()),
862                        ..Default::default()
863                    })
864                    .unwrap();
865                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
866                assert_eq!(total, 0, "Drop should remove both conflicting triples");
867            }
868            _ => panic!("Custom(Drop) strategy should produce Clean merge"),
869        }
870    }
871
872    #[test]
873    fn test_strategy_last_writer_wins() {
874        let tmp = tempfile::tempdir().unwrap();
875        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
876        let mut commits = CommitsTable::new();
877
878        // Base
879        obj.store
880            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
881            .unwrap();
882        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
883
884        // Branch A: older timestamp
885        let triple_a = Triple {
886            subject: "ts-subj".to_string(),
887            predicate: "rdf:type".to_string(),
888            object: "OlderValue".to_string(),
889            graph: None,
890            confidence: Some(0.9),
891            source_document: None,
892            source_chunk_id: None,
893            extracted_by: None,
894            caused_by: None,
895            derived_from: None,
896            consolidated_at: Some(1000), // older
897        };
898        obj.store.add_triple(&triple_a, "world", Some(1u8)).unwrap();
899        let ca = create_commit(
900            &obj,
901            &mut commits,
902            vec![base.commit_id.clone()],
903            "older",
904            "test",
905        )
906        .unwrap();
907
908        // Branch B: newer timestamp
909        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
910        let triple_b = Triple {
911            subject: "ts-subj".to_string(),
912            predicate: "rdf:type".to_string(),
913            object: "NewerValue".to_string(),
914            graph: None,
915            confidence: Some(0.9),
916            source_document: None,
917            source_chunk_id: None,
918            extracted_by: None,
919            caused_by: None,
920            derived_from: None,
921            consolidated_at: Some(2000), // newer
922        };
923        obj.store.add_triple(&triple_b, "world", Some(1u8)).unwrap();
924        let cb = create_commit(
925            &obj,
926            &mut commits,
927            vec![base.commit_id.clone()],
928            "newer",
929            "test",
930        )
931        .unwrap();
932
933        let result = merge_with_strategy(
934            &mut obj,
935            &mut commits,
936            &ca.commit_id,
937            &cb.commit_id,
938            "test",
939            &MergeStrategy::LastWriterWins,
940        )
941        .unwrap();
942
943        match result {
944            MergeResult::Clean(_) => {
945                let batches = obj
946                    .store
947                    .query(&arrow_graph_core::QuerySpec {
948                        subject: Some("ts-subj".to_string()),
949                        ..Default::default()
950                    })
951                    .unwrap();
952                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
953                assert_eq!(total, 1);
954                let batch = &batches[0];
955                let obj_col = batch
956                    .column(col::OBJECT)
957                    .as_any()
958                    .downcast_ref::<arrow::array::StringArray>()
959                    .unwrap();
960                assert_eq!(
961                    obj_col.value(0),
962                    "NewerValue",
963                    "LastWriterWins should pick the newer timestamp"
964                );
965            }
966            _ => panic!("LastWriterWins should produce Clean merge"),
967        }
968    }
969
970    #[test]
971    fn test_strategy_custom_conditional() {
972        let (mut obj, mut commits, ca_id, cb_id) = setup_conflict_scenario();
973        // Custom strategy: keep ours if subject starts with "conflict", theirs otherwise
974        let result = merge_with_strategy(
975            &mut obj,
976            &mut commits,
977            &ca_id,
978            &cb_id,
979            "test",
980            &MergeStrategy::Custom(Box::new(|c| {
981                if c.subject.starts_with("conflict") {
982                    Resolution::KeepOurs
983                } else {
984                    Resolution::KeepTheirs
985                }
986            })),
987        )
988        .unwrap();
989        match result {
990            MergeResult::Clean(_) => {
991                let batches = obj
992                    .store
993                    .query(&arrow_graph_core::QuerySpec {
994                        subject: Some("conflict-subj".to_string()),
995                        ..Default::default()
996                    })
997                    .unwrap();
998                let batch = &batches[0];
999                let obj_col = batch
1000                    .column(col::OBJECT)
1001                    .as_any()
1002                    .downcast_ref::<arrow::array::StringArray>()
1003                    .unwrap();
1004                assert_eq!(
1005                    obj_col.value(0),
1006                    "TypeA",
1007                    "Custom should keep ours for conflict-* subjects"
1008                );
1009            }
1010            _ => panic!("Custom strategy should produce Clean merge"),
1011        }
1012    }
1013
1014    #[test]
1015    fn test_strategy_on_no_conflict_still_clean() {
1016        // When there are no conflicts, strategy doesn't matter — should still merge clean
1017        let tmp = tempfile::tempdir().unwrap();
1018        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
1019        let mut commits = CommitsTable::new();
1020
1021        obj.store
1022            .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
1023            .unwrap();
1024        let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();
1025
1026        // Non-conflicting branches
1027        obj.store
1028            .add_triple(&sample_triple("a-only", "A"), "world", Some(1u8))
1029            .unwrap();
1030        let ca = create_commit(
1031            &obj,
1032            &mut commits,
1033            vec![base.commit_id.clone()],
1034            "a",
1035            "test",
1036        )
1037        .unwrap();
1038
1039        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
1040        obj.store
1041            .add_triple(&sample_triple("b-only", "B"), "work", Some(1u8))
1042            .unwrap();
1043        let cb = create_commit(
1044            &obj,
1045            &mut commits,
1046            vec![base.commit_id.clone()],
1047            "b",
1048            "test",
1049        )
1050        .unwrap();
1051
1052        let result = merge_with_strategy(
1053            &mut obj,
1054            &mut commits,
1055            &ca.commit_id,
1056            &cb.commit_id,
1057            "test",
1058            &MergeStrategy::Ours,
1059        )
1060        .unwrap();
1061        match result {
1062            MergeResult::Clean(mc) => {
1063                assert_eq!(mc.parent_ids.len(), 2);
1064                assert!(obj.store.len() >= 3);
1065            }
1066            _ => panic!("Non-conflicting merge with any strategy should be Clean"),
1067        }
1068    }
1069}