1use crate::checkout;
8use crate::commit::{Commit, CommitError, CommitsTable, create_commit};
9use crate::diff::{self, DiffEntry};
10use crate::history::find_common_ancestor;
11use crate::object_store::GitObjectStore;
12use nusy_arrow_core::{Namespace, Triple, YLayer, col};
13use std::collections::{HashMap, HashSet};
14
/// Two branches added different objects for the same `(subject, predicate, namespace)`.
///
/// `object_a` is the value added on the first commit given to the merge ("ours") and
/// `object_b` the value added on the second commit ("theirs").
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Conflict {
    /// Subject shared by both conflicting additions.
    pub subject: String,
    /// Predicate shared by both conflicting additions.
    pub predicate: String,
    /// Namespace name shared by both conflicting additions.
    pub namespace: String,
    /// Object added on side A ("ours").
    pub object_a: String,
    /// Object added on side B ("theirs").
    pub object_b: String,
}
26
27#[derive(Debug)]
29pub enum MergeResult {
30 Clean(Commit),
32 Conflict(Vec<Conflict>),
34 NoCommonAncestor,
36}
37
38#[derive(Debug, thiserror::Error)]
40pub enum MergeError {
41 #[error("Commit error: {0}")]
42 Commit(#[from] CommitError),
43
44 #[error("Store error: {0}")]
45 Store(#[from] nusy_arrow_core::StoreError),
46}
47
/// How a single conflicted `(subject, predicate, namespace)` key is settled.
///
/// Fieldless, so it is `Copy` and hashable; resolutions can be passed by value and
/// collected into sets or used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Resolution {
    /// Keep the value(s) side A ("ours") added for the key.
    KeepOurs,
    /// Keep the value(s) side B ("theirs") added for the key.
    KeepTheirs,
    /// Keep the additions from both sides.
    KeepBoth,
    /// Discard both sides' additions for the key.
    Drop,
}
60
61pub enum MergeStrategy {
63 Manual,
65 Ours,
67 Theirs,
69 LastWriterWins,
71 Custom(Box<dyn Fn(&Conflict) -> Resolution>),
73}
74
/// The `(subject, predicate, namespace)` slot two branches may each fill with a
/// different object; used as a hash-map key while detecting and resolving conflicts.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ConflictKey {
    /// Triple subject.
    subject: String,
    /// Triple predicate.
    predicate: String,
    /// Namespace name as carried by the diff entries.
    namespace: String,
}
82
83pub fn merge(
90 obj_store: &mut GitObjectStore,
91 commits_table: &mut CommitsTable,
92 commit_a_id: &str,
93 commit_b_id: &str,
94 author: &str,
95) -> Result<MergeResult, MergeError> {
96 let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
98 Some(a) => a.commit_id.clone(),
99 None => return Ok(MergeResult::NoCommonAncestor),
100 };
101
102 let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
104 let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
106
107 let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
109 .added
110 .iter()
111 .map(|e| {
112 (
113 ConflictKey {
114 subject: e.subject.clone(),
115 predicate: e.predicate.clone(),
116 namespace: e.namespace.clone(),
117 },
118 e,
119 )
120 })
121 .collect();
122
123 let mut conflicts = Vec::new();
124 for entry_b in &diff_b.added {
125 let key = ConflictKey {
126 subject: entry_b.subject.clone(),
127 predicate: entry_b.predicate.clone(),
128 namespace: entry_b.namespace.clone(),
129 };
130 if let Some(entry_a) = a_adds.get(&key)
131 && entry_a.object != entry_b.object
132 {
133 conflicts.push(Conflict {
134 subject: key.subject,
135 predicate: key.predicate,
136 namespace: key.namespace,
137 object_a: entry_a.object.clone(),
138 object_b: entry_b.object.clone(),
139 });
140 }
141 }
142
143 if !conflicts.is_empty() {
144 return Ok(MergeResult::Conflict(conflicts));
145 }
146
147 apply_clean_merge(
149 obj_store,
150 commits_table,
151 &ancestor,
152 commit_a_id,
153 commit_b_id,
154 &diff_a,
155 &diff_b,
156 author,
157 )
158}
159
160pub fn merge_with_strategy(
165 obj_store: &mut GitObjectStore,
166 commits_table: &mut CommitsTable,
167 commit_a_id: &str,
168 commit_b_id: &str,
169 author: &str,
170 strategy: &MergeStrategy,
171) -> Result<MergeResult, MergeError> {
172 let ancestor = match find_common_ancestor(commits_table, commit_a_id, commit_b_id) {
174 Some(a) => a.commit_id.clone(),
175 None => return Ok(MergeResult::NoCommonAncestor),
176 };
177
178 let diff_a = diff::diff(obj_store, commits_table, &ancestor, commit_a_id)?;
180 let diff_b = diff::diff(obj_store, commits_table, &ancestor, commit_b_id)?;
181
182 let a_adds: HashMap<ConflictKey, &DiffEntry> = diff_a
184 .added
185 .iter()
186 .map(|e| {
187 (
188 ConflictKey {
189 subject: e.subject.clone(),
190 predicate: e.predicate.clone(),
191 namespace: e.namespace.clone(),
192 },
193 e,
194 )
195 })
196 .collect();
197
198 let b_adds: HashMap<ConflictKey, &DiffEntry> = diff_b
199 .added
200 .iter()
201 .map(|e| {
202 (
203 ConflictKey {
204 subject: e.subject.clone(),
205 predicate: e.predicate.clone(),
206 namespace: e.namespace.clone(),
207 },
208 e,
209 )
210 })
211 .collect();
212
213 let mut conflicts = Vec::new();
214 for (key, entry_b) in &b_adds {
215 if let Some(entry_a) = a_adds.get(key)
216 && entry_a.object != entry_b.object
217 {
218 conflicts.push(Conflict {
219 subject: key.subject.clone(),
220 predicate: key.predicate.clone(),
221 namespace: key.namespace.clone(),
222 object_a: entry_a.object.clone(),
223 object_b: entry_b.object.clone(),
224 });
225 }
226 }
227
228 if conflicts.is_empty() {
230 return apply_clean_merge(
231 obj_store,
232 commits_table,
233 &ancestor,
234 commit_a_id,
235 commit_b_id,
236 &diff_a,
237 &diff_b,
238 author,
239 );
240 }
241
242 if matches!(strategy, MergeStrategy::Manual) {
244 return Ok(MergeResult::Conflict(conflicts));
245 }
246
247 let mut resolved_keys: HashMap<ConflictKey, Resolution> = HashMap::new();
249 for conflict in &conflicts {
250 let resolution = match strategy {
251 MergeStrategy::Manual => unreachable!(),
252 MergeStrategy::Ours => Resolution::KeepOurs,
253 MergeStrategy::Theirs => Resolution::KeepTheirs,
254 MergeStrategy::LastWriterWins => {
255 let key = ConflictKey {
257 subject: conflict.subject.clone(),
258 predicate: conflict.predicate.clone(),
259 namespace: conflict.namespace.clone(),
260 };
261 let ts_a = a_adds
262 .get(&key)
263 .and_then(|e| e.consolidated_at)
264 .unwrap_or(0);
265 let ts_b = b_adds
266 .get(&key)
267 .and_then(|e| e.consolidated_at)
268 .unwrap_or(0);
269 if ts_a >= ts_b {
270 Resolution::KeepOurs
271 } else {
272 Resolution::KeepTheirs
273 }
274 }
275 MergeStrategy::Custom(f) => f(conflict),
276 };
277 resolved_keys.insert(
278 ConflictKey {
279 subject: conflict.subject.clone(),
280 predicate: conflict.predicate.clone(),
281 namespace: conflict.namespace.clone(),
282 },
283 resolution,
284 );
285 }
286
287 checkout::checkout(obj_store, commits_table, &ancestor)?;
289
290 let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
292
293 for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
295 let conflict_key = ConflictKey {
296 subject: entry.subject.clone(),
297 predicate: entry.predicate.clone(),
298 namespace: entry.namespace.clone(),
299 };
300
301 if let Some(resolution) = resolved_keys.get(&conflict_key) {
302 let spo_key = (
304 entry.subject.clone(),
305 entry.predicate.clone(),
306 entry.object.clone(),
307 entry.namespace.clone(),
308 );
309 match resolution {
310 Resolution::KeepOurs => {
311 if a_adds.contains_key(&conflict_key)
312 && a_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
313 {
314 all_adds.insert(spo_key, entry);
315 }
316 }
317 Resolution::KeepTheirs => {
318 if b_adds.contains_key(&conflict_key)
319 && b_adds.get(&conflict_key).map(|e| &e.object) == Some(&entry.object)
320 {
321 all_adds.insert(spo_key, entry);
322 }
323 }
324 Resolution::KeepBoth => {
325 all_adds.insert(spo_key, entry);
326 }
327 Resolution::Drop => {
328 }
330 }
331 } else {
332 let key = (
334 entry.subject.clone(),
335 entry.predicate.clone(),
336 entry.object.clone(),
337 entry.namespace.clone(),
338 );
339 all_adds.entry(key).or_insert(entry);
340 }
341 }
342
343 for entry in all_adds.values() {
345 let ns = Namespace::from_str_loose(&entry.namespace).unwrap_or(Namespace::World);
346 let y_layer = YLayer::from_u8(entry.y_layer).unwrap_or(YLayer::Semantic);
347 let triple = Triple {
348 subject: entry.subject.clone(),
349 predicate: entry.predicate.clone(),
350 object: entry.object.clone(),
351 graph: entry.graph.clone(),
352 confidence: entry.confidence,
353 source_document: entry.source_document.clone(),
354 source_chunk_id: entry.source_chunk_id.clone(),
355 extracted_by: Some(format!("merge by {author}")),
356 caused_by: entry.caused_by.clone(),
357 derived_from: entry.derived_from.clone(),
358 consolidated_at: entry.consolidated_at,
359 certifiability_class: entry.certifiability_class.clone(),
360 };
361 obj_store.store.add_triple(&triple, ns, y_layer)?;
362 }
363
364 apply_removals(obj_store, &diff_a, &diff_b);
366
367 let merge_commit = create_commit(
369 obj_store,
370 commits_table,
371 vec![commit_a_id.to_string(), commit_b_id.to_string()],
372 &format!("Merge {} into {} (resolved)", commit_b_id, commit_a_id),
373 author,
374 )?;
375
376 Ok(MergeResult::Clean(merge_commit))
377}
378
379#[allow(clippy::too_many_arguments)]
381fn apply_clean_merge(
382 obj_store: &mut GitObjectStore,
383 commits_table: &mut CommitsTable,
384 ancestor: &str,
385 commit_a_id: &str,
386 commit_b_id: &str,
387 diff_a: &diff::DiffResult,
388 diff_b: &diff::DiffResult,
389 author: &str,
390) -> Result<MergeResult, MergeError> {
391 checkout::checkout(obj_store, commits_table, ancestor)?;
392
393 let mut all_adds: HashMap<(String, String, String, String), &DiffEntry> = HashMap::new();
394 for entry in diff_a.added.iter().chain(diff_b.added.iter()) {
395 let key = (
396 entry.subject.clone(),
397 entry.predicate.clone(),
398 entry.object.clone(),
399 entry.namespace.clone(),
400 );
401 all_adds.entry(key).or_insert(entry);
402 }
403
404 for entry in all_adds.values() {
405 let ns = Namespace::from_str_loose(&entry.namespace).unwrap_or(Namespace::World);
406 let y_layer = YLayer::from_u8(entry.y_layer).unwrap_or(YLayer::Semantic);
407 let triple = Triple {
408 subject: entry.subject.clone(),
409 predicate: entry.predicate.clone(),
410 object: entry.object.clone(),
411 graph: entry.graph.clone(),
412 confidence: entry.confidence,
413 source_document: entry.source_document.clone(),
414 source_chunk_id: entry.source_chunk_id.clone(),
415 extracted_by: Some(format!("merge by {author}")),
416 caused_by: entry.caused_by.clone(),
417 derived_from: entry.derived_from.clone(),
418 consolidated_at: entry.consolidated_at,
419 certifiability_class: entry.certifiability_class.clone(),
420 };
421 obj_store.store.add_triple(&triple, ns, y_layer)?;
422 }
423
424 apply_removals(obj_store, diff_a, diff_b);
425
426 let merge_commit = create_commit(
427 obj_store,
428 commits_table,
429 vec![commit_a_id.to_string(), commit_b_id.to_string()],
430 &format!("Merge {} into {}", commit_b_id, commit_a_id),
431 author,
432 )?;
433
434 Ok(MergeResult::Clean(merge_commit))
435}
436
437fn apply_removals(
439 obj_store: &mut GitObjectStore,
440 diff_a: &diff::DiffResult,
441 diff_b: &diff::DiffResult,
442) {
443 let all_removals: HashSet<(String, String, String, String)> = diff_a
444 .removed
445 .iter()
446 .chain(diff_b.removed.iter())
447 .map(|e| {
448 (
449 e.subject.clone(),
450 e.predicate.clone(),
451 e.object.clone(),
452 e.namespace.clone(),
453 )
454 })
455 .collect();
456
457 for ns in Namespace::ALL {
458 let batches = obj_store.store.get_namespace_batches(ns);
459 let ns_str = ns.as_str().to_string();
460
461 let mut ids_to_delete = Vec::new();
462 for batch in batches {
463 let id_col = batch
464 .column(col::TRIPLE_ID)
465 .as_any()
466 .downcast_ref::<arrow::array::StringArray>()
467 .expect("triple_id column");
468 let subj_col = batch
469 .column(col::SUBJECT)
470 .as_any()
471 .downcast_ref::<arrow::array::StringArray>()
472 .expect("subject column");
473 let pred_col = batch
474 .column(col::PREDICATE)
475 .as_any()
476 .downcast_ref::<arrow::array::StringArray>()
477 .expect("predicate column");
478 let obj_col = batch
479 .column(col::OBJECT)
480 .as_any()
481 .downcast_ref::<arrow::array::StringArray>()
482 .expect("object column");
483
484 for i in 0..batch.num_rows() {
485 let key = (
486 subj_col.value(i).to_string(),
487 pred_col.value(i).to_string(),
488 obj_col.value(i).to_string(),
489 ns_str.clone(),
490 );
491 if all_removals.contains(&key) {
492 ids_to_delete.push(id_col.value(i).to_string());
493 }
494 }
495 }
496
497 for id in &ids_to_delete {
498 let _ = obj_store.store.delete(id);
499 }
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::create_commit;
    use tempfile::TempDir;

    /// An `rdf:type` triple with confidence 0.9 and no provenance metadata.
    fn sample_triple(subj: &str, obj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: obj.to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
            certifiability_class: None,
        }
    }

    /// A store plus commit graph with a base commit and two sibling branches on top of it.
    struct Diverged {
        obj: GitObjectStore,
        commits: CommitsTable,
        /// Commit id of branch A ("ours").
        ca: String,
        /// Commit id of branch B ("theirs").
        cb: String,
        /// Owns the snapshot directory so it is removed when the test ends rather than
        /// leaked on disk; declared last so it is dropped after `obj`.
        _tmp: TempDir,
    }

    /// Commits a single `base` triple, then branch A adding `triple_a` to `ns_a` and, after
    /// checking the base back out, branch B adding `triple_b` to `ns_b`.
    fn diverge(triple_a: &Triple, ns_a: Namespace, triple_b: &Triple, ns_b: Namespace) -> Diverged {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let mut commits = CommitsTable::new();

        obj.store
            .add_triple(
                &sample_triple("base", "Base"),
                Namespace::World,
                YLayer::Semantic,
            )
            .unwrap();
        let base = create_commit(&obj, &mut commits, vec![], "base", "DGX").unwrap();

        obj.store
            .add_triple(triple_a, ns_a, YLayer::Semantic)
            .unwrap();
        let ca = create_commit(
            &obj,
            &mut commits,
            vec![base.commit_id.clone()],
            "branch-a",
            "DGX",
        )
        .unwrap();

        checkout::checkout(&mut obj, &commits, &base.commit_id).unwrap();
        obj.store
            .add_triple(triple_b, ns_b, YLayer::Semantic)
            .unwrap();
        let cb = create_commit(
            &obj,
            &mut commits,
            vec![base.commit_id.clone()],
            "branch-b",
            "DGX",
        )
        .unwrap();

        Diverged {
            obj,
            commits,
            ca: ca.commit_id,
            cb: cb.commit_id,
            _tmp: tmp,
        }
    }

    /// Branches that set `conflict-subj rdf:type` to `TypeA` (A) and `TypeB` (B).
    fn conflict_scenario() -> Diverged {
        diverge(
            &sample_triple("conflict-subj", "TypeA"),
            Namespace::World,
            &sample_triple("conflict-subj", "TypeB"),
            Namespace::World,
        )
    }

    /// Runs `merge_with_strategy` on the two branches of `fx`.
    fn merge_using(fx: &mut Diverged, strategy: &MergeStrategy) -> MergeResult {
        merge_with_strategy(&mut fx.obj, &mut fx.commits, &fx.ca, &fx.cb, "DGX", strategy)
            .unwrap()
    }

    /// Returns the merge commit of a clean merge; panics mentioning `context` otherwise.
    fn expect_clean(result: MergeResult, context: &str) -> Commit {
        match result {
            MergeResult::Clean(commit) => commit,
            other => panic!("{context}: expected a Clean merge, got {other:?}"),
        }
    }

    /// Sorted objects of every stored triple whose subject is `subject`.
    fn objects_of(obj: &mut GitObjectStore, subject: &str) -> Vec<String> {
        let batches = obj
            .store
            .query(&nusy_arrow_core::QuerySpec {
                subject: Some(subject.to_string()),
                ..Default::default()
            })
            .unwrap();
        let mut objects = Vec::new();
        for batch in batches.iter() {
            let column = batch
                .column(col::OBJECT)
                .as_any()
                .downcast_ref::<arrow::array::StringArray>()
                .unwrap();
            objects.extend((0..batch.num_rows()).map(|row| column.value(row).to_string()));
        }
        objects.sort();
        objects
    }

    #[test]
    fn test_non_conflicting_merge() {
        let mut fx = diverge(
            &sample_triple("a-only", "A"),
            Namespace::World,
            &sample_triple("b-only", "B"),
            Namespace::World,
        );
        let result = merge(&mut fx.obj, &mut fx.commits, &fx.ca, &fx.cb, "DGX").unwrap();
        let mc = expect_clean(result, "disjoint additions");
        assert_eq!(mc.parent_ids.len(), 2);
        assert!(fx.obj.store.len() >= 3);
        assert_eq!(objects_of(&mut fx.obj, "a-only"), vec!["A"]);
        assert_eq!(objects_of(&mut fx.obj, "b-only"), vec!["B"]);
    }

    #[test]
    fn test_conflicting_merge() {
        let mut fx = conflict_scenario();
        let result = merge(&mut fx.obj, &mut fx.commits, &fx.ca, &fx.cb, "DGX").unwrap();
        match result {
            MergeResult::Conflict(conflicts) => {
                assert_eq!(conflicts.len(), 1);
                assert_eq!(conflicts[0].subject, "conflict-subj");
                assert_eq!(conflicts[0].object_a, "TypeA");
                assert_eq!(conflicts[0].object_b, "TypeB");
            }
            other => panic!("Expected conflict, got {other:?}"),
        }
    }

    #[test]
    fn test_merge_commit_has_two_parents() {
        let mut fx = diverge(
            &sample_triple("a", "A"),
            Namespace::World,
            &sample_triple("b", "B"),
            Namespace::Work,
        );
        let result = merge(&mut fx.obj, &mut fx.commits, &fx.ca, &fx.cb, "DGX").unwrap();
        let mc = expect_clean(result, "merge across namespaces");
        assert_eq!(mc.parent_ids.len(), 2);
        assert!(mc.parent_ids.contains(&fx.ca));
        assert!(mc.parent_ids.contains(&fx.cb));
    }

    #[test]
    fn test_strategy_manual_returns_conflict() {
        let mut fx = conflict_scenario();
        match merge_using(&mut fx, &MergeStrategy::Manual) {
            MergeResult::Conflict(conflicts) => {
                assert_eq!(conflicts.len(), 1);
                assert_eq!(conflicts[0].subject, "conflict-subj");
            }
            other => panic!("Manual strategy should return Conflict, got {other:?}"),
        }
    }

    #[test]
    fn test_strategy_ours() {
        let mut fx = conflict_scenario();
        expect_clean(merge_using(&mut fx, &MergeStrategy::Ours), "Ours strategy");
        assert_eq!(
            objects_of(&mut fx.obj, "conflict-subj"),
            vec!["TypeA"],
            "Should have exactly one triple for conflict-subj"
        );
    }

    #[test]
    fn test_strategy_theirs() {
        let mut fx = conflict_scenario();
        expect_clean(merge_using(&mut fx, &MergeStrategy::Theirs), "Theirs strategy");
        assert_eq!(
            objects_of(&mut fx.obj, "conflict-subj"),
            vec!["TypeB"],
            "Should have exactly one triple for conflict-subj"
        );
    }

    #[test]
    fn test_strategy_keep_both() {
        let mut fx = conflict_scenario();
        let result = merge_using(
            &mut fx,
            &MergeStrategy::Custom(Box::new(|_| Resolution::KeepBoth)),
        );
        expect_clean(result, "Custom(KeepBoth) strategy");
        assert_eq!(
            objects_of(&mut fx.obj, "conflict-subj"),
            vec!["TypeA", "TypeB"],
            "KeepBoth should preserve both triples"
        );
    }

    #[test]
    fn test_strategy_drop() {
        let mut fx = conflict_scenario();
        let result = merge_using(
            &mut fx,
            &MergeStrategy::Custom(Box::new(|_| Resolution::Drop)),
        );
        expect_clean(result, "Custom(Drop) strategy");
        assert!(
            objects_of(&mut fx.obj, "conflict-subj").is_empty(),
            "Drop should remove both conflicting triples"
        );
    }

    #[test]
    fn test_strategy_last_writer_wins() {
        let older = Triple {
            consolidated_at: Some(1000),
            ..sample_triple("ts-subj", "OlderValue")
        };
        let newer = Triple {
            consolidated_at: Some(2000),
            ..sample_triple("ts-subj", "NewerValue")
        };
        let mut fx = diverge(&older, Namespace::World, &newer, Namespace::World);
        expect_clean(
            merge_using(&mut fx, &MergeStrategy::LastWriterWins),
            "LastWriterWins strategy",
        );
        assert_eq!(
            objects_of(&mut fx.obj, "ts-subj"),
            vec!["NewerValue"],
            "LastWriterWins should pick the newer timestamp"
        );
    }

    #[test]
    fn test_strategy_custom_conditional() {
        let mut fx = conflict_scenario();
        let result = merge_using(
            &mut fx,
            &MergeStrategy::Custom(Box::new(|c| {
                if c.subject.starts_with("conflict") {
                    Resolution::KeepOurs
                } else {
                    Resolution::KeepTheirs
                }
            })),
        );
        expect_clean(result, "Custom strategy");
        assert_eq!(
            objects_of(&mut fx.obj, "conflict-subj"),
            vec!["TypeA"],
            "Custom should keep ours for conflict-* subjects"
        );
    }

    #[test]
    fn test_strategy_on_no_conflict_still_clean() {
        let mut fx = diverge(
            &sample_triple("a-only", "A"),
            Namespace::World,
            &sample_triple("b-only", "B"),
            Namespace::Work,
        );
        let mc = expect_clean(
            merge_using(&mut fx, &MergeStrategy::Ours),
            "Non-conflicting merge with any strategy",
        );
        assert_eq!(mc.parent_ids.len(), 2);
        assert!(fx.obj.store.len() >= 3);
    }
}