use serde::{Deserialize, Serialize};
use std::collections::{HashSet, VecDeque};
use crate::bloom::BloomFilter;
use crate::entry::{Entry, Hash};
use crate::oplog::OpLog;
/// Upper bound on the raw byte size of any sync message we will deserialize.
const MAX_SYNC_BYTES: usize = 64 * 1024 * 1024;
/// Upper bound on the number of entries accepted in a single message.
const MAX_ENTRIES_PER_MESSAGE: usize = 100_000;
/// Highest sync protocol version this peer understands.
pub const PROTOCOL_VERSION: u32 = 1;
/// Advertisement of one peer's oplog state: its DAG heads, a bloom filter
/// over every entry hash it holds, and its clock reading.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOffer {
    // Defaults to 0 when the field is absent, so offers serialized before
    // versioning was introduced still parse.
    #[serde(default)]
    pub protocol_version: u32,
    // Current head hashes of the sender's oplog DAG.
    pub heads: Vec<Hash>,
    // Probabilistic membership set over all entry hashes the sender has;
    // false positives are possible, so heads are always sent explicitly.
    pub bloom: BloomFilter,
    // Sender's clock sample: wall-clock milliseconds plus logical counter.
    pub physical_ms: u64,
    pub logical: u32,
}
/// Reply to a `SyncOffer`: the entries the responder believes the peer is
/// missing, plus the head hashes the responder itself lacks and wants sent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPayload {
    pub entries: Vec<Entry>,
    pub need: Vec<Hash>,
}
/// Full copy of an oplog's entries, used to bootstrap a brand-new peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    pub entries: Vec<Entry>,
}
impl SyncOffer {
pub fn from_oplog(oplog: &OpLog, physical_ms: u64, logical: u32) -> Self {
let all = oplog.entries_since(None);
let count = all.len().max(128);
let mut bloom = BloomFilter::new(count, 0.01);
for entry in &all {
bloom.insert(&entry.hash);
}
Self {
protocol_version: PROTOCOL_VERSION,
heads: oplog.heads(),
bloom,
physical_ms,
logical,
}
}
pub fn to_bytes(&self) -> Vec<u8> {
rmp_serde::to_vec(self).expect("sync offer serialization should not fail")
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
if bytes.len() > MAX_SYNC_BYTES {
return Err(format!(
"sync offer too large: {} bytes (max {MAX_SYNC_BYTES})",
bytes.len()
));
}
let offer: Self =
rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync offer: {e}"))?;
if offer.protocol_version > PROTOCOL_VERSION {
return Err(format!(
"unsupported protocol version {} (this peer supports up to {})",
offer.protocol_version, PROTOCOL_VERSION
));
}
offer
.bloom
.validate()
.map_err(|e| format!("invalid bloom filter in sync offer: {e}"))?;
Ok(offer)
}
}
impl SyncPayload {
    /// Serialize this payload to MessagePack bytes.
    ///
    /// # Panics
    /// Panics only if serde serialization itself fails, which would be a bug.
    pub fn to_bytes(&self) -> Vec<u8> {
        rmp_serde::to_vec(self).expect("sync payload serialization should not fail")
    }

    /// Deserialize a payload received from a peer, enforcing limits on the
    /// raw byte size, the entry list, and the need list so a malicious peer
    /// cannot force unbounded allocation or work.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() > MAX_SYNC_BYTES {
            return Err(format!(
                "sync payload too large: {} bytes (max {MAX_SYNC_BYTES})",
                bytes.len()
            ));
        }
        let payload: Self =
            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync payload: {e}"))?;
        if payload.entries.len() > MAX_ENTRIES_PER_MESSAGE {
            return Err(format!(
                "too many entries in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
                payload.entries.len()
            ));
        }
        // The need list was previously unbounded; cap it the same way as the
        // entry list so the byte limit cannot be exploited with a huge list
        // of small hashes.
        if payload.need.len() > MAX_ENTRIES_PER_MESSAGE {
            return Err(format!(
                "too many needed hashes in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
                payload.need.len()
            ));
        }
        Ok(payload)
    }
}
impl Snapshot {
    /// Capture every entry currently in the oplog.
    pub fn from_oplog(oplog: &OpLog) -> Self {
        Self {
            entries: oplog.entries_since(None).into_iter().cloned().collect(),
        }
    }

