use crdt_graph::flatbuffers::simple as fb;
use crdt_graph::types::simple::{self, AddEdge, AddVertex, Graph};
use crdt_graph::types::{RemoveEdge, RemoveVertex};
use crdt_graph::{TwoPTwoPGraphError, Uuid};
fn new_id() -> Uuid {
Uuid::now_v7()
}
fn assert_converged(a: &Graph, b: &Graph) {
let pa = a.generate_petgraph();
let pb = b.generate_petgraph();
assert_eq!(
pa.node_count(),
pb.node_count(),
"vertex count mismatch: {} vs {}",
pa.node_count(),
pb.node_count()
);
assert_eq!(
pa.edge_count(),
pb.edge_count(),
"edge count mismatch: {} vs {}",
pa.edge_count(),
pb.edge_count()
);
}
#[test]
fn two_replicas_concurrent_vertex_add() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let va = new_id();
let vb = new_id();
let op_a = ra.prepare(AddVertex { id: va }.into()).unwrap();
let op_b = rb.prepare(AddVertex { id: vb }.into()).unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert!(ra.lookup_vertex(&va));
assert!(ra.lookup_vertex(&vb));
assert_converged(&ra, &rb);
}
#[test]
fn two_replicas_concurrent_edge_add_same_vertices() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
let ops: Vec<simple::Operation> = vec![
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
];
for op in &ops {
ra.update_operation(op.clone()).unwrap();
rb.apply_downstream(op.clone()).unwrap();
}
let ea = new_id();
let eb = new_id();
let op_a = ra
.prepare(AddEdge { id: ea, source: v1, target: v2 }.into())
.unwrap();
let op_b = rb
.prepare(AddEdge { id: eb, source: v1, target: v2 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert_eq!(ra.edge_count(), 2);
assert_converged(&ra, &rb);
}
#[test]
fn two_replicas_concurrent_add_edge_and_remove_vertex() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
let v3 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
AddVertex { id: v3 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast).unwrap();
}
let e1 = new_id();
let op_a = ra
.prepare(AddEdge { id: e1, source: v1, target: v2 }.into())
.unwrap();
let rv1 = new_id();
let op_b = rb
.prepare(RemoveVertex { id: rv1, add_vertex_id: v1 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert!(!ra.lookup_vertex(&v1));
assert!(!rb.lookup_vertex(&v1));
assert_eq!(ra.generate_petgraph().edge_count(), 0);
assert_converged(&ra, &rb);
}
#[test]
fn two_replicas_concurrent_remove_same_vertex() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let op = ra.prepare(AddVertex { id: v1 }.into()).unwrap();
rb.apply_downstream(op).unwrap();
let rv_a = new_id();
let rv_b = new_id();
let op_a = ra
.prepare(RemoveVertex { id: rv_a, add_vertex_id: v1 }.into())
.unwrap();
let op_b = rb
.prepare(RemoveVertex { id: rv_b, add_vertex_id: v1 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert!(!ra.lookup_vertex(&v1));
assert_converged(&ra, &rb);
}
#[test]
fn two_replicas_concurrent_remove_same_edge() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
let e1 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
AddEdge { id: e1, source: v1, target: v2 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast).unwrap();
}
let re_a = new_id();
let re_b = new_id();
let op_a = ra
.prepare(RemoveEdge { id: re_a, add_edge_id: e1 }.into())
.unwrap();
let op_b = rb
.prepare(RemoveEdge { id: re_b, add_edge_id: e1 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert_eq!(ra.edge_count(), 0);
assert_converged(&ra, &rb);
}
#[test]
fn downstream_add_edge_before_vertices_succeeds() {
let mut remote = Graph::new();
let v1 = new_id();
let v2 = new_id();
let e1 = new_id();
remote
.apply_downstream(AddEdge { id: e1, source: v1, target: v2 }.into())
.unwrap();
assert_eq!(remote.generate_petgraph().edge_count(), 0);
remote
.apply_downstream(AddVertex { id: v1 }.into())
.unwrap();
remote
.apply_downstream(AddVertex { id: v2 }.into())
.unwrap();
assert_eq!(remote.generate_petgraph().node_count(), 2);
assert_eq!(remote.generate_petgraph().edge_count(), 1);
}
#[test]
fn downstream_remove_vertex_before_add_vertex_fails() {
let mut remote = Graph::new();
let v1 = new_id();
let rv1 = new_id();
let err = remote
.apply_downstream(RemoveVertex { id: rv1, add_vertex_id: v1 }.into())
.unwrap_err();
assert!(matches!(err, TwoPTwoPGraphError::AddVertexNotDelivered(_)));
remote
.apply_downstream(AddVertex { id: v1 }.into())
.unwrap();
remote
.apply_downstream(RemoveVertex { id: rv1, add_vertex_id: v1 }.into())
.unwrap();
assert!(!remote.lookup_vertex(&v1));
}
#[test]
fn downstream_remove_edge_before_add_edge_fails() {
let mut remote = Graph::new();
let e1 = new_id();
let re1 = new_id();
let err = remote
.apply_downstream(RemoveEdge { id: re1, add_edge_id: e1 }.into())
.unwrap_err();
assert!(matches!(err, TwoPTwoPGraphError::AddEdgeNotDelivered(_)));
let v1 = new_id();
let v2 = new_id();
remote
.apply_downstream(AddEdge { id: e1, source: v1, target: v2 }.into())
.unwrap();
remote
.apply_downstream(RemoveEdge { id: re1, add_edge_id: e1 }.into())
.unwrap();
}
#[test]
fn reverse_delivery_order() {
let mut origin = Graph::new();
let mut remote = Graph::new();
let v1 = new_id();
let v2 = new_id();
let e1 = new_id();
let re1 = new_id();
let rv1 = new_id();
let ops: Vec<simple::Operation> = vec![
origin.prepare(AddVertex { id: v1 }.into()).unwrap(),
origin.prepare(AddVertex { id: v2 }.into()).unwrap(),
origin.prepare(AddEdge { id: e1, source: v1, target: v2 }.into()).unwrap(),
origin.prepare(RemoveEdge { id: re1, add_edge_id: e1 }.into()).unwrap(),
origin.prepare(RemoveVertex { id: rv1, add_vertex_id: v1 }.into()).unwrap(),
];
let mut pending: Vec<simple::Operation> = ops.into_iter().rev().collect();
let mut rounds = 0;
while !pending.is_empty() {
let mut still_pending = Vec::new();
for op in pending {
if remote.apply_downstream(op.clone()).is_err() {
still_pending.push(op);
}
}
pending = still_pending;
rounds += 1;
assert!(rounds <= 10, "delivery did not converge within 10 rounds");
}
assert_converged(&origin, &remote);
}
#[test]
fn three_replicas_independent_edits_converge() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let v1 = new_id();
let v2 = new_id();
let v3 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
AddVertex { id: v3 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast.clone()).unwrap();
rc.apply_downstream(broadcast).unwrap();
}
let ea = new_id();
let eb = new_id();
let ec = new_id();
let op_a = ra
.prepare(AddEdge { id: ea, source: v1, target: v2 }.into())
.unwrap();
let op_b = rb
.prepare(AddEdge { id: eb, source: v2, target: v3 }.into())
.unwrap();
let op_c = rc
.prepare(AddEdge { id: ec, source: v3, target: v1 }.into())
.unwrap();
rb.apply_downstream(op_a.clone()).unwrap();
rc.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b.clone()).unwrap();
rc.apply_downstream(op_b).unwrap();
ra.apply_downstream(op_c.clone()).unwrap();
rb.apply_downstream(op_c).unwrap();
assert_eq!(ra.edge_count(), 3);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
}
#[test]
fn three_replicas_different_delivery_order_converge() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let v1 = new_id();
let v2 = new_id();
let v3 = new_id();
let e1 = new_id();
let e2 = new_id();
let ops: Vec<simple::Operation> = vec![
ra.prepare(AddVertex { id: v1 }.into()).unwrap(),
ra.prepare(AddVertex { id: v2 }.into()).unwrap(),
ra.prepare(AddVertex { id: v3 }.into()).unwrap(),
ra.prepare(AddEdge { id: e1, source: v1, target: v2 }.into()).unwrap(),
ra.prepare(AddEdge { id: e2, source: v2, target: v3 }.into()).unwrap(),
];
for op in &ops {
rb.apply_downstream(op.clone()).unwrap();
}
rc.apply_downstream(ops[2].clone()).unwrap(); rc.apply_downstream(ops[0].clone()).unwrap(); rc.apply_downstream(ops[1].clone()).unwrap(); rc.apply_downstream(ops[3].clone()).unwrap(); rc.apply_downstream(ops[4].clone()).unwrap();
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
}
#[test]
fn network_partition_bulk_sync() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
let v3 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast).unwrap();
}
let mut a_ops = Vec::new();
a_ops.push(ra.prepare(AddVertex { id: v3 }.into()).unwrap());
let e1 = new_id();
a_ops.push(
ra.prepare(AddEdge { id: e1, source: v1, target: v3 }.into())
.unwrap(),
);
let mut b_ops = Vec::new();
let e2 = new_id();
b_ops.push(
rb.prepare(AddEdge { id: e2, source: v1, target: v2 }.into())
.unwrap(),
);
let re2 = new_id();
b_ops.push(
rb.prepare(RemoveEdge { id: re2, add_edge_id: e2 }.into())
.unwrap(),
);
let rv2 = new_id();
b_ops.push(
rb.prepare(RemoveVertex { id: rv2, add_vertex_id: v2 }.into())
.unwrap(),
);
for op in &a_ops {
rb.apply_downstream(op.clone()).unwrap();
}
for op in &b_ops {
ra.apply_downstream(op.clone()).unwrap();
}
assert!(ra.lookup_vertex(&v1));
assert!(!ra.lookup_vertex(&v2));
assert!(ra.lookup_vertex(&v3));
assert_eq!(ra.vertex_count(), 2);
assert_eq!(ra.edge_count(), 1);
assert_converged(&ra, &rb);
}
#[test]
fn partition_concurrent_edge_add_and_vertex_remove() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast).unwrap();
}
let e1 = new_id();
let op_a = ra
.prepare(AddEdge { id: e1, source: v2, target: v1 }.into())
.unwrap();
let rv1 = new_id();
let op_b = rb
.prepare(RemoveVertex { id: rv1, add_vertex_id: v1 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap(); ra.apply_downstream(op_b).unwrap();
assert!(!ra.lookup_vertex(&v1));
assert!(ra.lookup_vertex(&v2));
assert_eq!(ra.generate_petgraph().edge_count(), 0); assert_converged(&ra, &rb);
}
#[test]
fn four_replicas_diamond_topology() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let mut rd = Graph::new();
let v1 = new_id();
let v2 = new_id();
let op1 = ra.prepare(AddVertex { id: v1 }.into()).unwrap();
let op2 = ra.prepare(AddVertex { id: v2 }.into()).unwrap();
rb.apply_downstream(op1.clone()).unwrap();
rb.apply_downstream(op2.clone()).unwrap();
rc.apply_downstream(op1).unwrap();
rc.apply_downstream(op2).unwrap();
let eb = new_id();
let ec = new_id();
let op_b = rb
.prepare(AddEdge { id: eb, source: v1, target: v2 }.into())
.unwrap();
let op_c = rc
.prepare(AddEdge { id: ec, source: v2, target: v1 }.into())
.unwrap();
rd.apply_downstream(AddVertex { id: v1 }.into()).unwrap();
rd.apply_downstream(AddVertex { id: v2 }.into()).unwrap();
rd.apply_downstream(op_b.clone()).unwrap();
rd.apply_downstream(op_c.clone()).unwrap();
rc.apply_downstream(op_b).unwrap();
rb.apply_downstream(op_c).unwrap();
let all_edges_for_a: Vec<simple::Operation> = vec![
AddEdge { id: eb, source: v1, target: v2 }.into(),
AddEdge { id: ec, source: v2, target: v1 }.into(),
];
for op in all_edges_for_a {
ra.apply_downstream(op).unwrap();
}
assert_eq!(rd.vertex_count(), 2);
assert_eq!(rd.edge_count(), 2);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
assert_converged(&rc, &rd);
}
#[test]
fn multi_replica_sync_via_flatbuffers() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let v1 = new_id();
let v2 = new_id();
let v3 = new_id();
let e1 = new_id();
let ops_a: Vec<simple::Operation> = vec![
ra.prepare(AddVertex { id: v1 }.into()).unwrap(),
ra.prepare(AddVertex { id: v2 }.into()).unwrap(),
ra.prepare(AddVertex { id: v3 }.into()).unwrap(),
ra.prepare(AddEdge { id: e1, source: v1, target: v2 }.into()).unwrap(),
];
let wire_ab = fb::encode_operation_log(&ops_a);
for op in fb::decode_operation_log(&wire_ab).unwrap() {
rb.apply_downstream(op).unwrap();
}
let e2 = new_id();
let op_b = rb
.prepare(AddEdge { id: e2, source: v2, target: v3 }.into())
.unwrap();
let mut all_for_c = ops_a.clone();
all_for_c.push(op_b.clone());
let wire_c = fb::encode_operation_log(&all_for_c);
for op in fb::decode_operation_log(&wire_c).unwrap() {
rc.apply_downstream(op).unwrap();
}
ra.apply_downstream(op_b).unwrap();
assert_eq!(ra.edge_count(), 2);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
}
#[test]
fn many_concurrent_operations_three_replicas() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let mut a_ops = Vec::new();
let mut b_ops = Vec::new();
let mut c_ops = Vec::new();
let mut a_verts = Vec::new();
let mut b_verts = Vec::new();
let mut c_verts = Vec::new();
for _ in 0..10 {
let va = new_id();
a_verts.push(va);
a_ops.push(ra.prepare(AddVertex { id: va }.into()).unwrap());
let vb = new_id();
b_verts.push(vb);
b_ops.push(rb.prepare(AddVertex { id: vb }.into()).unwrap());
let vc = new_id();
c_verts.push(vc);
c_ops.push(rc.prepare(AddVertex { id: vc }.into()).unwrap());
}
for op in &a_ops {
rb.apply_downstream(op.clone()).unwrap();
rc.apply_downstream(op.clone()).unwrap();
}
for op in &b_ops {
ra.apply_downstream(op.clone()).unwrap();
rc.apply_downstream(op.clone()).unwrap();
}
for op in &c_ops {
ra.apply_downstream(op.clone()).unwrap();
rb.apply_downstream(op.clone()).unwrap();
}
assert_eq!(ra.vertex_count(), 30);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
let mut edge_ops_a = Vec::new();
let mut edge_ops_b = Vec::new();
for i in 0..5 {
let e = new_id();
edge_ops_a.push(
ra.prepare(
AddEdge { id: e, source: a_verts[i], target: b_verts[i] }.into(),
)
.unwrap(),
);
let e = new_id();
edge_ops_b.push(
rb.prepare(
AddEdge { id: e, source: b_verts[i], target: c_verts[i] }.into(),
)
.unwrap(),
);
}
for op in &edge_ops_a {
rb.apply_downstream(op.clone()).unwrap();
rc.apply_downstream(op.clone()).unwrap();
}
for op in &edge_ops_b {
ra.apply_downstream(op.clone()).unwrap();
rc.apply_downstream(op.clone()).unwrap();
}
assert_eq!(ra.vertex_count(), 30);
assert_eq!(ra.edge_count(), 10);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
}
#[test]
fn concurrent_remove_both_endpoints_of_edge() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let v2 = new_id();
let e1 = new_id();
for op in [
AddVertex { id: v1 }.into(),
AddVertex { id: v2 }.into(),
AddEdge { id: e1, source: v1, target: v2 }.into(),
] {
let broadcast: simple::Operation = ra.prepare(op).unwrap();
rb.apply_downstream(broadcast).unwrap();
}
let re1 = new_id();
let rv1 = new_id();
let ops_a: Vec<simple::Operation> = vec![
ra.prepare(RemoveEdge { id: re1, add_edge_id: e1 }.into()).unwrap(),
ra.prepare(RemoveVertex { id: rv1, add_vertex_id: v1 }.into()).unwrap(),
];
let re1b = new_id();
let rv2 = new_id();
let ops_b: Vec<simple::Operation> = vec![
rb.prepare(RemoveEdge { id: re1b, add_edge_id: e1 }.into()).unwrap(),
rb.prepare(RemoveVertex { id: rv2, add_vertex_id: v2 }.into()).unwrap(),
];
for op in &ops_a {
rb.apply_downstream(op.clone()).unwrap();
}
for op in &ops_b {
ra.apply_downstream(op.clone()).unwrap();
}
assert!(!ra.lookup_vertex(&v1));
assert!(!ra.lookup_vertex(&v2));
assert_eq!(ra.vertex_count(), 0);
assert_eq!(ra.edge_count(), 0);
assert_converged(&ra, &rb);
}
#[test]
fn self_loop_concurrent_vertex_remove() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let v1 = new_id();
let op = ra.prepare(AddVertex { id: v1 }.into()).unwrap();
rb.apply_downstream(op).unwrap();
let e1 = new_id();
let op_a = ra
.prepare(AddEdge { id: e1, source: v1, target: v1 }.into())
.unwrap();
let rv1 = new_id();
let op_b = rb
.prepare(RemoveVertex { id: rv1, add_vertex_id: v1 }.into())
.unwrap();
rb.apply_downstream(op_a).unwrap();
ra.apply_downstream(op_b).unwrap();
assert!(!ra.lookup_vertex(&v1));
assert_eq!(ra.generate_petgraph().edge_count(), 0);
assert_converged(&ra, &rb);
}
#[test]
fn three_replicas_interleaved_add_remove() {
let mut ra = Graph::new();
let mut rb = Graph::new();
let mut rc = Graph::new();
let v1 = new_id();
let v2 = new_id();
let op1 = ra.prepare(AddVertex { id: v1 }.into()).unwrap();
let op2 = ra.prepare(AddVertex { id: v2 }.into()).unwrap();
for op in [&op1, &op2] {
rb.apply_downstream(op.clone()).unwrap();
rc.apply_downstream(op.clone()).unwrap();
}
let e1 = new_id();
let op3 = rb
.prepare(AddEdge { id: e1, source: v1, target: v2 }.into())
.unwrap();
ra.apply_downstream(op3.clone()).unwrap();
rc.apply_downstream(op3.clone()).unwrap();
let re1 = new_id();
let op4 = rc
.prepare(RemoveEdge { id: re1, add_edge_id: e1 }.into())
.unwrap();
ra.apply_downstream(op4.clone()).unwrap();
rb.apply_downstream(op4.clone()).unwrap();
let mut rd = Graph::new();
rd.apply_downstream(op3).unwrap(); rd.apply_downstream(op4).unwrap();
rd.apply_downstream(op2).unwrap(); rd.apply_downstream(op1).unwrap();
assert_eq!(rd.vertex_count(), 2);
assert_eq!(rd.edge_count(), 0);
assert_converged(&ra, &rb);
assert_converged(&rb, &rc);
assert_converged(&rc, &rd);
}