1use crate::checkout;
8use crate::commit::{Commit, CommitError, CommitsTable, create_commit};
9use crate::diff::{self, DiffEntry};
10use crate::history::find_common_ancestor;
11use crate::object_store::GitObjectStore;
12use arrow_graph_core::{Triple, col};
13use std::collections::{HashMap, HashSet};
14
/// A pair of incompatible additions discovered while merging two commits.
///
/// Both sides added a triple with the same subject, predicate and namespace
/// since their common ancestor, but the objects differ.
///
/// Equality and hashing are derived so callers can compare, deduplicate or
/// index conflicts.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Conflict {
    /// Subject shared by the two colliding triples.
    pub subject: String,
    /// Predicate shared by the two colliding triples.
    pub predicate: String,
    /// Namespace in which both triples were added.
    pub namespace: String,
    /// Object added on side A (the first commit handed to the merge).
    pub object_a: String,
    /// Object added on side B (the second commit handed to the merge).
    pub object_b: String,
}
26
27#[derive(Debug)]
29pub enum MergeResult {
30 Clean(Commit),
32 Conflict(Vec<Conflict>),
34 NoCommonAncestor,
36}
37
38#[derive(Debug, thiserror::Error)]
40pub enum MergeError {
41 #[error("Commit error: {0}")]
42 Commit(#[from] CommitError),
43
44 #[error("Store error: {0}")]
45 Store(#[from] arrow_graph_core::StoreError),
46}
47
/// Decision taken for a single [`Conflict`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Resolution {
    /// Keep only the object added on side A.
    KeepOurs,
    /// Keep only the object added on side B.
    KeepTheirs,
    /// Keep the objects from both sides.
    KeepBoth,
    /// Keep neither object.
    Drop,
}
60
61pub enum MergeStrategy {
63 Manual,
65 Ours,
67 Theirs,
69 LastWriterWins,
71 Custom(Box<dyn Fn(&Conflict) -> Resolution>),
73}
74
/// Identity used to decide whether two additions touch the same fact:
/// everything about a triple except its object.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ConflictKey {
    /// Subject of the added triple.
    subject: String,
    /// Predicate of the added triple.
    predicate: String,
    /// Namespace the triple was added to.
    namespace: String,
}
82
83pub fn merge(
90 obj_store: &mut GitObjectStore,
91 commits_table: &mut CommitsTable,
92 commit_a_id: &str,
93 commit_b_id: &str,
94 author: &str,
95) -> Result<MergeResult, MergeError> {
96 let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
98 Some(a) => a.commit_id.clone(),
99 None => return Ok(MergeResult::NoCommonAncestor),
100 };
101
102 let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
104 let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
106
107 let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
109 .added
110 .iter()
111 .map(|e| {
112 (
113 ConflictKey {
114 subject: e.subject.clone(),
115 predicate: e.predicate.clone(),
116 namespace: e.namespace.clone(),
117 },
118 e,
119 )
120 })
121 .collect();
122
123 let mut conflicts = Vec::new();
124 for entry_b in &diff_b.added {
125 let key = ConflictKey {
126 subject: entry_b.subject.clone(),
127 predicate: entry_b.predicate.clone(),
128 namespace: entry_b.namespace.clone(),
129 };
130 if let Some(entry_a) = a_adds.get(&key)
131 && entry_a.object != entry_b.object
132 {
133 conflicts.push(Conflict {
134 subject: key.subject,
135 predicate: key.predicate,
136 namespace: key.namespace,
137 object_a: entry_a.object.clone(),
138 object_b: entry_b.object.clone(),
139 });
140 }
141 }
142
143 if !conflicts.is_empty() {
144 return Ok(MergeResult::Conflict(conflicts));
145 }
146
147 apply_clean_merge(
149 obj_store,
150 commits_table,
151 &ancestor,
152 commit_a_id,
153 commit_b_id,
154 &diff_a,
155 &diff_b,
156 author,
157 )
158}
159
160pub fn merge_with_strategy(
165 obj_store: &mut GitObjectStore,
166 commits_table: &mut CommitsTable,
167 commit_a_id: &str,
168 commit_b_id: &str,
169 author: &str,
170 strategy: &MergeStrategy,
171) -> Result<MergeResult, MergeError> {
172 let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
174 Some(a) => a.commit_id.clone(),
175 None => return Ok(MergeResult::NoCommonAncestor),
176 };
177
178 let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
180 let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
181
182 let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
184 .added
185 .iter()
186 .map(|e| {
187 (
188 ConflictKey {
189 subject: e.subject.clone(),
190 predicate: e.predicate.clone(),
191 namespace: e.namespace.clone(),
192 },
193 e,
194 )
195 })
196 .collect();
197
198 let b_adds: HashMap<ConflictKey, &DiffEntry> = diff_b
199 .added
200 .iter()
201 .map(|e| {
202 (
203 ConflictKey {
204 subject: e.subject.clone(),
205 predicate: e.predicate.clone(),
206 namespace: e.namespace.clone(),
207 },
208 e,
209 )
210 })
211 .collect();
212
213 let mut conflicts = Vec::new();
214 for (key, entry_b) in &b_adds {
215 if let Some(entry_a) = a_adds.get(key)
216 && entry_a.object != entry_b.object
217 {
218 conflicts.push(Conflict {
219 subject: key.subject.clone(),
220 predicate: key.predicate.clone(),
221 namespace: key.namespace.clone(),
222 object_a: entry_a.object.clone(),
223 object_b: entry_b.object.clone(),
224 });
225 }
226 }
227
228 if conflicts.is_empty() {
230 return apply_clean_merge(
231 obj_store,
232 commits_table,
233 &ancestor,
234 commit_a_id,
235 commit_b_id,
236 &diff_a,
237 &diff_b,
238 author,
239 );
240 }
241
242 if matches!(strategy, MergeStrategy::Manual) {
244 return Ok(MergeResult::Conflict(conflicts));
245 }
246
247 let mut resolved_keys: HashMap<ConflictKey, Resolution> = HashMap::new();
249 for conflict in &conflicts {
250 let resolution = match strategy {
251 MergeStrategy::Manual => unreachable!(),
252 MergeStrategy::Ours => Resolution::KeepOurs,
253 MergeStrategy::Theirs => Resolution::KeepTheirs,
254 MergeStrategy::LastWriterWins => {
255 let key = ConflictKey {
257 subject: conflict.subject.clone(),
258 predicate: conflict.predicate.clone(),
259 namespace: conflict.namespace.clone(),
260 };
261 let ts_a = a_adds
262 .get(&key)
263 .and_then(|e| e.consolidated_at)
264 .unwrap_or(0);
265 let ts_b = b_adds
266 .get(&key)
267 .and_then(|e| e.consolidated_at)
268 .unwrap_or(0);
269 if ts_a >= ts_b {
270 Resolution::KeepOurs
271 } else {
272 Resolution::KeepTheirs
273 }
274 }
275 MergeStrategy::Custom(f) => f(conflict),
276 };
277 resolved_keys.insert(
278 ConflictKey {
279 subject: conflict.subject.clone(),
280 predicate: conflict.predicate.clone(),
281 namespace: conflict.namespace.clone(),
282 },
283 resolution,
284 );
285 }
286
287 checkout::checkout(obj_store, commits_table, &ancestor)?;
289
290 let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
292
293 for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
295 let conflict_key = ConflictKey {
296 subject: entry.subject.clone(),
297 predicate: entry.predicate.clone(),
298 namespace: entry.namespace.clone(),
299 };
300
301 if let Some(resolution) = resolved_keys.get(&conflict_key) {
302 let spo_key = (
304 entry.subject.clone(),
305 entry.predicate.clone(),
306 entry.object.clone(),
307 entry.namespace.clone(),
308 );
309 match resolution {
310 Resolution::KeepOurs => {
311 if a_adds.contains_key(&conflict_key)
312 && a_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
313 {
314 all_adds.insert(spo_key, entry);
315 }
316 }
317 Resolution::KeepTheirs => {
318 if b_adds.contains_key(&conflict_key)
319 && b_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
320 {
321 all_adds.insert(spo_key, entry);
322 }
323 }
324 Resolution::KeepBoth => {
325 all_adds.insert(spo_key, entry);
326 }
327 Resolution::Drop => {
328 }
330 }
331 } else {
332 let key = (
334 entry.subject.clone(),
335 entry.predicate.clone(),
336 entry.object.clone(),
337 entry.namespace.clone(),
338 );
339 all_adds.entry(key).or_insert(entry);
340 }
341 }
342
343 for entry in all_adds.values() {
345 let triple = Triple {
346 subject: entry.subject.clone(),
347 predicate: entry.predicate.clone(),
348 object: entry.object.clone(),
349 graph: entry.graph.clone(),
350 confidence: entry.confidence,
351 source_document: entry.source_document.clone(),
352 source_chunk_id: entry.source_chunk_id.clone(),
353 extracted_by: Some(format!("merge by {author}")),
354 caused_by: entry.caused_by.clone(),
355 derived_from: entry.derived_from.clone(),
356 consolidated_at: entry.consolidated_at,
357 };
358 obj_store
359 .store
360 .add_triple(&triple, &entry.namespace, Some(entry.y_layer))?;
361 }
362
363 apply_removals(obj_store, &diff_a, &diff_b);
365
366 let merge_commit = create_commit(
368 obj_store,
369 commits_table,
370 vec![commit_a_id.to_string(), commit_b_id.to_string()],
371 &format!("Merge {} into {} (resolved)", commit_b_id, commit_a_id),
372 author,
373 )?;
374
375 Ok(MergeResult::Clean(merge_commit))
376}
377
378#[allow(clippy::too_many_arguments)]
380fn apply_clean_merge(
381 obj_store: &mut GitObjectStore,
382 commits_table: &mut CommitsTable,
383 ancestor: &str,
384 commit_a_id: &str,
385 commit_b_id: &str,
386 diff_a: &diff::DiffResult,
387 diff_b: &diff::DiffResult,
388 author: &str,
389) -> Result<MergeResult, MergeError> {
390 checkout::checkout(obj_store, commits_table, ancestor)?;
391
392 let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
393 for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
394 let key = (
395 entry.subject.clone(),
396 entry.predicate.clone(),
397 entry.object.clone(),
398 entry.namespace.clone(),
399 );
400 all_adds.entry(key).or_insert(entry);
401 }
402
403 for entry in all_adds.values() {
404 let triple = Triple {
405 subject: entry.subject.clone(),
406 predicate: entry.predicate.clone(),
407 object: entry.object.clone(),
408 graph: entry.graph.clone(),
409 confidence: entry.confidence,
410 source_document: entry.source_document.clone(),
411 source_chunk_id: entry.source_chunk_id.clone(),
412 extracted_by: Some(format!("merge by {author}")),
413 caused_by: entry.caused_by.clone(),
414 derived_from: entry.derived_from.clone(),
415 consolidated_at: entry.consolidated_at,
416 };
417 obj_store
418 .store
419 .add_triple(&triple, &entry.namespace, Some(entry.y_layer))?;
420 }
421
422 apply_removals(obj_store, diff_a, diff_b);
423
424 let merge_commit = create_commit(
425 obj_store,
426 commits_table,
427 vec![commit_a_id.to_string(), commit_b_id.to_string()],
428 &format!("Merge {} into {}", commit_b_id, commit_a_id),
429 author,
430 )?;
431
432 Ok(MergeResult::Clean(merge_commit))
433}
434
435fn apply_removals(
437 obj_store: &mut GitObjectStore,
438 diff_a: &diff::DiffResult,
439 diff_b: &diff::DiffResult,
440) {
441 let all_removals: HashSet<(String, String, String, String)> = diff_a
442 .removed
443 .iter()
444 .chain(diff_b.removed.iter())
445 .map(|e| {
446 (
447 e.subject.clone(),
448 e.predicate.clone(),
449 e.object.clone(),
450 e.namespace.clone(),
451 )
452 })
453 .collect();
454
455 for ns in obj_store.store.namespaces().to_vec() {
456 let batches = obj_store.store.get_namespace_batches(&ns);
457 let ns_str = ns.clone();
458
459 let mut ids_to_delete = Vec::new();
460 for batch in batches {
461 let id_col = batch
462 .column(col::TRIPLE_ID)
463 .as_any()
464 .downcast_ref::<arrow::array::StringArray>()
465 .expect("triple_id column");
466 let subj_col = batch
467 .column(col::SUBJECT)
468 .as_any()
469 .downcast_ref::<arrow::array::StringArray>()
470 .expect("subject column");
471 let pred_col = batch
472 .column(col::PREDICATE)
473 .as_any()
474 .downcast_ref::<arrow::array::StringArray>()
475 .expect("predicate column");
476 let obj_col = batch
477 .column(col::OBJECT)
478 .as_any()
479 .downcast_ref::<arrow::array::StringArray>()
480 .expect("object column");
481
482 for i in 0..batch.num_rows() {
483 let key = (
484 subj_col.value(i).to_string(),
485 pred_col.value(i).to_string(),
486 obj_col.value(i).to_string(),
487 ns_str.clone(),
488 );
489 if all_removals.contains(&key) {
490 ids_to_delete.push(id_col.value(i).to_string());
491 }
492 }
493 }
494
495 for id in &ids_to_delete {
496 let _ = obj_store.store.delete(id);
497 }
498 }
499}
500
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::create_commit;

    /// Builds an `rdf:type` triple with confidence 0.9 and no provenance or timestamp.
    fn sample_triple(subj: &str, obj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: obj.to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
        }
    }

    /// One side of a diverging history: the triple it adds, the namespace it
    /// adds it to, and the message of the commit that records it.
    struct Branch<'a> {
        triple: Triple,
        namespace: &'a str,
        message: &'a str,
    }

    impl<'a> Branch<'a> {
        fn new(triple: Triple, namespace: &'a str, message: &'a str) -> Self {
            Self {
                triple,
                namespace,
                message,
            }
        }
    }

    /// A store with a base commit and two sibling commits branching off it.
    ///
    /// The temp-dir guard is owned by the fixture, so the snapshot directory
    /// is removed when the test finishes instead of being leaked.
    struct Scenario {
        obj: GitObjectStore,
        commits: CommitsTable,
        ca_id: String,
        cb_id: String,
        _snapshot_dir: tempfile::TempDir,
    }

    impl Scenario {
        /// Commits a base triple, then `side_a` on top of it, then checks the
        /// base out again and commits `side_b` on top of it.
        fn diverge(side_a: Branch<'_>, side_b: Branch<'_>) -> Self {
            let snapshot_dir = tempfile::tempdir().unwrap();
            let mut obj = GitObjectStore::with_snapshot_dir(snapshot_dir.path());
            let mut commits = CommitsTable::new();

            obj.store
                .add_triple(&sample_triple("base", "Base"), "world", Some(1u8))
                .unwrap();
            let base = create_commit(&obj, &mut commits, vec![], "base", "test").unwrap();

            obj.store
                .add_triple(&side_a.triple, side_a.namespace, Some(1u8))
                .unwrap();
            let ca = create_commit(
                &obj,
                &mut commits,
                vec![base.commit_id.clone()],
                side_a.message,
                "test",
            )
            .unwrap();

            checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
            obj.store
                .add_triple(&side_b.triple, side_b.namespace, Some(1u8))
                .unwrap();
            let cb = create_commit(
                &obj,
                &mut commits,
                vec![base.commit_id.clone()],
                side_b.message,
                "test",
            )
            .unwrap();

            Self {
                obj,
                commits,
                ca_id: ca.commit_id,
                cb_id: cb.commit_id,
                _snapshot_dir: snapshot_dir,
            }
        }

        /// Runs the strategy-less merge of B into A.
        fn merge(&mut self) -> MergeResult {
            super::merge(
                &mut self.obj,
                &mut self.commits,
                &self.ca_id,
                &self.cb_id,
                "test",
            )
            .unwrap()
        }

        /// Runs `merge_with_strategy` of B into A with `strategy`.
        fn merge_with(&mut self, strategy: MergeStrategy) -> MergeResult {
            merge_with_strategy(
                &mut self.obj,
                &mut self.commits,
                &self.ca_id,
                &self.cb_id,
                "test",
                &strategy,
            )
            .unwrap()
        }

        /// Object values of every stored triple whose subject is `subject`.
        fn objects_of(&mut self, subject: &str) -> Vec<String> {
            let batches = self
                .obj
                .store
                .query(&arrow_graph_core::QuerySpec {
                    subject: Some(subject.to_string()),
                    ..Default::default()
                })
                .unwrap();

            let mut objects = Vec::new();
            for batch in batches.iter() {
                let obj_col = batch
                    .column(col::OBJECT)
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                for row in 0..batch.num_rows() {
                    objects.push(obj_col.value(row).to_string());
                }
            }
            objects
        }
    }

    /// Both branches type `conflict-subj` differently in the same namespace.
    fn setup_conflict_scenario() -> Scenario {
        Scenario::diverge(
            Branch::new(sample_triple("conflict-subj", "TypeA"), "world", "branch-a"),
            Branch::new(sample_triple("conflict-subj", "TypeB"), "world", "branch-b"),
        )
    }

    #[test]
    fn test_non_conflicting_merge() {
        let mut scenario = Scenario::diverge(
            Branch::new(sample_triple("a-only", "A"), "world", "branch-a"),
            Branch::new(sample_triple("b-only", "B"), "world", "branch-b"),
        );

        match scenario.merge() {
            MergeResult::Clean(mc) => {
                assert_eq!(mc.parent_ids.len(), 2);
                assert!(scenario.obj.store.len() >= 3);
            }
            MergeResult::Conflict(_) => panic!("Expected clean merge"),
            MergeResult::NoCommonAncestor => panic!("Expected common ancestor"),
        }
    }

    #[test]
    fn test_conflicting_merge() {
        let mut scenario = setup_conflict_scenario();

        let MergeResult::Conflict(conflicts) = scenario.merge() else {
            panic!("Expected conflict");
        };
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].subject, "conflict-subj");
        assert_eq!(conflicts[0].object_a, "TypeA");
        assert_eq!(conflicts[0].object_b, "TypeB");
    }

    #[test]
    fn test_merge_commit_has_two_parents() {
        let mut scenario = Scenario::diverge(
            Branch::new(sample_triple("a", "A"), "world", "a"),
            Branch::new(sample_triple("b", "B"), "work", "b"),
        );

        let MergeResult::Clean(mc) = scenario.merge() else {
            panic!("Expected clean merge");
        };
        assert_eq!(mc.parent_ids.len(), 2);
        assert!(mc.parent_ids.contains(&scenario.ca_id));
        assert!(mc.parent_ids.contains(&scenario.cb_id));
    }

    #[test]
    fn test_strategy_manual_returns_conflict() {
        let mut scenario = setup_conflict_scenario();

        let MergeResult::Conflict(conflicts) = scenario.merge_with(MergeStrategy::Manual) else {
            panic!("Manual strategy should return Conflict");
        };
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].subject, "conflict-subj");
    }

    #[test]
    fn test_strategy_ours() {
        let mut scenario = setup_conflict_scenario();

        let MergeResult::Clean(_) = scenario.merge_with(MergeStrategy::Ours) else {
            panic!("Ours strategy should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("conflict-subj"),
            ["TypeA"],
            "Should have exactly one triple for conflict-subj"
        );
    }

    #[test]
    fn test_strategy_theirs() {
        let mut scenario = setup_conflict_scenario();

        let MergeResult::Clean(_) = scenario.merge_with(MergeStrategy::Theirs) else {
            panic!("Theirs strategy should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("conflict-subj"),
            ["TypeB"],
            "Should have exactly one triple for conflict-subj"
        );
    }

    #[test]
    fn test_strategy_keep_both() {
        let mut scenario = setup_conflict_scenario();

        let strategy = MergeStrategy::Custom(Box::new(|_| Resolution::KeepBoth));
        let MergeResult::Clean(_) = scenario.merge_with(strategy) else {
            panic!("Custom(KeepBoth) strategy should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("conflict-subj").len(),
            2,
            "KeepBoth should preserve both triples"
        );
    }

    #[test]
    fn test_strategy_drop() {
        let mut scenario = setup_conflict_scenario();

        let strategy = MergeStrategy::Custom(Box::new(|_| Resolution::Drop));
        let MergeResult::Clean(_) = scenario.merge_with(strategy) else {
            panic!("Custom(Drop) strategy should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("conflict-subj").len(),
            0,
            "Drop should remove both conflicting triples"
        );
    }

    #[test]
    fn test_strategy_last_writer_wins() {
        // Side A carries the older timestamp, side B the newer one.
        let older = Triple {
            consolidated_at: Some(1000),
            ..sample_triple("ts-subj", "OlderValue")
        };
        let newer = Triple {
            consolidated_at: Some(2000),
            ..sample_triple("ts-subj", "NewerValue")
        };
        let mut scenario = Scenario::diverge(
            Branch::new(older, "world", "older"),
            Branch::new(newer, "world", "newer"),
        );

        let MergeResult::Clean(_) = scenario.merge_with(MergeStrategy::LastWriterWins) else {
            panic!("LastWriterWins should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("ts-subj"),
            ["NewerValue"],
            "LastWriterWins should pick the newer timestamp"
        );
    }

    #[test]
    fn test_strategy_custom_conditional() {
        let mut scenario = setup_conflict_scenario();

        let strategy = MergeStrategy::Custom(Box::new(|c| {
            if c.subject.starts_with("conflict") {
                Resolution::KeepOurs
            } else {
                Resolution::KeepTheirs
            }
        }));
        let MergeResult::Clean(_) = scenario.merge_with(strategy) else {
            panic!("Custom strategy should produce Clean merge");
        };
        assert_eq!(
            scenario.objects_of("conflict-subj"),
            ["TypeA"],
            "Custom should keep ours for conflict-* subjects"
        );
    }

    #[test]
    fn test_strategy_on_no_conflict_still_clean() {
        let mut scenario = Scenario::diverge(
            Branch::new(sample_triple("a-only", "A"), "world", "a"),
            Branch::new(sample_triple("b-only", "B"), "work", "b"),
        );

        let MergeResult::Clean(mc) = scenario.merge_with(MergeStrategy::Ours) else {
            panic!("Non-conflicting merge with any strategy should be Clean");
        };
        assert_eq!(mc.parent_ids.len(), 2);
        assert!(scenario.obj.store.len() >= 3);
    }
}