use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use ipld_core::ipld::Ipld;
use crate::codec::hash_to_cid;
use crate::error::{Error, RepoError};
use crate::id::{ChangeId, Cid};
use crate::index;
use crate::objects::{Commit, Operation, RefTarget, View};
use crate::prolly::{self, Cursor, ProllyKey};
use crate::store::{Blockstore, OpHeadsStore};
use super::conflict::{
Conflict, ConflictCategory, ConflictPolicy, MergeConflicts, detect_conflicts_with_views,
};
use super::lca;
use super::readonly::decode_from_store;
const MERGE_AUTHOR: &str = "mnem";
const MERGE_COMMIT_MESSAGE: &str = "mnem merge commit";
/// Merges two or more concurrent operation heads into a single new
/// operation, persists it, and replaces the old heads in `ohs`.
///
/// The merged operation's view is the union of all head views (refs merged
/// against the LCA operation's view via `merge_views`); if any head view
/// carries a commit, a merge commit is built over all of them and installed
/// as the sole head of the merged view.
///
/// Returns the CID of the newly stored merge operation.
///
/// # Panics
/// Panics if fewer than two heads are supplied.
///
/// # Errors
/// Fails when the heads share no common ancestor operation, or on any
/// store/codec error.
pub(crate) fn merge_op_heads(
    bs: &Arc<dyn Blockstore>,
    ohs: &Arc<dyn OpHeadsStore>,
    mut heads: Vec<Cid>,
) -> Result<Cid, Error> {
    assert!(heads.len() >= 2, "merge_op_heads requires >=2 heads");
    // Sort so the resulting merge op CID is invariant under input head order
    // (pinned by the `merge_op_heads_is_order_invariant` test).
    heads.sort();
    let mut head_ops: Vec<Operation> = Vec::with_capacity(heads.len());
    let mut head_views: Vec<View> = Vec::with_capacity(heads.len());
    let mut head_commits: Vec<Option<(Cid, Commit)>> = Vec::with_capacity(heads.len());
    for h in &heads {
        let op: Operation = decode_from_store(&**bs, h)?;
        let view: View = decode_from_store(&**bs, &op.view)?;
        // Only the first view head participates in the commit-level merge.
        let commit = if let Some(cc) = view.heads.first() {
            let decoded: Commit = decode_from_store(&**bs, cc)?;
            Some((cc.clone(), decoded))
        } else {
            None
        };
        head_ops.push(op);
        head_views.push(view);
        head_commits.push(commit);
    }
    // The view merge needs a base: the LCA of all head operations.
    let ancestor_cid = lca::find_lca_many(&**bs, &heads)?.ok_or(RepoError::NoCommonAncestor)?;
    let ancestor_op: Operation = decode_from_store(&**bs, &ancestor_cid)?;
    let ancestor_view: View = decode_from_store(&**bs, &ancestor_op.view)?;
    let mut merged_view = merge_views(&ancestor_view, &head_views);
    // When a merge commit exists, it becomes the single head of the view.
    if let Some(merge_commit_cid) = build_merge_commit(&**bs, &head_commits)? {
        merged_view.heads = vec![merge_commit_cid];
    }
    let (view_bytes, view_cid) = hash_to_cid(&merged_view)?;
    bs.put_trusted(view_cid.clone(), view_bytes)?;
    // Strictly after every input op's timestamp.
    let merge_time = head_ops.iter().map(|o| o.time).max().unwrap_or(0) + 1;
    let description = describe_merge(&heads);
    let mut merge_op = Operation::new(view_cid, MERGE_AUTHOR, merge_time, description);
    for h in &heads {
        merge_op = merge_op.with_parent(h.clone());
    }
    let (op_bytes, op_cid) = hash_to_cid(&merge_op)?;
    bs.put_trusted(op_cid.clone(), op_bytes)?;
    // Replace all merged heads with the new merge op in one update.
    ohs.update(op_cid.clone(), &heads)?;
    Ok(op_cid)
}
/// Builds a merge commit whose node/edge prolly trees are the union of all
/// parent commits' trees, or returns `Ok(None)` when no head carried a
/// commit at all.
///
/// Content-level divergences found while unioning are not resolved here: per
/// key, the highest-sorting candidate CID wins, and the full candidate lists
/// are recorded as IPLD under the `"_merge_conflicts"` key of the commit's
/// `extra` map so later tooling can surface them.
fn build_merge_commit(
    bs: &dyn Blockstore,
    head_commits: &[Option<(Cid, Commit)>],
) -> Result<Option<Cid>, Error> {
    let parents: Vec<&(Cid, Commit)> = head_commits.iter().filter_map(Option::as_ref).collect();
    if parents.is_empty() {
        return Ok(None);
    }
    let node_roots: Vec<&Cid> = parents.iter().map(|(_, c)| &c.nodes).collect();
    let edge_roots: Vec<&Cid> = parents.iter().map(|(_, c)| &c.edges).collect();
    let node_union = union_prolly_trees(bs, &node_roots)?;
    let edge_union = union_prolly_trees(bs, &edge_roots)?;
    let merged_nodes = node_union.root;
    let merged_edges = edge_union.root;
    // Sort parents by commit CID so the schema choice, timestamp, change id,
    // and parent order are deterministic regardless of input order.
    let mut parent_pairs: Vec<&(Cid, Commit)> = parents.clone();
    parent_pairs.sort_by(|a, b| a.0.cmp(&b.0));
    // Schema comes from the lowest-sorting parent; NOTE(review): schema
    // compatibility across parents is assumed, not verified here.
    let merged_schema = parent_pairs[0].1.schema.clone();
    // If some parent already has exactly the merged trees, reuse its index
    // set instead of rebuilding from scratch.
    let mut reused_indexes: Option<Cid> = None;
    for (_parent_cid, parent_commit) in &parents {
        if parent_commit.nodes == merged_nodes
            && parent_commit.edges == merged_edges
            && let Some(idx) = &parent_commit.indexes
        {
            reused_indexes = Some(idx.clone());
            break;
        }
    }
    let merged_indexes = match reused_indexes {
        Some(cid) => cid,
        None => index::build_index_set(bs, &merged_nodes, &merged_edges)?,
    };
    // Strictly after every parent commit's timestamp.
    let merge_time = parent_pairs.iter().map(|(_, c)| c.time).max().unwrap_or(0) + 1;
    let parent_cids: Vec<Cid> = parent_pairs.iter().map(|(c, _)| c.clone()).collect();
    // Derived purely from the sorted parent CIDs, so rerunning the same merge
    // produces the same change id.
    let change_id = deterministic_change_id(&parent_cids);
    let mut commit = Commit::new(
        change_id,
        merged_nodes,
        merged_edges,
        merged_schema,
        MERGE_AUTHOR,
        merge_time,
        MERGE_COMMIT_MESSAGE,
    );
    commit.indexes = Some(merged_indexes);
    if !node_union.conflicts.is_empty() || !edge_union.conflicts.is_empty() {
        // Encode one IPLD entry per conflicted key: the raw key bytes plus
        // every candidate value CID that competed for it.
        let conflict_list = |map: &BTreeMap<ProllyKey, Vec<Cid>>| -> Ipld {
            let mut entries = Vec::with_capacity(map.len());
            for (k, candidates) in map {
                entries.push(Ipld::Map(
                    [
                        ("key".into(), Ipld::Bytes(k.0.to_vec())),
                        (
                            "candidates".into(),
                            Ipld::List(
                                candidates
                                    .iter()
                                    .map(|c| {
                                        Ipld::Link(
                                            ipld_core::cid::Cid::try_from(c.to_bytes().as_slice())
                                                .expect("cid round-trip"),
                                        )
                                    })
                                    .collect(),
                            ),
                        ),
                    ]
                    .into_iter()
                    .collect::<BTreeMap<_, _>>(),
                ));
            }
            Ipld::List(entries)
        };
        let mut conflict_map = BTreeMap::new();
        if !node_union.conflicts.is_empty() {
            conflict_map.insert("nodes".into(), conflict_list(&node_union.conflicts));
        }
        if !edge_union.conflicts.is_empty() {
            conflict_map.insert("edges".into(), conflict_list(&edge_union.conflicts));
        }
        commit
            .extra
            .insert("_merge_conflicts".into(), Ipld::Map(conflict_map));
    }
    for p in parent_cids {
        commit = commit.with_parent(p);
    }
    let (bytes, cid) = hash_to_cid(&commit)?;
    bs.put_trusted(cid.clone(), bytes)?;
    Ok(Some(cid))
}
/// Result of unioning several prolly trees.
struct UnionOutcome {
    // Root CID of the rebuilt, merged tree.
    root: Cid,
    // Keys that had more than one distinct candidate value, mapped to the
    // full sorted, deduplicated candidate list.
    conflicts: BTreeMap<ProllyKey, Vec<Cid>>,
}
/// Unions several prolly trees into one rebuilt tree.
///
/// Values for the same key are gathered across all inputs; when more than
/// one distinct value remains after deduplication, the highest-sorting CID
/// wins and the full candidate list is reported in `UnionOutcome::conflicts`.
fn union_prolly_trees(bs: &dyn Blockstore, roots: &[&Cid]) -> Result<UnionOutcome, Error> {
    // Walk roots in sorted order so the outcome is input-order invariant.
    let mut ordered: Vec<&Cid> = roots.to_vec();
    ordered.sort();
    let mut all_candidates: BTreeMap<ProllyKey, Vec<Cid>> = BTreeMap::new();
    for root in &ordered {
        for entry in Cursor::new(bs, root)? {
            let (key, value) = entry?;
            all_candidates.entry(key).or_default().push(value);
        }
    }
    let mut merged: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
    let mut conflicts: BTreeMap<ProllyKey, Vec<Cid>> = BTreeMap::new();
    for (key, mut candidates) in all_candidates {
        candidates.sort();
        candidates.dedup();
        // Deterministic winner: the largest remaining candidate CID.
        let winner = candidates.last().expect("every key has >=1 value").clone();
        if candidates.len() > 1 {
            conflicts.insert(key, candidates);
        }
        merged.insert(key, winner);
    }
    let root = prolly::build_tree(bs, merged)?;
    Ok(UnionOutcome { root, conflicts })
}
/// Derives a stable 16-byte change id from the parent commit CIDs.
///
/// The fixed domain-separation prefix keeps these ids distinct from any
/// other blake3-derived identifiers; the same parent set always yields the
/// same id.
fn deterministic_change_id(parent_cids: &[Cid]) -> ChangeId {
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"mnem/merge-change-id/v1");
    for parent in parent_cids {
        hasher.update(&parent.to_bytes());
    }
    // Truncate the 32-byte digest to the 16 bytes a ChangeId holds.
    let mut raw = [0u8; 16];
    raw.copy_from_slice(&hasher.finalize().as_bytes()[..16]);
    ChangeId::from_bytes_raw(raw)
}
fn describe_merge(heads: &[Cid]) -> String {
let joined = heads
.iter()
.map(Cid::to_string)
.collect::<Vec<_>>()
.join(" ");
format!("merge {} op-heads: {joined}", heads.len())
}
/// Combines the ancestor view with every head view into one merged view.
///
/// Refs are merged name-by-name via `merge_one_ref` with the ancestor as the
/// base; commit heads are unioned and sorted; `remote_refs` / `wc_commit`
/// take the first head that has one (falling back to the ancestor); `extra`
/// and `tombstones` start from the ancestor and are overlaid by each head in
/// order (later heads win on overlapping keys).
fn merge_views(ancestor: &View, heads: &[View]) -> View {
    // Every ref name mentioned by the ancestor or any head.
    let mut names: BTreeSet<String> = ancestor.refs.keys().cloned().collect();
    names.extend(heads.iter().flat_map(|v| v.refs.keys().cloned()));

    let mut merged_refs: BTreeMap<String, RefTarget> = BTreeMap::new();
    for name in names {
        let per_head: Vec<Option<&RefTarget>> =
            heads.iter().map(|v| v.refs.get(&name)).collect();
        // A ref that merges to `None` was deleted everywhere: omit it.
        if let Some(resolved) = merge_one_ref(ancestor.refs.get(&name), &per_head) {
            merged_refs.insert(name, resolved);
        }
    }

    // Union of commit heads; BTreeSet iteration already yields sorted order.
    let head_union: BTreeSet<Cid> = ancestor
        .heads
        .iter()
        .chain(heads.iter().flat_map(|v| v.heads.iter()))
        .cloned()
        .collect();
    let commit_heads: Vec<Cid> = head_union.into_iter().collect();

    // First head that carries a value wins; the ancestor is the fallback.
    let remote_refs = heads
        .iter()
        .find_map(|v| v.remote_refs.clone())
        .or_else(|| ancestor.remote_refs.clone());
    let wc_commit = heads
        .iter()
        .find_map(|v| v.wc_commit.clone())
        .or_else(|| ancestor.wc_commit.clone());

    // Overlay head entries on top of the ancestor's; inserts overwrite.
    let mut extra: BTreeMap<String, Ipld> = ancestor.extra.clone();
    for v in heads {
        extra.extend(v.extra.iter().map(|(k, val)| (k.clone(), val.clone())));
    }
    let mut tombstones = ancestor.tombstones.clone();
    for v in heads {
        tombstones.extend(v.tombstones.iter().map(|(id, ts)| (*id, ts.clone())));
    }

    View {
        heads: commit_heads,
        refs: merged_refs,
        remote_refs,
        wc_commit,
        tombstones,
        extra,
    }
}
/// Merges one ref across all heads against a common base, using counted
/// add/remove "bags" (see `bag`).
///
/// Fast paths: if no head changed the ref, keep the base; if all heads agree,
/// take their value. Otherwise each head contributes its bag once and the
/// base's bag is subtracted `heads.len() - 1` times, so a value kept by every
/// head nets to zero. A single net add with no net removes resolves to a
/// normal target; anything else becomes (or stays) conflicted; all-zero means
/// the ref is absent after the merge.
fn merge_one_ref(base: Option<&RefTarget>, heads: &[Option<&RefTarget>]) -> Option<RefTarget> {
    // Nobody changed it: the base survives unchanged.
    if heads.iter().all(|h| *h == base) {
        return base.cloned();
    }
    // Everyone changed it the same way: take the unanimous value.
    if let Some(first) = heads.first() {
        if heads.iter().all(|h| h == first) {
            return (*first).cloned();
        }
    }
    let mut tally: BTreeMap<Cid, i32> = BTreeMap::new();
    for head in heads {
        for (cid, weight) in bag(*head) {
            *tally.entry(cid).or_insert(0) += weight;
        }
    }
    // Subtract the base once per pairwise merge (n heads => n-1 merges).
    let scale = i32::try_from(heads.len()).unwrap_or(i32::MAX) - 1;
    for (cid, weight) in bag(base) {
        *tally.entry(cid).or_insert(0) -= weight * scale;
    }
    let mut adds: Vec<Cid> = Vec::new();
    let mut removes: Vec<Cid> = Vec::new();
    for (cid, net) in tally {
        match net.cmp(&0) {
            std::cmp::Ordering::Greater => adds.push(cid),
            std::cmp::Ordering::Less => removes.push(cid),
            std::cmp::Ordering::Equal => {}
        }
    }
    if adds.is_empty() && removes.is_empty() {
        None
    } else if adds.len() == 1 && removes.is_empty() {
        Some(RefTarget::normal(adds.pop().expect("checked len == 1")))
    } else {
        Some(RefTarget::conflicted(adds, removes))
    }
}
/// Flattens a ref target into a weighted bag of CIDs: +1 per add, -1 per
/// remove. An absent target contributes nothing; a normal target is a single
/// add of its CID.
fn bag(t: Option<&RefTarget>) -> Vec<(Cid, i32)> {
    let Some(target) = t else {
        return Vec::new();
    };
    match target {
        RefTarget::Normal { target } => vec![(target.clone(), 1)],
        RefTarget::Conflicted { adds, removes } => adds
            .iter()
            .map(|c| (c.clone(), 1))
            .chain(removes.iter().map(|c| (c.clone(), -1)))
            .collect(),
    }
}
/// How `merge_three_way` should treat detected conflicts.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Surface conflicts to the caller instead of resolving automatically.
    Manual,
    /// Resolve conflicts in favor of the left side (see `picks_from_strategy`).
    Ours,
    /// Resolve conflicts in favor of the right side (see `picks_from_strategy`).
    Theirs,
}
/// Result of a three-way merge attempt.
#[derive(Clone, Debug)]
pub enum MergeOutcome {
    /// One side already contained the other; the contained CID is the answer.
    FastForward(Cid),
    /// A merge commit was created without surfacing conflicts.
    Clean(Cid),
    /// Manual strategy was requested and conflicts were detected.
    Conflicts(MergeConflicts),
}
/// Performs a three-way merge of two commits.
///
/// Fast-forwards when either side is the LCA of the pair; otherwise runs
/// conflict detection against the LCA. Under `MergeStrategy::Manual`,
/// detected conflicts are returned to the caller; under `Ours`/`Theirs` the
/// merge proceeds and a union merge commit is built.
///
/// # Errors
/// Returns `RepoError::NoCommonAncestor` when the commits share no history,
/// plus any store/codec error from the underlying operations.
pub fn merge_three_way(
    bs: &Arc<dyn Blockstore>,
    _oph: &Arc<dyn OpHeadsStore>,
    left: Cid,
    right: Cid,
    strategy: MergeStrategy,
) -> Result<MergeOutcome, Error> {
    let lca_cid = find_commit_lca(&**bs, &left, &right)?;
    if let Some(ref lca) = lca_cid {
        // One side is an ancestor of the other: no merge commit needed.
        if lca == &right {
            return Ok(MergeOutcome::FastForward(left));
        }
        if lca == &left {
            return Ok(MergeOutcome::FastForward(right));
        }
    } else {
        return Err(RepoError::NoCommonAncestor.into());
    }
    let left_commit: Commit = decode_from_store(&**bs, &left)?;
    let right_commit: Commit = decode_from_store(&**bs, &right)?;
    // Detection here only inspects commit content; views play no role, so an
    // empty view is passed for both sides.
    let empty_view = View {
        heads: vec![],
        refs: BTreeMap::new(),
        remote_refs: None,
        wc_commit: None,
        tombstones: BTreeMap::new(),
        extra: BTreeMap::new(),
    };
    let repo = build_detection_repo(bs, &left)?;
    let mc = detect_conflicts_with_views(
        &repo,
        left.clone(),
        right.clone(),
        lca_cid.clone(),
        &empty_view,
        &empty_view,
        ConflictPolicy::default(),
    )?;
    // Only the manual strategy surfaces conflicts to the caller.
    if !mc.conflicts.is_empty() && matches!(strategy, MergeStrategy::Manual) {
        return Ok(MergeOutcome::Conflicts(mc));
    }
    let head_commits: Vec<Option<(Cid, Commit)>> = vec![
        Some((left.clone(), left_commit)),
        Some((right.clone(), right_commit)),
    ];
    // `build_merge_commit` returns None only with zero parents, which cannot
    // happen here; the error mapping is defensive.
    let merge_cid = build_merge_commit(&**bs, &head_commits)?
        .ok_or_else(|| Error::from(RepoError::NoCommonAncestor))?;
    Ok(MergeOutcome::Clean(merge_cid))
}
/// Finds a lowest common ancestor of two commits by intersecting their full
/// inclusive ancestor sets.
///
/// When several LCAs exist (criss-cross history), the smallest CID is chosen
/// for determinism. Returns `Ok(None)` when the commits share no ancestor.
///
/// NOTE(review): this recomputes an inclusive ancestor set for every common
/// ancestor, i.e. O(|common| * |history|) decodes — fine for shallow graphs,
/// worth revisiting for deep ones.
fn find_commit_lca(bs: &dyn Blockstore, left: &Cid, right: &Cid) -> Result<Option<Cid>, Error> {
    if left == right {
        return Ok(Some(left.clone()));
    }
    let left_anc = commit_ancestors_inclusive(bs, left)?;
    let right_anc = commit_ancestors_inclusive(bs, right)?;
    let common: BTreeSet<Cid> = left_anc.intersection(&right_anc).cloned().collect();
    if common.is_empty() {
        return Ok(None);
    }
    // `strict` collects common ancestors that are *proper* ancestors of some
    // other common ancestor; removing them leaves only the maximal elements.
    let mut strict: BTreeSet<Cid> = BTreeSet::new();
    for c in &common {
        let anc = commit_ancestors_inclusive(bs, c)?;
        for a in &anc {
            if a != c && common.contains(a) {
                strict.insert(a.clone());
            }
        }
    }
    let mut lcas: Vec<Cid> = common.difference(&strict).cloned().collect();
    lcas.sort();
    Ok(lcas.into_iter().next())
}
/// Returns the set of all ancestors of `cid`, including `cid` itself,
/// via an iterative depth-first walk of the commit parent graph.
fn commit_ancestors_inclusive(bs: &dyn Blockstore, cid: &Cid) -> Result<BTreeSet<Cid>, Error> {
    let mut visited: BTreeSet<Cid> = BTreeSet::new();
    let mut pending: Vec<Cid> = vec![cid.clone()];
    while let Some(current) = pending.pop() {
        // `insert` returning false means we already processed this commit.
        if !visited.insert(current.clone()) {
            continue;
        }
        let commit: Commit = decode_from_store(bs, &current)?;
        pending.extend(
            commit
                .parents
                .iter()
                .filter(|p| !visited.contains(*p))
                .cloned(),
        );
    }
    Ok(visited)
}
/// Builds a throwaway `ReadonlyRepo` positioned at `commit_cid`, for use by
/// the conflict detector (which needs a repo handle, not just a blockstore).
///
/// The synthetic view and operation are persisted into `bs`, but head
/// tracking uses a fresh in-memory `OpHeadsStore`, so the caller's op-head
/// state is never touched.
fn build_detection_repo(
    bs: &Arc<dyn Blockstore>,
    commit_cid: &Cid,
) -> Result<super::ReadonlyRepo, Error> {
    use crate::store::MemoryOpHeadsStore;
    // Minimal view whose only head is the commit under inspection.
    let view = View {
        heads: vec![commit_cid.clone()],
        refs: BTreeMap::new(),
        remote_refs: None,
        wc_commit: None,
        tombstones: BTreeMap::new(),
        extra: BTreeMap::new(),
    };
    let (view_bytes, view_cid) = hash_to_cid(&view)?;
    bs.put_trusted(view_cid.clone(), view_bytes)?;
    let op = Operation::new(view_cid, MERGE_AUTHOR, 0, "merge-detect");
    let (op_bytes, op_cid) = hash_to_cid(&op)?;
    bs.put_trusted(op_cid.clone(), op_bytes)?;
    let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
    super::ReadonlyRepo::load_at(bs.clone(), ohs, op_cid)
}
/// Which input of a two-way merge a conflict resolution picks.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConflictSide {
    /// The left input ("ours").
    Left,
    /// The right input ("theirs").
    Right,
}
/// Tallies conflicts by category, returning
/// `(node_cid_divergence, edge_prop_collision, tombstone_vs_modify)` counts.
#[must_use]
pub fn conflict_category_counts(mc: &MergeConflicts) -> (usize, usize, usize) {
    mc.conflicts
        .iter()
        .fold((0usize, 0usize, 0usize), |(nodes, edges, tombs), c| {
            match c.category {
                ConflictCategory::NodeCidDivergence => (nodes + 1, edges, tombs),
                ConflictCategory::EdgePropCollision => (nodes, edges + 1, tombs),
                ConflictCategory::TombstoneVsModify => (nodes, edges, tombs + 1),
            }
        })
}
/// Expands an automatic strategy into one `(conflict, side)` pick per
/// detected conflict: `Ours` maps to `ConflictSide::Left`, `Theirs` to
/// `ConflictSide::Right`.
///
/// # Errors
/// `Manual` has no automatic picks and is rejected.
/// NOTE(review): the rejection reuses `RepoError::Stale`, which reads like a
/// repurposed error code — confirm callers match on it deliberately.
pub fn picks_from_strategy(
    mc: &MergeConflicts,
    strategy: MergeStrategy,
) -> Result<Vec<(Conflict, ConflictSide)>, Error> {
    let side = match strategy {
        MergeStrategy::Ours => ConflictSide::Left,
        MergeStrategy::Theirs => ConflictSide::Right,
        MergeStrategy::Manual => return Err(RepoError::Stale.into()),
    };
    let picks = mc
        .conflicts
        .iter()
        .map(|conflict| (conflict.clone(), side))
        .collect();
    Ok(picks)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::id::{CODEC_RAW, Multihash};
fn raw_cid(seed: u32) -> Cid {
Cid::new(CODEC_RAW, Multihash::sha2_256(&seed.to_be_bytes()))
}
#[test]
fn unchanged_from_base_stays_base() {
let c0 = raw_cid(0);
let base = RefTarget::normal(c0);
let heads = [Some(&base), Some(&base)];
let merged = merge_one_ref(Some(&base), &heads);
assert_eq!(merged, Some(base));
}
#[test]
fn all_heads_agree_on_changed_value() {
let c0 = raw_cid(0);
let c1 = raw_cid(1);
let base = RefTarget::normal(c0);
let new = RefTarget::normal(c1);
let heads = [Some(&new), Some(&new)];
let merged = merge_one_ref(Some(&base), &heads);
assert_eq!(merged, Some(new));
}
#[test]
fn single_changing_head_wins_no_conflict() {
let c0 = raw_cid(0);
let c1 = raw_cid(1);
let base = RefTarget::normal(c0);
let changed = RefTarget::normal(c1);
let heads = [Some(&base), Some(&changed)];
let merged = merge_one_ref(Some(&base), &heads);
assert_eq!(merged, Some(changed));
}
#[test]
fn diverging_heads_produce_conflict() {
let c0 = raw_cid(0);
let c1 = raw_cid(1);
let c2 = raw_cid(2);
let base = RefTarget::normal(c0.clone());
let h1 = RefTarget::normal(c1.clone());
let h2 = RefTarget::normal(c2.clone());
let merged = merge_one_ref(Some(&base), &[Some(&h1), Some(&h2)]).unwrap();
match merged {
RefTarget::Conflicted { adds, removes } => {
let adds_set: BTreeSet<Cid> = adds.into_iter().collect();
assert_eq!(adds_set, BTreeSet::from([c1, c2]));
assert_eq!(removes, vec![c0]);
}
other => panic!("expected Conflicted, got {other:?}"),
}
}
#[test]
fn add_vs_delete_is_a_conflict() {
let c0 = raw_cid(0);
let c1 = raw_cid(1);
let base = RefTarget::normal(c0.clone());
let bumped = RefTarget::normal(c1.clone());
let merged = merge_one_ref(Some(&base), &[Some(&bumped), None]).unwrap();
match merged {
RefTarget::Conflicted { adds, removes } => {
assert_eq!(adds, vec![c1]);
assert_eq!(removes, vec![c0]);
}
other => panic!("expected Conflicted, got {other:?}"),
}
}
#[test]
fn absent_base_one_head_adds_becomes_normal() {
let c1 = raw_cid(1);
let new = RefTarget::normal(c1.clone());
let merged = merge_one_ref(None, &[Some(&new), None]).unwrap();
assert_eq!(merged, RefTarget::normal(c1));
}
#[test]
fn both_heads_delete_base_returns_absent() {
let c0 = raw_cid(0);
let base = RefTarget::normal(c0);
let merged = merge_one_ref(Some(&base), &[None, None]);
assert_eq!(merged, None);
}
fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
use crate::store::{MemoryBlockstore, MemoryOpHeadsStore};
(
Arc::new(MemoryBlockstore::new()),
Arc::new(MemoryOpHeadsStore::new()),
)
}
fn normal_target(seed: u32) -> RefTarget {
RefTarget::normal(raw_cid(seed))
}
#[test]
fn open_on_divergent_heads_runs_merge() {
use crate::repo::ReadonlyRepo;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let target_a = normal_target(101);
let target_b = normal_target(102);
let repo1 = repo0
.update_ref("refs/heads/a", None, Some(target_a.clone()), "alice")
.unwrap();
let repo2 = repo0
.update_ref("refs/heads/b", None, Some(target_b.clone()), "bob")
.unwrap();
let heads_before = ohs.current().unwrap();
assert_eq!(heads_before.len(), 2);
assert!(heads_before.contains(repo1.op_id()));
assert!(heads_before.contains(repo2.op_id()));
let merged = ReadonlyRepo::open(bs, ohs.clone()).unwrap();
let heads_after = ohs.current().unwrap();
assert_eq!(heads_after.len(), 1);
assert_eq!(heads_after[0], *merged.op_id());
let mut expected_parents = vec![repo1.op_id().clone(), repo2.op_id().clone()];
expected_parents.sort();
assert_eq!(merged.operation().parents, expected_parents);
assert_eq!(merged.view().refs.get("refs/heads/a"), Some(&target_a));
assert_eq!(merged.view().refs.get("refs/heads/b"), Some(&target_b));
}
#[test]
fn same_ref_divergence_becomes_conflicted() {
use crate::repo::ReadonlyRepo;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let target_a = normal_target(201);
let target_b = normal_target(202);
repo0
.update_ref("refs/heads/main", None, Some(target_a.clone()), "alice")
.unwrap();
repo0
.update_ref("refs/heads/main", None, Some(target_b.clone()), "bob")
.unwrap();
assert_eq!(ohs.current().unwrap().len(), 2);
let merged = ReadonlyRepo::open(bs, ohs).unwrap();
let main_ref = merged
.view()
.refs
.get("refs/heads/main")
.expect("conflicted main should be present");
match main_ref {
RefTarget::Conflicted { adds, removes } => {
let cid_a = match &target_a {
RefTarget::Normal { target } => target.clone(),
_ => unreachable!(),
};
let cid_b = match &target_b {
RefTarget::Normal { target } => target.clone(),
_ => unreachable!(),
};
let adds_set: BTreeSet<Cid> = adds.iter().cloned().collect();
assert_eq!(adds_set, BTreeSet::from([cid_a, cid_b]));
assert!(removes.is_empty());
}
other => panic!("expected Conflicted, got {other:?}"),
}
}
#[test]
fn open_after_merge_is_idempotent() {
use crate::repo::ReadonlyRepo;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
repo0
.update_ref("refs/heads/a", None, Some(normal_target(301)), "alice")
.unwrap();
repo0
.update_ref("refs/heads/b", None, Some(normal_target(302)), "bob")
.unwrap();
let m1 = ReadonlyRepo::open(bs.clone(), ohs.clone()).unwrap();
let m2 = ReadonlyRepo::open(bs, ohs).unwrap();
assert_eq!(m1.op_id(), m2.op_id());
}
#[test]
fn merge_with_concurrent_node_commits_unions_both_sides() {
use crate::id::NodeId;
use crate::index::PropPredicate;
use crate::objects::Node;
use crate::repo::ReadonlyRepo;
use ipld_core::ipld::Ipld;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let mut tx_a = repo0.start_transaction();
let alice =
Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
tx_a.add_node(&alice).unwrap();
let _repo_a = tx_a.commit("agent:A", "alice").unwrap();
let mut tx_b = repo0.start_transaction();
let bob =
Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Bob".into()));
tx_b.add_node(&bob).unwrap();
let _repo_b = tx_b.commit("agent:B", "bob").unwrap();
assert_eq!(
ohs.current().unwrap().len(),
2,
"concurrent writers produced two op-heads"
);
let merged = ReadonlyRepo::open(bs, ohs.clone()).unwrap();
assert_eq!(ohs.current().unwrap().len(), 1);
assert!(
merged.lookup_node(&alice.id).unwrap().is_some(),
"Alice survives the merge"
);
assert!(
merged.lookup_node(&bob.id).unwrap().is_some(),
"Bob survives the merge"
);
let alice_hits = merged
.query()
.label("Person")
.where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
.execute()
.unwrap();
let bob_hits = merged
.query()
.label("Person")
.where_prop("name", PropPredicate::Eq(Ipld::String("Bob".into())))
.execute()
.unwrap();
assert_eq!(alice_hits.len(), 1);
assert_eq!(bob_hits.len(), 1);
let merge_commit = merged.head_commit().expect("merge commit exists");
assert_eq!(
merge_commit.parents.len(),
2,
"merge commit has both parent commits as parents"
);
}
#[test]
fn concurrent_content_divergence_surfaces_as_merge_conflict_metadata() {
use crate::id::NodeId;
use crate::objects::Node;
use crate::repo::ReadonlyRepo;
use ipld_core::ipld::Ipld;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let alice_id = NodeId::new_v7();
let mut tx = repo0.start_transaction();
tx.add_node(&Node::new(alice_id, "Person").with_prop("name", Ipld::String("Alice".into())))
.unwrap();
let repo1 = tx.commit("seed", "seed alice").unwrap();
let mut tx_a = repo1.start_transaction();
tx_a.add_node(
&Node::new(alice_id, "Person")
.with_prop("name", Ipld::String("Alice".into()))
.with_prop("company", Ipld::String("Acme".into())),
)
.unwrap();
let _ = tx_a.commit("agent:A", "alice at Acme").unwrap();
let mut tx_b = repo1.start_transaction();
tx_b.add_node(
&Node::new(alice_id, "Person")
.with_prop("name", Ipld::String("Alice".into()))
.with_prop("company", Ipld::String("Beta".into())),
)
.unwrap();
let _ = tx_b.commit("agent:B", "alice at Beta").unwrap();
assert_eq!(ohs.current().unwrap().len(), 2);
let merged = ReadonlyRepo::open(bs, ohs).unwrap();
let commit = merged.head_commit().expect("merge commit exists");
let alice_now = merged.lookup_node(&alice_id).unwrap().unwrap();
let company = alice_now
.get_str("company")
.expect("company prop present on both candidates");
assert!(company == "Acme" || company == "Beta");
let conflicts = commit
.extra
.get("_merge_conflicts")
.expect("merge commit records content conflicts");
match conflicts {
Ipld::Map(m) => {
assert!(m.contains_key("nodes"), "nodes-level conflict recorded");
}
other => panic!("expected conflict map, got {other:?}"),
}
}
#[test]
fn three_parent_octopus_merge_unions_all_sides() {
use crate::id::NodeId;
use crate::objects::Node;
use crate::repo::ReadonlyRepo;
use ipld_core::ipld::Ipld;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let mk = |name: &str| -> NodeId {
let mut tx = repo0.start_transaction();
let n =
Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String(name.into()));
let id = n.id;
tx.add_node(&n).unwrap();
let _ = tx.commit("agent", name).unwrap();
id
};
let a = mk("Alice");
let b = mk("Bob");
let c = mk("Carol");
assert_eq!(
ohs.current().unwrap().len(),
3,
"three concurrent op-heads expected"
);
let merged = ReadonlyRepo::open(bs, ohs.clone()).unwrap();
assert_eq!(ohs.current().unwrap().len(), 1);
assert!(merged.lookup_node(&a).unwrap().is_some());
assert!(merged.lookup_node(&b).unwrap().is_some());
assert!(merged.lookup_node(&c).unwrap().is_some());
assert_eq!(merged.operation().parents.len(), 3);
assert_eq!(
merged.head_commit().unwrap().parents.len(),
3,
"octopus merge commit points at all three parents"
);
let all_people = merged.query().label("Person").execute().unwrap();
assert_eq!(all_people.len(), 3);
}
#[test]
fn merge_op_heads_is_order_invariant() {
use crate::repo::ReadonlyRepo;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let repo1 = repo0
.update_ref("refs/heads/a", None, Some(normal_target(501)), "alice")
.unwrap();
let repo2 = repo0
.update_ref("refs/heads/b", None, Some(normal_target(502)), "bob")
.unwrap();
assert_eq!(ohs.current().unwrap().len(), 2);
let merge1 = merge_op_heads(
&bs,
&ohs,
vec![repo1.op_id().clone(), repo2.op_id().clone()],
)
.unwrap();
ohs.update(repo1.op_id().clone(), std::slice::from_ref(&merge1))
.unwrap();
ohs.update(repo2.op_id().clone(), &[]).unwrap();
assert_eq!(ohs.current().unwrap().len(), 2);
let merge2 = merge_op_heads(
&bs,
&ohs,
vec![repo2.op_id().clone(), repo1.op_id().clone()],
)
.unwrap();
assert_eq!(
merge1, merge2,
"merge op CID must be invariant under input head order"
);
}
#[test]
fn merge_three_way_fast_forward_on_linear_history() {
use crate::id::NodeId;
use crate::objects::Node;
use crate::repo::ReadonlyRepo;
use ipld_core::ipld::Ipld;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let mut tx = repo0.start_transaction();
tx.add_node(&Node::new(NodeId::new_v7(), "Doc").with_prop("v", Ipld::String("1".into())))
.unwrap();
let repo_left = tx.commit("alice", "v1").unwrap();
let left_cid = repo_left.view().heads.first().cloned().unwrap();
let mut tx2 = repo_left.start_transaction();
tx2.add_node(&Node::new(NodeId::new_v7(), "Doc").with_prop("v", Ipld::String("2".into())))
.unwrap();
let repo_right = tx2.commit("alice", "v2").unwrap();
let right_cid = repo_right.view().heads.first().cloned().unwrap();
let outcome = merge_three_way(
&bs,
&ohs,
left_cid.clone(),
right_cid.clone(),
MergeStrategy::Manual,
)
.unwrap();
match outcome {
MergeOutcome::FastForward(cid) => assert_eq!(cid, right_cid),
other => panic!("expected FastForward, got {other:?}"),
}
}
#[test]
fn merge_three_way_clean_produces_union() {
use crate::id::NodeId;
use crate::objects::Node;
use crate::repo::ReadonlyRepo;
use ipld_core::ipld::Ipld;
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let mut tx_base = repo0.start_transaction();
tx_base
.add_node(
&Node::new(NodeId::new_v7(), "Doc").with_prop("v", Ipld::String("base".into())),
)
.unwrap();
let repo_base = tx_base.commit("alice", "base").unwrap();
let mut tx_a = repo_base.start_transaction();
tx_a.add_node(&Node::new(NodeId::new_v7(), "Doc").with_prop("v", Ipld::String("A".into())))
.unwrap();
let repo_a = tx_a.commit("alice", "branch A").unwrap();
let a_cid = repo_a.view().heads.first().cloned().unwrap();
let mut tx_b = repo_base.start_transaction();
tx_b.add_node(&Node::new(NodeId::new_v7(), "Doc").with_prop("v", Ipld::String("B".into())))
.unwrap();
let _repo_b = tx_b.commit("alice", "branch B").unwrap();
let right_cid: Cid = {
let r = ReadonlyRepo::open(bs.clone(), ohs.clone()).unwrap();
r.view().heads.first().cloned().unwrap()
};
let outcome = merge_three_way(&bs, &ohs, a_cid, right_cid, MergeStrategy::Manual).unwrap();
match outcome {
MergeOutcome::Clean(_) | MergeOutcome::FastForward(_) => {}
MergeOutcome::Conflicts(_) => panic!("disjoint-prop branches should not conflict"),
}
}
#[test]
fn conflicted_input_flattens_into_final_conflict() {
let c0 = raw_cid(0);
let c1 = raw_cid(1);
let c2 = raw_cid(2);
let c3 = raw_cid(3);
let base = RefTarget::normal(c0.clone());
let prior_conflict = RefTarget::conflicted(vec![c1.clone(), c2.clone()], vec![c0.clone()]);
let other_change = RefTarget::normal(c3.clone());
let merged =
merge_one_ref(Some(&base), &[Some(&prior_conflict), Some(&other_change)]).unwrap();
match merged {
RefTarget::Conflicted { adds, removes } => {
let adds_set: BTreeSet<Cid> = adds.into_iter().collect();
assert_eq!(adds_set, BTreeSet::from([c1, c2, c3]));
assert_eq!(removes, vec![c0]);
}
other => panic!("expected Conflicted, got {other:?}"),
}
}
}