use std::collections::{BTreeMap, BTreeSet};
use ipld_core::ipld::Ipld;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::id::{Cid, NodeId};
use crate::objects::{Commit, Edge, View};
use crate::prolly::{Cursor, ProllyKey};
use crate::repo::ReadonlyRepo;
use crate::repo::readonly::decode_from_store;
use crate::store::Blockstore;
/// Schema identifier stamped into every [`MergeConflicts`] report (`schema` field)
/// so consumers of the serialized report can recognise its format; the string
/// embeds a `v1` version tag.
pub const MERGE_CONFLICTS_SCHEMA: &str = "mnem.v1.merge_conflicts";
/// Kind of conflict detected between two branch heads.
///
/// Serialized in `snake_case` (e.g. `"node_cid_divergence"`). Variant
/// declaration order fixes the derived ordering and discriminant values, which
/// the detectors use as the last key when sorting conflicts.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictCategory {
/// Both sides hold the same node under different CIDs and both changed it
/// relative to the LCA (or no LCA was given).
NodeCidDivergence,
/// Both sides hold an edge with the same [`EdgeKey`] but different property
/// maps, and both changed it relative to the LCA (or no LCA was given).
EdgePropCollision,
/// One side tombstoned a node that the other side modified.
TombstoneVsModify,
}
/// Identity of an edge for conflict detection: source, destination and type.
///
/// Edges on the two branches are matched by this key rather than by edge id, so
/// two edges with different ids but the same endpoints and type are compared as
/// the same edge. Field order defines the derived ordering.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct EdgeKey {
/// Source node id, serialized as a UUID string.
#[serde(with = "nodeid_str")]
pub src: NodeId,
/// Destination node id, serialized as a UUID string.
#[serde(with = "nodeid_str")]
pub dst: NodeId,
/// Edge type label.
pub etype: String,
}
mod nodeid_str {
use super::NodeId;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::str::FromStr;
use uuid::Uuid;
pub(super) fn serialize<S: Serializer>(id: &NodeId, s: S) -> Result<S::Ok, S::Error> {
id.to_uuid_string().serialize(s)
}
pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<NodeId, D::Error> {
let s = String::deserialize(d)?;
let u = Uuid::from_str(&s).map_err(serde::de::Error::custom)?;
Ok(NodeId::from_bytes_raw(*u.as_bytes()))
}
}
mod nodeid_str_opt {
use super::NodeId;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::str::FromStr;
use uuid::Uuid;
pub(super) fn serialize<S: Serializer>(id: &Option<NodeId>, s: S) -> Result<S::Ok, S::Error> {
match id {
Some(n) => n.to_uuid_string().serialize(s),
None => s.serialize_none(),
}
}
pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<NodeId>, D::Error> {
let opt = Option::<String>::deserialize(d)?;
match opt {
None => Ok(None),
Some(s) => {
let u = Uuid::from_str(&s).map_err(serde::de::Error::custom)?;
Ok(Some(NodeId::from_bytes_raw(*u.as_bytes())))
}
}
}
}
mod cid_str {
use super::Cid;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub(super) fn serialize<S: Serializer>(c: &Cid, s: S) -> Result<S::Ok, S::Error> {
c.to_string().serialize(s)
}
pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Cid, D::Error> {
let s = String::deserialize(d)?;
Cid::parse_str(&s).map_err(serde::de::Error::custom)
}
}
mod cid_str_opt {
use super::Cid;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub(super) fn serialize<S: Serializer>(c: &Option<Cid>, s: S) -> Result<S::Ok, S::Error> {
match c {
Some(v) => v.to_string().serialize(s),
None => s.serialize_none(),
}
}
pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Cid>, D::Error> {
let opt = Option::<String>::deserialize(d)?;
match opt {
None => Ok(None),
Some(s) => Cid::parse_str(&s)
.map(Some)
.map_err(serde::de::Error::custom),
}
}
}
/// A single conflict between the left and right branch heads.
///
/// The detectors in this module set exactly one of `node_id` (node-level
/// conflicts) or `edge_key` (edge-property conflicts).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Conflict {
/// Node involved in a node-level conflict, serialized as a UUID string;
/// omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "nodeid_str_opt"
)]
pub node_id: Option<NodeId>,
/// Edge involved in an edge-property conflict; omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub edge_key: Option<EdgeKey>,
/// What kind of conflict this is.
pub category: ConflictCategory,
/// Left side's state, e.g. `{"node_cid": ...}`, `{"tombstoned": true}`, or the
/// edge's property map as JSON.
pub left: serde_json::Value,
/// Right side's state, in the same shape as `left`.
pub right: serde_json::Value,
/// State at the common ancestor in the same shape, when known; omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub base: Option<serde_json::Value>,
/// Optional machine-readable resolution hint (e.g. a tombstone action or an
/// edge tiebreak winner); omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub suggested: Option<serde_json::Value>,
}
/// Conflict report produced by the detectors in this module.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MergeConflicts {
/// Schema identifier; the detectors set it to [`MERGE_CONFLICTS_SCHEMA`].
pub schema: String,
/// Left branch head commit, serialized as a CID string.
#[serde(with = "cid_str")]
pub left_head: Cid,
/// Right branch head commit, serialized as a CID string.
#[serde(with = "cid_str")]
pub right_head: Cid,
/// Common-ancestor (LCA) commit used as the merge base, if one was supplied;
/// omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none", with = "cid_str_opt")]
pub lca: Option<Cid>,
/// Detected conflicts: node conflicts before edge conflicts, then ordered by
/// node id / edge key and category. Empty means the merge is clean.
pub conflicts: Vec<Conflict>,
}
impl MergeConflicts {
    /// Returns `true` when no conflicts were detected.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        let Self { conflicts, .. } = self;
        conflicts.is_empty()
    }
}
/// Strategy for picking the suggested winner of an edge-property collision.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PropTiebreak {
/// The side whose branch-head CID compares lower wins (`left < right` picks
/// the left side, otherwise the right side).
BranchHeadCidLex,
}
/// Knobs controlling the resolution hints (`suggested`) attached to conflicts.
///
/// These settings shape only the suggestions; which conflicts are detected does
/// not depend on them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ConflictPolicy {
/// When `true`, tombstone-vs-modify conflicts get a
/// `{"action": "tombstone", "node_id": ...}` suggestion; when `false` the
/// detectors add no tombstone suggestion.
pub tombstone_wins: bool,
/// Rule used to pick the winning side of an edge-property collision.
pub edge_prop_tiebreak: PropTiebreak,
}
impl Default for ConflictPolicy {
    /// Tombstones win over modifications, and edge-property collisions are
    /// resolved by comparing the branch-head CIDs.
    fn default() -> Self {
        let edge_prop_tiebreak = PropTiebreak::BranchHeadCidLex;
        Self {
            edge_prop_tiebreak,
            tombstone_wins: true,
        }
    }
}
/// Detects merge conflicts between the commits `left` and `right` using
/// [`ConflictPolicy::default`].
///
/// Thin convenience wrapper around [`detect_conflicts_with_policy`].
///
/// # Errors
///
/// Propagates any error returned by [`detect_conflicts_with_policy`].
pub fn detect_conflicts(
    repo: &ReadonlyRepo,
    left: Cid,
    right: Cid,
    lca: Option<Cid>,
) -> Result<MergeConflicts, Error> {
    let policy = ConflictPolicy::default();
    detect_conflicts_with_policy(repo, left, right, lca, policy)
}
pub fn detect_conflicts_with_policy(
repo: &ReadonlyRepo,
left: Cid,
right: Cid,
lca: Option<Cid>,
policy: ConflictPolicy,
) -> Result<MergeConflicts, Error> {
let bs: &dyn Blockstore = &**repo.blockstore();
let left_commit: Commit = decode_from_store(bs, &left)?;
let right_commit: Commit = decode_from_store(bs, &right)?;
let lca_commit: Option<Commit> = match &lca {
Some(cid) => Some(decode_from_store(bs, cid)?),
None => None,
};
let left_tombstones = tombstones_for_commit(bs, &left)?;
let right_tombstones = tombstones_for_commit(bs, &right)?;
let mut conflicts = Vec::new();
let left_nodes = collect_prolly(bs, &left_commit.nodes)?;
let right_nodes = collect_prolly(bs, &right_commit.nodes)?;
let lca_nodes = match &lca_commit {
Some(c) => collect_prolly(bs, &c.nodes)?,
None => BTreeMap::new(),
};
let node_keys: BTreeSet<&ProllyKey> = left_nodes.keys().chain(right_nodes.keys()).collect();
for key in &node_keys {
let key = *key;
let l = left_nodes.get(key);
let r = right_nodes.get(key);
let base = lca_nodes.get(key);
let node_id = nodeid_from_key(key);
let left_tomb = left_tombstones.contains(&node_id);
let right_tomb = right_tombstones.contains(&node_id);
let right_modified = r.is_some() && r != base;
let left_modified = l.is_some() && l != base;
if left_tomb && right_modified && !right_tomb {
let suggested = if policy.tombstone_wins {
Some(
serde_json::json!({ "action": "tombstone", "node_id": node_id.to_uuid_string() }),
)
} else {
None
};
conflicts.push(Conflict {
node_id: Some(node_id),
edge_key: None,
category: ConflictCategory::TombstoneVsModify,
left: serde_json::json!({ "tombstoned": true }),
right: serde_json::json!({ "node_cid": r.expect("checked is_some").to_string() }),
base: base.map(|c| serde_json::json!({ "node_cid": c.to_string() })),
suggested,
});
continue;
}
if right_tomb && left_modified && !left_tomb {
let suggested = if policy.tombstone_wins {
Some(
serde_json::json!({ "action": "tombstone", "node_id": node_id.to_uuid_string() }),
)
} else {
None
};
conflicts.push(Conflict {
node_id: Some(node_id),
edge_key: None,
category: ConflictCategory::TombstoneVsModify,
left: serde_json::json!({ "node_cid": l.expect("checked is_some").to_string() }),
right: serde_json::json!({ "tombstoned": true }),
base: base.map(|c| serde_json::json!({ "node_cid": c.to_string() })),
suggested,
});
continue;
}
if let (Some(lc), Some(rc)) = (l, r) {
if lc != rc {
let left_changed = base.map_or(true, |b| b != lc);
let right_changed = base.map_or(true, |b| b != rc);
if left_changed && right_changed {
conflicts.push(Conflict {
node_id: Some(node_id),
edge_key: None,
category: ConflictCategory::NodeCidDivergence,
left: serde_json::json!({ "node_cid": lc.to_string() }),
right: serde_json::json!({ "node_cid": rc.to_string() }),
base: base.map(|c| serde_json::json!({ "node_cid": c.to_string() })),
suggested: None,
});
}
}
}
}
let left_edges = collect_edges(bs, &left_commit.edges)?;
let right_edges = collect_edges(bs, &right_commit.edges)?;
let lca_edges = match &lca_commit {
Some(c) => collect_edges(bs, &c.edges)?,
None => BTreeMap::new(),
};
let edge_keys: BTreeSet<&EdgeKey> = left_edges.keys().chain(right_edges.keys()).collect();
let left_wins_lex = match policy.edge_prop_tiebreak {
PropTiebreak::BranchHeadCidLex => left < right,
};
for key in edge_keys {
let l = left_edges.get(key);
let r = right_edges.get(key);
let base = lca_edges.get(key);
if let (Some(lp), Some(rp)) = (l, r) {
if lp == rp {
continue; }
let left_changed = base.map_or(true, |b| b != lp);
let right_changed = base.map_or(true, |b| b != rp);
if !(left_changed && right_changed) {
continue;
}
let left_json = props_to_json(lp);
let right_json = props_to_json(rp);
let base_json = base.map(props_to_json);
let suggested = Some(serde_json::json!({
"tiebreak": "branch_head_cid_lex",
"winner_side": if left_wins_lex { "left" } else { "right" },
"props": if left_wins_lex { &left_json } else { &right_json },
}));
conflicts.push(Conflict {
node_id: None,
edge_key: Some(key.clone()),
category: ConflictCategory::EdgePropCollision,
left: left_json,
right: right_json,
base: base_json,
suggested,
});
}
}
conflicts.sort_by(|a, b| {
let a_kind = u8::from(a.edge_key.is_some());
let b_kind = u8::from(b.edge_key.is_some());
a_kind
.cmp(&b_kind)
.then_with(|| a.node_id.cmp(&b.node_id))
.then_with(|| a.edge_key.cmp(&b.edge_key))
.then_with(|| (a.category as u8).cmp(&(b.category as u8)))
});
Ok(MergeConflicts {
schema: MERGE_CONFLICTS_SCHEMA.to_string(),
left_head: left,
right_head: right,
lca,
conflicts,
})
}
/// Materializes every `(key, value CID)` entry of the prolly tree rooted at
/// `root` into an ordered map.
///
/// # Errors
///
/// Fails if the cursor cannot be opened or any entry fails to load.
fn collect_prolly(bs: &dyn Blockstore, root: &Cid) -> Result<BTreeMap<ProllyKey, Cid>, Error> {
    let mut entries: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
    for item in Cursor::new(bs, root)? {
        let (key, cid) = item?;
        entries.insert(key, cid);
    }
    Ok(entries)
}
/// Loads every edge object referenced by the edge tree rooted at `root` and
/// indexes its property map by [`EdgeKey`] (source, destination, type).
///
/// The tree's own keys are ignored; only the decoded edge contents are used.
/// NOTE(review): if one commit holds several edges with the same
/// `(src, dst, etype)` but different edge ids, only the last one visited is
/// kept (`BTreeMap::insert` replaces). Confirm the data model forbids such duplicates.
///
/// # Errors
///
/// Fails if the cursor cannot be opened, an entry fails to load, or an edge
/// object cannot be decoded.
fn collect_edges(
    bs: &dyn Blockstore,
    root: &Cid,
) -> Result<BTreeMap<EdgeKey, BTreeMap<String, Ipld>>, Error> {
    let mut out = BTreeMap::new();
    for pair in Cursor::new(bs, root)? {
        let (_key, edge_cid) = pair?;
        let edge: Edge = decode_from_store(bs, &edge_cid)?;
        // `edge` is owned, so its fields are moved rather than cloned.
        let key = EdgeKey {
            src: edge.src,
            dst: edge.dst,
            etype: edge.etype,
        };
        out.insert(key, edge.props);
    }
    Ok(out)
}
/// Reinterprets a prolly-tree key's raw bytes (`key.0`) as a [`NodeId`].
fn nodeid_from_key(key: &ProllyKey) -> NodeId {
    let raw_bytes = key.0;
    NodeId::from_bytes_raw(raw_bytes)
}
/// Returns the node ids tombstoned as of the given commit.
///
/// Placeholder: commit-level tombstone lookup is not implemented. It always
/// returns an empty set and never reads the blockstore or the commit.
/// [`detect_conflicts_with_views`] reads tombstones from [`View`]s instead.
///
/// # Errors
///
/// Currently never returns `Err`.
fn tombstones_for_commit(
    bs: &dyn Blockstore,
    _commit_cid: &Cid,
) -> Result<BTreeSet<NodeId>, Error> {
    // Deliberately unused until the lookup is implemented.
    let _unused_store: &dyn Blockstore = bs;
    Ok(BTreeSet::default())
}
/// Converts an edge property map into JSON for conflict reports.
///
/// Best effort: if serialization fails, JSON `null` is returned rather than an error.
fn props_to_json(props: &BTreeMap<String, Ipld>) -> serde_json::Value {
    match serde_json::to_value(props) {
        Ok(json) => json,
        Err(_) => serde_json::Value::Null,
    }
}
/// Detects merge conflicts, taking node tombstones recorded in the branches'
/// [`View`]s into account.
///
/// First runs [`detect_conflicts_with_policy`], then:
///
/// 1. Any node conflict whose node is tombstoned in exactly one view is
///    re-labelled [`ConflictCategory::TombstoneVsModify`]. The tombstoned side
///    becomes `{"tombstoned": true}`, and a `{"action": "tombstone"}` suggestion
///    is set when `policy.tombstone_wins`.
/// 2. For every node tombstoned in exactly one view that has no conflict yet, a
///    `TombstoneVsModify` conflict is emitted when both heads still hold the node
///    and the non-tombstoning side actually modified it. That means its CID
///    differs from the tombstoning side's and, when `lca` is given, from the LCA's.
///
/// Nodes tombstoned in both views are never reported, because both branches
/// agree on the deletion. The result is sorted like
/// [`detect_conflicts_with_policy`]'s: node conflicts first, then edge conflicts.
///
/// # Errors
///
/// Returns an error if a commit, a prolly-tree node, or an edge object cannot be
/// read or decoded from the blockstore.
pub fn detect_conflicts_with_views(
    repo: &ReadonlyRepo,
    left: Cid,
    right: Cid,
    lca: Option<Cid>,
    left_view: &View,
    right_view: &View,
    policy: ConflictPolicy,
) -> Result<MergeConflicts, Error> {
    let mut mc = detect_conflicts_with_policy(repo, left.clone(), right.clone(), lca, policy)?;

    // Node trees of both heads (and of the LCA, if any) for the "modified?" check.
    let bs: &dyn Blockstore = &**repo.blockstore();
    let left_commit: Commit = decode_from_store(bs, &left)?;
    let right_commit: Commit = decode_from_store(bs, &right)?;
    let left_nodes = collect_prolly(bs, &left_commit.nodes)?;
    let right_nodes = collect_prolly(bs, &right_commit.nodes)?;
    let lca_nodes = match &mc.lca {
        Some(cid) => {
            let lca_commit: Commit = decode_from_store(bs, cid)?;
            collect_prolly(bs, &lca_commit.nodes)?
        }
        None => BTreeMap::new(),
    };

    let left_ts: BTreeSet<NodeId> = left_view.tombstones.keys().copied().collect();
    let right_ts: BTreeSet<NodeId> = right_view.tombstones.keys().copied().collect();
    let tombstoned = || serde_json::json!({ "tombstoned": true });
    let tombstone_suggestion = |id: NodeId| {
        serde_json::json!({ "action": "tombstone", "node_id": id.to_uuid_string() })
    };

    // 1. Re-label existing node conflicts that involve a one-sided tombstone.
    for conflict in &mut mc.conflicts {
        if let Some(id) = conflict.node_id {
            let l_ts = left_ts.contains(&id);
            let r_ts = right_ts.contains(&id);
            if l_ts != r_ts {
                conflict.category = ConflictCategory::TombstoneVsModify;
                if l_ts {
                    conflict.left = tombstoned();
                } else {
                    conflict.right = tombstoned();
                }
                if policy.tombstone_wins {
                    conflict.suggested = Some(tombstone_suggestion(id));
                }
            }
        }
    }

    // 2. Emit conflicts for one-sided tombstones not already reported. Ids
    //    tombstoned on both sides are excluded by the symmetric difference.
    let already_flagged: BTreeSet<NodeId> =
        mc.conflicts.iter().filter_map(|c| c.node_id).collect();
    for &id in left_ts.symmetric_difference(&right_ts) {
        if already_flagged.contains(&id) {
            continue;
        }
        let key = ProllyKey::from(id);
        // Both heads must still hold the node for a modification to be meaningful.
        if let (Some(ln), Some(rn)) = (left_nodes.get(&key), right_nodes.get(&key)) {
            let tombstoned_left = left_ts.contains(&id);
            let modified = if tombstoned_left { rn } else { ln };
            let base = lca_nodes.get(&key);
            // The surviving side did not change the node: the tombstone applies cleanly.
            if ln == rn || base == Some(modified) {
                continue;
            }
            let modified_json = serde_json::json!({ "node_cid": modified.to_string() });
            let (left_json, right_json) = if tombstoned_left {
                (tombstoned(), modified_json)
            } else {
                (modified_json, tombstoned())
            };
            mc.conflicts.push(Conflict {
                node_id: Some(id),
                edge_key: None,
                category: ConflictCategory::TombstoneVsModify,
                left: left_json,
                right: right_json,
                base: base.map(|c| serde_json::json!({ "node_cid": c.to_string() })),
                suggested: policy.tombstone_wins.then(|| tombstone_suggestion(id)),
            });
        }
    }

    // Canonical order: node conflicts first, then node id, edge key, category.
    mc.conflicts.sort_by(|a, b| {
        a.edge_key
            .is_some()
            .cmp(&b.edge_key.is_some())
            .then_with(|| a.node_id.cmp(&b.node_id))
            .then_with(|| a.edge_key.cmp(&b.edge_key))
            .then_with(|| a.category.cmp(&b.category))
    });
    Ok(mc)
}
/// Unit tests for conflict detection. Each test forks branches from a fresh
/// in-memory repo and feeds the resulting head commits to the detectors.
#[cfg(test)]
mod tests {
use super::*;
use crate::id::{EdgeId, NodeId};
use crate::objects::{Edge, Node};
use crate::repo::ReadonlyRepo;
use crate::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
use ipld_core::ipld::Ipld;
use std::sync::Arc;
/// Fresh in-memory blockstore and op-heads store.
fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
(
Arc::new(MemoryBlockstore::new()),
Arc::new(MemoryOpHeadsStore::new()),
)
}
/// Deterministic node id whose 16 bytes are all `seed`.
fn nid(seed: u8) -> NodeId {
NodeId::from_bytes_raw([seed; 16])
}
/// Deterministic edge id whose 16 bytes are all `seed`.
fn eid(seed: u8) -> EdgeId {
EdgeId::from_bytes_raw([seed; 16])
}
/// Adds `nodes` and `edges` in one transaction on top of `base`, commits it,
/// and returns the new repo together with the first head of its view.
fn commit_snapshot(
base: &ReadonlyRepo,
author: &str,
nodes: Vec<Node>,
edges: Vec<Edge>,
) -> (ReadonlyRepo, Cid) {
let mut tx = base.start_transaction();
for n in nodes {
tx.add_node(&n).unwrap();
}
for e in edges {
tx.add_edge(&e).unwrap();
}
let new_repo = tx.commit(author, "snap").unwrap();
let head = new_repo
.view()
.heads
.first()
.cloned()
.expect("head present");
(new_repo, head)
}
// Branches that add different nodes must merge cleanly.
#[test]
fn clean_merge_on_disjoint_branches_has_no_conflicts() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let (_repo_a, head_a) = commit_snapshot(
&repo0,
"A",
vec![Node::new(nid(1), "Person").with_prop("name", Ipld::String("Alice".into()))],
vec![],
);
let (_repo_b, head_b) = commit_snapshot(
&repo0,
"B",
vec![Node::new(nid(2), "Person").with_prop("name", Ipld::String("Bob".into()))],
vec![],
);
// Drops the local store handles; presumably the repos keep their own clones.
let _ = (bs, ohs);
let mc = detect_conflicts(&_repo_a, head_a, head_b, None).unwrap();
assert!(
mc.is_clean(),
"expected no conflicts, got {:?}",
mc.conflicts
);
assert_eq!(mc.schema, MERGE_CONFLICTS_SCHEMA);
}
// The same node id with different props on each side (no LCA) yields exactly
// one NodeCidDivergence.
#[test]
fn node_cid_divergence_flagged() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let id = nid(7);
let (_repo_a, head_a) = commit_snapshot(
&repo0,
"A",
vec![Node::new(id, "Person").with_prop("name", Ipld::String("Alice".into()))],
vec![],
);
let (_repo_b, head_b) = commit_snapshot(
&repo0,
"B",
vec![Node::new(id, "Person").with_prop("name", Ipld::String("Alicia".into()))],
vec![],
);
let _ = (bs, ohs);
let mc = detect_conflicts(&_repo_a, head_a, head_b, None).unwrap();
assert_eq!(mc.conflicts.len(), 1);
let c = &mc.conflicts[0];
assert_eq!(c.category, ConflictCategory::NodeCidDivergence);
assert_eq!(c.node_id, Some(id));
}
// Edges with different edge ids but the same (src, dst, etype) are matched by
// EdgeKey, so their differing props collide. The suggested winner must follow
// the branch-head CID ordering.
#[test]
fn edge_prop_collision_deterministic_tiebreak() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let s = nid(1);
let d = nid(2);
let (_repo_a, head_a) = commit_snapshot(
&repo0,
"A",
vec![Node::new(s, "Person"), Node::new(d, "Person")],
vec![Edge::new(eid(10), "knows", s, d).with_prop("since", Ipld::Integer(2020))],
);
let (_repo_b, head_b) = commit_snapshot(
&repo0,
"B",
vec![Node::new(s, "Person"), Node::new(d, "Person")],
vec![Edge::new(eid(11), "knows", s, d).with_prop("since", Ipld::Integer(2021))],
);
let _ = (bs, ohs);
let mc = detect_conflicts(&_repo_a, head_a.clone(), head_b.clone(), None).unwrap();
let edge_conflicts: Vec<_> = mc
.conflicts
.iter()
.filter(|c| c.category == ConflictCategory::EdgePropCollision)
.collect();
assert_eq!(edge_conflicts.len(), 1, "got: {:?}", mc.conflicts);
let c = edge_conflicts[0];
assert_eq!(
c.edge_key,
Some(EdgeKey {
src: s,
dst: d,
etype: "knows".into()
})
);
// Expected winner mirrors PropTiebreak::BranchHeadCidLex.
let want_winner = if head_a < head_b { "left" } else { "right" };
let got = c.suggested.as_ref().unwrap();
assert_eq!(
got.get("winner_side").and_then(|v| v.as_str()),
Some(want_winner)
);
assert_eq!(
got.get("tiebreak").and_then(|v| v.as_str()),
Some("branch_head_cid_lex")
);
}
// Left tombstones a seeded node while right modifies it. With view tombstones,
// the conflict is reported as TombstoneVsModify carrying the default
// (tombstone-wins) suggestion.
#[test]
fn tombstone_vs_modify_tombstone_wins_default() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let id = nid(5);
let (repo_seed, _) = commit_snapshot(
&repo0,
"S",
vec![Node::new(id, "Person").with_prop("name", Ipld::String("Seed".into()))],
vec![],
);
let mut tx_l = repo_seed.start_transaction();
tx_l.tombstone_node(id, "gdpr").unwrap();
let repo_l = tx_l.commit("L", "tombstone").unwrap();
let head_l = repo_l.view().heads.first().cloned().unwrap();
let view_l = repo_l.view().clone();
let (repo_r, head_r) = commit_snapshot(
&repo_seed,
"R",
vec![Node::new(id, "Person").with_prop("name", Ipld::String("Changed".into()))],
vec![],
);
let view_r = repo_r.view().clone();
let mc = detect_conflicts_with_views(
&repo_l,
head_l,
head_r,
None,
&view_l,
&view_r,
ConflictPolicy::default(),
)
.unwrap();
let tvm: Vec<_> = mc
.conflicts
.iter()
.filter(|c| c.category == ConflictCategory::TombstoneVsModify)
.collect();
assert!(
!tvm.is_empty(),
"no tombstone-vs-modify in {:?}",
mc.conflicts
);
let c = tvm[0];
assert_eq!(c.node_id, Some(id));
let s = c.suggested.as_ref().unwrap();
assert_eq!(s.get("action").and_then(|v| v.as_str()), Some("tombstone"));
}
// A node divergence and an edge-prop collision in the same merge are both reported.
#[test]
fn multi_category_surfaced_simultaneously() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let p = nid(11);
let q = nid(22);
let (_repo_a, head_a) = commit_snapshot(
&repo0,
"A",
vec![
Node::new(p, "Person").with_prop("k", Ipld::Integer(1)),
Node::new(q, "Person"),
],
vec![Edge::new(eid(1), "likes", p, q).with_prop("w", Ipld::Integer(100))],
);
let (_repo_b, head_b) = commit_snapshot(
&repo0,
"B",
vec![
Node::new(p, "Person").with_prop("k", Ipld::Integer(2)),
Node::new(q, "Person"),
],
vec![Edge::new(eid(2), "likes", p, q).with_prop("w", Ipld::Integer(200))],
);
let _ = (bs, ohs);
let mc = detect_conflicts(&_repo_a, head_a, head_b, None).unwrap();
let cats: BTreeSet<_> = mc.conflicts.iter().map(|c| c.category).collect();
assert!(cats.contains(&ConflictCategory::NodeCidDivergence));
assert!(cats.contains(&ConflictCategory::EdgePropCollision));
}
// Serializing a report to JSON and back must reproduce it exactly, which
// exercises the UUID and CID string adapters.
#[test]
fn json_round_trip_preserves_shape() {
let (bs, ohs) = stores();
let repo0 = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
let id = nid(9);
let (_repo_a, head_a) = commit_snapshot(
&repo0,
"A",
vec![Node::new(id, "Person").with_prop("k", Ipld::String("x".into()))],
vec![],
);
let (_repo_b, head_b) = commit_snapshot(
&repo0,
"B",
vec![Node::new(id, "Person").with_prop("k", Ipld::String("y".into()))],
vec![],
);
let _ = (bs, ohs);
let mc = detect_conflicts(&_repo_a, head_a, head_b, None).unwrap();
let s = serde_json::to_string(&mc).expect("encode");
let decoded: MergeConflicts = serde_json::from_str(&s).expect("decode");
assert_eq!(mc, decoded);
}
// Guards against accidental changes to the wire schema identifier.
#[test]
fn schema_constant_pinned() {
assert_eq!(MERGE_CONFLICTS_SCHEMA, "mnem.v1.merge_conflicts");
}
}