use std::collections::{BTreeMap, HashSet};
use silk::clock::LamportClock;
use silk::entry::{Entry, GraphOp, Hash, Value};
use silk::graph::MaterializedGraph;
use silk::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
use silk::oplog::OpLog;
use silk::sync::{entries_missing, merge_entries, Snapshot, SyncOffer};
/// Builds the small ontology used by every test in this file.
///
/// It declares two node types (`entity`, `signal`) with no properties,
/// subtypes or parent, and one edge type (`CONNECTS`) that lists `entity`
/// as its only source and target type.
fn test_ontology() -> Ontology {
    // Both node types are identical empty definitions, so build them in one place.
    let bare_node_type = || NodeTypeDef {
        description: None,
        properties: BTreeMap::new(),
        subtypes: None,
        parent_type: None,
    };
    let connects = EdgeTypeDef {
        description: None,
        source_types: vec!["entity".into()],
        target_types: vec!["entity".into()],
        properties: BTreeMap::new(),
    };
    Ontology {
        node_types: BTreeMap::from([
            ("entity".into(), bare_node_type()),
            ("signal".into(), bare_node_type()),
        ]),
        edge_types: BTreeMap::from([("CONNECTS".into(), connects)]),
    }
}
/// Creates the genesis entry: a `DefineOntology` op carrying [`test_ontology`],
/// stamped with a fresh Lamport clock for `author`.
///
/// The two list arguments to `Entry::new` are passed empty here; the local-op
/// helpers in this file pass the op log's heads in the first of those slots.
fn genesis(author: &str) -> Entry {
    let define_ontology = GraphOp::DefineOntology {
        ontology: test_ontology(),
    };
    let clock = LamportClock::new(author);
    Entry::new(define_ontology, vec![], vec![], clock, author)
}
/// In-test stand-in for one replica: an op log, the graph materialized from
/// it, and the Lamport clock used to stamp locally created entries.
struct Peer {
/// Log of entries; seeded with the genesis entry, appended to on local ops
/// and merged into during sync.
oplog: OpLog,
/// Graph state; updated per entry on local ops and rebuilt from the whole
/// log after a sync merges new entries.
graph: MaterializedGraph,
/// Ticked before every local op and merged with the clocks of entries
/// received during sync.
clock: LamportClock,
}
impl Peer {
fn new(id: &str, g: &Entry) -> Self {
let mut graph = MaterializedGraph::new(test_ontology());
graph.apply(g);
Self {
oplog: OpLog::new(g.clone()),
graph,
clock: LamportClock::new(id),
}
}
fn add_node(&mut self, node_id: &str, props: BTreeMap<String, Value>) -> Hash {
self.clock.tick();
let entry = Entry::new(
GraphOp::AddNode {
node_id: node_id.into(),
node_type: "entity".into(),
label: node_id.into(),
properties: props,
subtype: None,
},
self.oplog.heads(),
vec![],
self.clock.clone(),
&self.clock.id,
);
let hash = entry.hash;
self.graph.apply(&entry);
self.oplog.append(entry).unwrap();
hash
}
fn add_edge(&mut self, edge_id: &str, src: &str, tgt: &str) -> Hash {
self.clock.tick();
let entry = Entry::new(
GraphOp::AddEdge {
edge_id: edge_id.into(),
edge_type: "CONNECTS".into(),
source_id: src.into(),
target_id: tgt.into(),
properties: BTreeMap::new(),
},
self.oplog.heads(),
vec![],
self.clock.clone(),
&self.clock.id,
);
let hash = entry.hash;
self.graph.apply(&entry);
self.oplog.append(entry).unwrap();
hash
}
fn update_property(&mut self, entity_id: &str, key: &str, value: Value) -> Hash {
self.clock.tick();
let entry = Entry::new(
GraphOp::UpdateProperty {
entity_id: entity_id.into(),
key: key.into(),
value,
},
self.oplog.heads(),
vec![],
self.clock.clone(),
&self.clock.id,
);
let hash = entry.hash;
self.graph.apply(&entry);
self.oplog.append(entry).unwrap();
hash
}
fn remove_node(&mut self, node_id: &str) -> Hash {
self.clock.tick();
let entry = Entry::new(
GraphOp::RemoveNode {
node_id: node_id.into(),
},
self.oplog.heads(),
vec![],
self.clock.clone(),
&self.clock.id,
);
let hash = entry.hash;
self.graph.apply(&entry);
self.oplog.append(entry).unwrap();
hash
}
fn len(&self) -> usize {
self.oplog.len()
}
fn heads(&self) -> Vec<Hash> {
self.oplog.heads()
}
fn sync_to(&self, other: &mut Peer) {
let offer =
SyncOffer::from_oplog(&other.oplog, other.clock.physical_ms, other.clock.logical);
let payload = entries_missing(&self.oplog, &offer);
if !payload.entries.is_empty() {
let merged = merge_entries(&mut other.oplog, &payload.entries).unwrap();
if merged > 0 {
other.rebuild_graph();
for entry in &payload.entries {
other.clock.merge(&entry.clock);
}
}
}
}
fn sync_bidi(a: &mut Peer, b: &mut Peer) {
let offer_b = SyncOffer::from_oplog(&b.oplog, b.clock.physical_ms, b.clock.logical);
let payload_for_b = entries_missing(&a.oplog, &offer_b);
if !payload_for_b.entries.is_empty() {
let merged = merge_entries(&mut b.oplog, &payload_for_b.entries).unwrap();
if merged > 0 {
b.rebuild_graph();
for entry in &payload_for_b.entries {
b.clock.merge(&entry.clock);
}
}
}
let offer_a = SyncOffer::from_oplog(&a.oplog, a.clock.physical_ms, a.clock.logical);
let payload_for_a = entries_missing(&b.oplog, &offer_a);
if !payload_for_a.entries.is_empty() {
let merged = merge_entries(&mut a.oplog, &payload_for_a.entries).unwrap();
if merged > 0 {
a.rebuild_graph();
for entry in &payload_for_a.entries {
a.clock.merge(&entry.clock);
}
}
}
}
fn rebuild_graph(&mut self) {
let all = self.oplog.entries_since(None);
let refs: Vec<&Entry> = all.iter().copied().collect();
self.graph.rebuild(&refs);
}
fn live_node_ids(&self) -> HashSet<String> {
self.graph
.all_nodes()
.iter()
.map(|n| n.node_id.clone())
.collect()
}
fn live_edge_ids(&self) -> HashSet<String> {
self.graph
.all_edges()
.iter()
.map(|e| e.edge_id.clone())
.collect()
}
}
/// A and B keep syncing while C works alone; after pairwise syncs all three
/// peers must expose the same nodes and the same op-log heads.
#[test]
fn partition_heal_three_peers() {
    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    let mut b = Peer::new("inst-b", &root);
    let mut c = Peer::new("inst-c", &root);

    // Common baseline: one node known to everyone.
    a.add_node("shared-1", BTreeMap::new());
    a.sync_to(&mut b);
    a.sync_to(&mut c);
    assert_eq!(a.len(), b.len());
    assert_eq!(a.len(), c.len());

    // A and B exchange updates; C writes without syncing.
    a.add_node("from-a", BTreeMap::new());
    Peer::sync_bidi(&mut a, &mut b);
    b.add_node("from-b", BTreeMap::new());
    Peer::sync_bidi(&mut a, &mut b);
    c.add_node("from-c", BTreeMap::new());

    // Before healing, each side sees only its own writes.
    assert!(a.graph.get_node("from-a").is_some());
    assert!(a.graph.get_node("from-b").is_some());
    assert!(a.graph.get_node("from-c").is_none());
    assert!(b.graph.get_node("from-a").is_some());
    assert!(b.graph.get_node("from-b").is_some());
    assert!(c.graph.get_node("from-c").is_some());
    assert!(c.graph.get_node("from-a").is_none());

    // Heal: sync every pair.
    Peer::sync_bidi(&mut a, &mut c);
    Peer::sync_bidi(&mut b, &mut c);
    Peer::sync_bidi(&mut a, &mut b);

    let expected_nodes: HashSet<String> =
        HashSet::from(["shared-1", "from-a", "from-b", "from-c"].map(String::from));
    for (peer, why) in [
        (&a, "A missing nodes"),
        (&b, "B missing nodes"),
        (&c, "C missing nodes"),
    ] {
        assert_eq!(peer.live_node_ids(), expected_nodes, "{}", why);
    }

    let head_set = |peer: &Peer| -> HashSet<Hash> { peer.heads().into_iter().collect() };
    let (heads_a, heads_b, heads_c) = (head_set(&a), head_set(&b), head_set(&c));
    assert_eq!(heads_a, heads_b, "A and B heads diverge");
    assert_eq!(heads_b, heads_c, "B and C heads diverge");
}
/// Two peers set the same property to different values without syncing;
/// after a bidirectional sync both must report the same value.
#[test]
fn partition_heal_conflicting_property_updates() {
    /// Reads the `status` property of node `s1` from a peer's graph.
    fn status_of(peer: &Peer) -> Value {
        let node = peer.graph.get_node("s1").unwrap();
        node.properties.get("status").unwrap().clone()
    }

    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    let mut c = Peer::new("inst-c", &root);

    a.add_node("s1", BTreeMap::new());
    a.sync_to(&mut c);

    // Concurrent, conflicting writes on each peer.
    a.update_property("s1", "status", Value::String("alive".into()));
    c.update_property("s1", "status", Value::String("dead".into()));

    Peer::sync_bidi(&mut a, &mut c);

    let val_a = status_of(&a);
    let val_c = status_of(&c);
    assert_eq!(val_a, val_c, "LWW did not converge");
}
/// One peer removes a node while the other re-adds it without syncing;
/// after healing the node must exist on both peers.
#[test]
fn partition_heal_add_wins_across_partition() {
    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    let mut c = Peer::new("inst-c", &root);

    a.add_node("s1", BTreeMap::new());
    a.sync_to(&mut c);

    // Concurrent remove on A and add on C for the same node id.
    a.remove_node("s1");
    let mut revived_props = BTreeMap::new();
    revived_props.insert(String::from("revived"), Value::Bool(true));
    c.add_node("s1", revived_props);

    Peer::sync_bidi(&mut a, &mut c);

    for (peer, why) in [
        (&a, "add-wins failed on A after heal"),
        (&c, "add-wins failed on C after heal"),
    ] {
        assert!(peer.graph.get_node("s1").is_some(), "{}", why);
    }
}
/// Each peer adds a different edge without syncing; after healing both
/// peers must list both edges.
#[test]
fn partition_heal_edges_reconnect() {
    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    let mut c = Peer::new("inst-c", &root);

    // Shared node set, created on A and pushed to C.
    a.add_node("n1", BTreeMap::new());
    a.add_node("n2", BTreeMap::new());
    a.add_node("n3", BTreeMap::new());
    a.sync_to(&mut c);

    // One edge per peer, added independently.
    a.add_edge("e-ab", "n1", "n2");
    c.add_edge("e-bc", "n2", "n3");

    Peer::sync_bidi(&mut a, &mut c);

    let expected_edges: HashSet<String> = HashSet::from(["e-ab", "e-bc"].map(String::from));
    for (peer, why) in [(&a, "A missing edges"), (&c, "C missing edges")] {
        assert_eq!(peer.live_edge_ids(), expected_edges, "{}", why);
    }
}
/// A new peer is seeded from a snapshot of A's log, then receives later
/// changes through an ordinary one-way sync.
#[test]
fn snapshot_bootstrap_then_delta() {
    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    a.add_node("n1", BTreeMap::new());
    a.add_node("n2", BTreeMap::new());
    a.add_edge("e1", "n1", "n2");

    let snap = Snapshot::from_oplog(&a.oplog);

    // B already holds the genesis entry, so the first snapshot entry is skipped.
    let mut b = Peer::new("inst-b", &root);
    let after_first = &snap.entries[1..];
    merge_entries(&mut b.oplog, after_first).unwrap();
    b.rebuild_graph();
    assert_eq!(a.len(), b.len());
    assert_eq!(a.live_node_ids(), b.live_node_ids());

    // Changes made after the snapshot arrive via sync.
    a.add_node("n3", BTreeMap::new());
    a.add_edge("e2", "n2", "n3");
    a.sync_to(&mut b);
    assert_eq!(a.len(), b.len());
    assert!(b.graph.get_node("n3").is_some());
    assert!(b.graph.get_edge("e2").is_some());
}
/// Four peers each add one node, then push one-way around the ring
/// A -> B -> C -> D -> A twice; every peer must end up with all four nodes.
#[test]
fn ring_topology_convergence() {
    let root = genesis("inst-a");
    let mut a = Peer::new("inst-a", &root);
    let mut b = Peer::new("inst-b", &root);
    let mut c = Peer::new("inst-c", &root);
    let mut d = Peer::new("inst-d", &root);

    a.add_node("from-a", BTreeMap::new());
    b.add_node("from-b", BTreeMap::new());
    c.add_node("from-c", BTreeMap::new());
    d.add_node("from-d", BTreeMap::new());

    // Two full passes around the ring.
    for _pass in 0..2 {
        a.sync_to(&mut b);
        b.sync_to(&mut c);
        c.sync_to(&mut d);
        d.sync_to(&mut a);
    }

    let expected: HashSet<String> =
        HashSet::from(["from-a", "from-b", "from-c", "from-d"].map(String::from));
    for (peer, tag) in [(&a, "A"), (&b, "B"), (&c, "C"), (&d, "D")] {
        assert_eq!(peer.live_node_ids(), expected, "{}", tag);
    }
}