    /// Serialize this snapshot to MessagePack bytes.
    pub fn to_bytes(&self) -> Vec<u8> {
        rmp_serde::to_vec(self).expect("snapshot serialization should not fail")
    }

    /// Deserialize a snapshot, enforcing the byte and entry-count limits.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() > MAX_SYNC_BYTES {
            return Err(format!(
                "snapshot too large: {} bytes (max {MAX_SYNC_BYTES})",
                bytes.len()
            ));
        }
        let snap = rmp_serde::from_slice::<Self>(bytes)
            .map_err(|e| format!("invalid snapshot: {e}"))?;
        match snap.entries.len() {
            n if n > MAX_ENTRIES_PER_MESSAGE => Err(format!(
                "too many entries in snapshot: {} (max {MAX_ENTRIES_PER_MESSAGE})",
                n
            )),
            _ => Ok(snap),
        }
    }
}
/// Compute the payload to send a peer in response to its offer: the entries
/// we believe it is missing, plus the remote heads we are missing ourselves.
pub fn entries_missing(oplog: &OpLog, remote_offer: &SyncOffer) -> SyncPayload {
    let remote_heads: HashSet<Hash> = remote_offer.heads.iter().copied().collect();
    let local_heads: HashSet<Hash> = oplog.heads().into_iter().collect();
    let need = compute_need(oplog, &remote_offer.heads);

    // If every local head is already a remote head, the peer holds our
    // entire log (an entry implies its ancestors), so send nothing.
    if local_heads.is_subset(&remote_heads) {
        return SyncPayload {
            entries: vec![],
            need,
        };
    }

    let all_entries = oplog.entries_since(None);

    // Seed the send set with everything the peer's bloom filter does not
    // claim to contain...
    let mut to_send: HashSet<Hash> = all_entries
        .iter()
        .filter(|e| !remote_offer.bloom.contains(&e.hash))
        .map(|e| e.hash)
        .collect();
    // ...and force-include our heads the peer definitely lacks, so a bloom
    // false positive can never suppress a head.
    to_send.extend(
        local_heads
            .iter()
            .copied()
            .filter(|h| !remote_heads.contains(h)),
    );

    // Ancestor closure: walk parents of every selected entry, pulling in any
    // parent the peer does not advertise as a head. This recovers interior
    // entries hidden by bloom false positives.
    let mut frontier: VecDeque<Hash> = to_send.iter().copied().collect();
    while let Some(hash) = frontier.pop_front() {
        if let Some(entry) = oplog.get(&hash) {
            for parent in &entry.next {
                if !to_send.contains(parent)
                    && !remote_heads.contains(parent)
                    && oplog.get(parent).is_some()
                {
                    to_send.insert(*parent);
                    frontier.push_back(*parent);
                }
            }
        }
    }

    SyncPayload {
        entries: all_entries
            .into_iter()
            .filter(|e| to_send.contains(&e.hash))
            .cloned()
            .collect(),
        need,
    }
}
/// Remote heads absent from our oplog — hashes we must request from the peer.
fn compute_need(oplog: &OpLog, remote_heads: &[Hash]) -> Vec<Hash> {
    let mut missing = Vec::new();
    for &head in remote_heads {
        if oplog.get(&head).is_none() {
            missing.push(head);
        }
    }
    missing
}
/// Merge remote entries into `oplog`, retrying entries whose parents appear
/// later in the slice, and return how many entries were newly inserted.
/// Duplicates already present in the log count as success but not as inserts,
/// which makes repeated syncs idempotent.
///
/// # Errors
/// Returns an error if any entry fails hash verification, or if some entries
/// reference parents available neither locally nor within `entries`.
pub fn merge_entries(oplog: &mut OpLog, entries: &[Entry]) -> Result<usize, String> {
    let mut inserted = 0;
    let mut remaining: Vec<&Entry> = entries.iter().collect();
    // Each pass must shrink `remaining` (enforced by the no-progress check
    // below), so len + 1 passes is a hard upper bound on the work.
    let mut max_passes = remaining.len() + 1;
    while !remaining.is_empty() && max_passes > 0 {
        let mut next_remaining = Vec::new();
        for entry in &remaining {
            match oplog.append((*entry).clone()) {
                Ok(true) => inserted += 1,
                // Already present: drop silently.
                Ok(false) => {}
                // Parent not inserted yet: retry on the next pass.
                Err(crate::oplog::OpLogError::MissingParent(_)) => next_remaining.push(*entry),
                Err(crate::oplog::OpLogError::InvalidHash) => {
                    return Err(format!(
                        "invalid hash for entry {}",
                        hex::encode(entry.hash)
                    ));
                }
            }
        }
        // No entry was applied this pass, so the leftovers can never resolve.
        if next_remaining.len() == remaining.len() {
            return Err(format!(
                "{} entries have unresolvable parents",
                remaining.len()
            ));
        }
        remaining = next_remaining;
        max_passes -= 1;
    }
    // Defensive: the no-progress check should drain `remaining` before
    // `max_passes` runs out, but never report success while entries are
    // still pending (previously this case returned Ok silently).
    if !remaining.is_empty() {
        return Err(format!(
            "{} entries have unresolvable parents",
            remaining.len()
        ));
    }
    Ok(inserted)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    // Minimal ontology with a single "entity" node type — just enough to
    // build valid graph operations in these tests.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    // Genesis entry (no parents) that defines the test ontology.
    fn genesis(author: &str) -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new(author),
            author,
        )
    }

    // AddNode op whose id doubles as its label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    // Entry with explicit parents and clock time, for building DAGs by hand.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64, author: &str) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values(author, clock_time, 0),
            author,
        )
    }

    // An offer built from a log advertises its heads, clock, and a bloom
    // filter containing every entry hash.
    #[test]
    fn sync_offer_from_oplog() {
        let g = genesis("inst-a");
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        log.append(e1.clone()).unwrap();
        let offer = SyncOffer::from_oplog(&log, 2, 0);
        assert_eq!(offer.heads, vec![e1.hash]);
        assert!(offer.bloom.contains(&g.hash));
        assert!(offer.bloom.contains(&e1.hash));
        assert_eq!(offer.physical_ms, 2);
        assert_eq!(offer.logical, 0);
    }

    // to_bytes/from_bytes round-trip preserves heads, clock, and bloom hits.
    #[test]
    fn sync_offer_serialization_roundtrip() {
        let g = genesis("inst-a");
        let log = OpLog::new(g.clone());
        let offer = SyncOffer::from_oplog(&log, 1, 0);
        let bytes = offer.to_bytes();
        let restored = SyncOffer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.heads, offer.heads);
        assert_eq!(restored.physical_ms, offer.physical_ms);
        assert_eq!(restored.logical, offer.logical);
        assert!(restored.bloom.contains(&g.hash));
    }

    // A peer one entry behind receives exactly that entry.
    #[test]
    fn entries_missing_detects_delta() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);
        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
        assert!(payload.need.is_empty());
    }

    // Identical logs produce an empty payload in both directions.
    #[test]
    fn entries_missing_nothing_when_in_sync() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);
        assert!(payload.entries.is_empty());
        assert!(payload.need.is_empty());
    }

    // A remote head we don't hold lands in the need list of our reply.
    #[test]
    fn entries_missing_need_list_for_remote_only() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-b");
        let log_a = OpLog::new(g.clone());
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);
        assert_eq!(payload.need.len(), 1);
        assert_eq!(payload.need[0], e1.hash);
    }

    // The bloom filter keeps already-shared entries out of the payload.
    #[test]
    fn entries_missing_bloom_reduces_transfer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);
        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
    }

    // In-order entries merge in a single pass.
    #[test]
    fn merge_entries_basic() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e1.clone(), e2.clone()]).unwrap();
        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert!(log_b.get(&e1.hash).is_some());
        assert!(log_b.get(&e2.hash).is_some());
    }

    // Child-before-parent ordering resolves via the retry passes.
    #[test]
    fn merge_entries_out_of_order() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e2.clone(), e1.clone()]).unwrap();
        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
    }

    // Re-merging an existing entry is a no-op (idempotence).
    #[test]
    fn merge_entries_duplicates_ignored() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();
        let merged = merge_entries(&mut log_b, &[e1.clone()]).unwrap();
        assert_eq!(merged, 0);
        assert_eq!(log_b.len(), 2);
    }

    // Tampering with a field after hashing must be rejected as InvalidHash.
    #[test]
    fn merge_entries_rejects_invalid_hash() {
        let g = genesis("inst-a");
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        bad.author = "tampered".into();
        let mut log_b = OpLog::new(g.clone());
        let result = merge_entries(&mut log_b, &[bad]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("invalid hash"));
    }

    // Snapshot serialization round-trips and preserves entry order.
    #[test]
    fn snapshot_roundtrip() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log = OpLog::new(g.clone());
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();
        let snapshot = Snapshot::from_oplog(&log);
        assert_eq!(snapshot.entries.len(), 3);
        let bytes = snapshot.to_bytes();
        let restored = Snapshot::from_bytes(&bytes).unwrap();
        assert_eq!(restored.entries.len(), 3);
        assert_eq!(restored.entries[0].hash, g.hash);
        assert_eq!(restored.entries[1].hash, e1.hash);
        assert_eq!(restored.entries[2].hash, e2.hash);
    }

    // A fresh peer built from a snapshot ends up with identical heads.
    #[test]
    fn snapshot_can_bootstrap_new_peer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        let snapshot = Snapshot::from_oplog(&log_a);
        let genesis_entry = &snapshot.entries[0];
        let mut log_b = OpLog::new(genesis_entry.clone());
        let remaining = &snapshot.entries[1..];
        let merged = merge_entries(&mut log_b, remaining).unwrap();
        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_b.heads(), log_a.heads());
    }

    // One-way sync: offer from B, delta from A, merge into B.
    #[test]
    fn full_sync_roundtrip_a_to_b() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        let mut log_b = OpLog::new(g.clone());
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        let merged = merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_a.heads(), log_b.heads());
    }

    // Two peers with divergent branches converge to the same multi-head
    // state after syncing both ways.
    #[test]
    fn full_sync_bidirectional() {
        let g = genesis("inst-a");
        let a1 = make_entry(add_node_op("a1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(a1.clone()).unwrap();
        let b1 = make_entry(add_node_op("b1"), vec![g.hash], 2, "inst-b");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(b1.clone()).unwrap();
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload_a_to_b = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload_a_to_b.entries).unwrap();
        let offer_a = SyncOffer::from_oplog(&log_a, 2, 0);
        let payload_b_to_a = entries_missing(&log_b, &offer_a);
        merge_entries(&mut log_a, &payload_b_to_a.entries).unwrap();
        assert_eq!(log_a.len(), 3);
        assert_eq!(log_b.len(), 3);
        let heads_a: HashSet<Hash> = log_a.heads().into_iter().collect();
        let heads_b: HashSet<Hash> = log_b.heads().into_iter().collect();
        assert_eq!(heads_a, heads_b);
        assert!(heads_a.contains(&a1.hash));
        assert!(heads_a.contains(&b1.hash));
    }

    // A bloom filter that falsely contains our head must not stop the head
    // from being sent — heads are forced into the payload.
    #[test]
    fn entries_missing_forces_heads_despite_bloom_fp() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        // Hand-built offer simulating a 100% false-positive bloom filter.
        let mut bloom = BloomFilter::new(128, 0.01);
        bloom.insert(&g.hash);
        bloom.insert(&e1.hash);
        bloom.insert(&e2.hash);
        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![e1.hash],
            bloom,
            physical_ms: 2,
            logical: 0,
        };
        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e2.hash),
            "head entry must be sent even when bloom falsely contains it"
        );
    }

    // With the remote stuck at genesis but a bloom filter that claims to
    // hold everything, the ancestor closure must still recover the chain.
    #[test]
    fn entries_missing_forces_heads_with_ancestor_closure() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        log_a.append(e3.clone()).unwrap();
        let mut bloom = BloomFilter::new(128, 0.01);
        for entry in log_a.entries_since(None) {
            bloom.insert(&entry.hash);
        }
        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![g.hash],
            bloom,
            physical_ms: 1,
            logical: 0,
        };
        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e1.hash),
            "e1 must be recovered by ancestor closure"
        );
        assert!(
            sent_hashes.contains(&e2.hash),
            "e2 must be recovered by ancestor closure"
        );
        assert!(sent_hashes.contains(&e3.hash), "e3 must be forced as head");
    }

    // Running the same sync twice transfers nothing the second time.
    #[test]
    fn sync_is_idempotent() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        let mut log_b = OpLog::new(g.clone());
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(log_b.len(), 2);
        let offer_b2 = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload2 = entries_missing(&log_a, &offer_b2);
        assert!(payload2.entries.is_empty());
        let merged2 = merge_entries(&mut log_b, &payload2.entries).unwrap();
        assert_eq!(merged2, 0);
        assert_eq!(log_b.len(), 2);
    }
}