use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};

use crate::bloom::BloomFilter;
use crate::entry::{Entry, Hash};
use crate::oplog::OpLog;
7
/// Largest encoded message (64 MiB) that any `from_bytes` in this module will decode.
const MAX_SYNC_BYTES: usize = 64 << 20;

/// Largest number of entries a decoded `SyncPayload` or `Snapshot` may carry.
const MAX_ENTRIES_PER_MESSAGE: usize = 100_000;

/// Sync wire-protocol version spoken by this peer; `SyncOffer::from_bytes` rejects
/// offers that advertise a higher version.
pub const PROTOCOL_VERSION: u32 = 1;
20
/// Opening message of a sync exchange: a compact summary of what the sending peer holds.
///
/// Encoded with `rmp_serde::to_vec` (see `SyncOffer::to_bytes`). The declaration order
/// of the fields below is part of the wire format; do not reorder them.
///
/// NOTE(review): `rmp_serde::to_vec` encodes structs positionally (as arrays), so
/// `#[serde(default)]` on the *first* field probably does not let a peer that predates
/// `protocol_version` omit it. Only trailing fields can be missing from a positional
/// encoding. Confirm the intended compatibility story for older peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOffer {
    /// Protocol version of the sender. `#[serde(default)]` makes it 0 when the decoder
    /// finds it absent. `SyncOffer::from_bytes` rejects values above `PROTOCOL_VERSION`.
    #[serde(default)]
    pub protocol_version: u32,
    /// The sender's current DAG heads (`OpLog::heads()` in `SyncOffer::from_oplog`).
    pub heads: Vec<Hash>,
    /// Bloom filter containing the hash of every entry the sender holds.
    pub bloom: BloomFilter,
    /// Physical (millisecond) clock component supplied by the caller of `from_oplog`.
    /// It presumably comes from a hybrid logical clock; TODO confirm.
    pub physical_ms: u64,
    /// Logical counter that accompanies `physical_ms`.
    pub logical: u32,
}
36
/// Reply to a `SyncOffer`, as produced by `entries_missing`.
///
/// Encoded with `rmp_serde::to_vec`; the field order is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPayload {
    /// Entries the sender believes the receiver is missing, in the sender's
    /// `entries_since(None)` order.
    pub entries: Vec<Entry>,
    /// Heads advertised by the receiver that the sender does not have yet.
    pub need: Vec<Hash>,
}
48
/// Full copy of an oplog's entries, usable to bootstrap a peer from scratch.
///
/// Encoded with `rmp_serde::to_vec`; the field order is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Every entry, in `OpLog::entries_since(None)` order (genesis first in this module's
    /// tests).
    pub entries: Vec<Entry>,
}
59
60impl SyncOffer {
61 pub fn from_oplog(oplog: &OpLog, physical_ms: u64, logical: u32) -> Self {
65 let all = oplog.entries_since(None);
66 let count = all.len().max(128);
69 let mut bloom = BloomFilter::new(count, 0.01);
70 for entry in &all {
71 bloom.insert(&entry.hash);
72 }
73 Self {
74 protocol_version: PROTOCOL_VERSION,
75 heads: oplog.heads(),
76 bloom,
77 physical_ms,
78 logical,
79 }
80 }
81
82 pub fn to_bytes(&self) -> Vec<u8> {
84 rmp_serde::to_vec(self).expect("sync offer serialization should not fail")
85 }
86
87 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
90 if bytes.len() > MAX_SYNC_BYTES {
91 return Err(format!(
92 "sync offer too large: {} bytes (max {MAX_SYNC_BYTES})",
93 bytes.len()
94 ));
95 }
96 let offer: Self =
97 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync offer: {e}"))?;
98 if offer.protocol_version > PROTOCOL_VERSION {
100 return Err(format!(
101 "unsupported protocol version {} (this peer supports up to {})",
102 offer.protocol_version, PROTOCOL_VERSION
103 ));
104 }
105 offer
106 .bloom
107 .validate()
108 .map_err(|e| format!("invalid bloom filter in sync offer: {e}"))?;
109 Ok(offer)
110 }
111}
112
113impl SyncPayload {
114 pub fn to_bytes(&self) -> Vec<u8> {
116 rmp_serde::to_vec(self).expect("sync payload serialization should not fail")
117 }
118
119 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
122 if bytes.len() > MAX_SYNC_BYTES {
123 return Err(format!(
124 "sync payload too large: {} bytes (max {MAX_SYNC_BYTES})",
125 bytes.len()
126 ));
127 }
128 let payload: Self =
129 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync payload: {e}"))?;
130 if payload.entries.len() > MAX_ENTRIES_PER_MESSAGE {
131 return Err(format!(
132 "too many entries in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
133 payload.entries.len()
134 ));
135 }
136 Ok(payload)
137 }
138}
139
140impl Snapshot {
141 pub fn from_oplog(oplog: &OpLog) -> Self {
143 let entries: Vec<Entry> = oplog.entries_since(None).into_iter().cloned().collect();
144 Self { entries }
145 }
146
147 pub fn to_bytes(&self) -> Vec<u8> {
149 rmp_serde::to_vec(self).expect("snapshot serialization should not fail")
150 }
151
152 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
155 if bytes.len() > MAX_SYNC_BYTES {
156 return Err(format!(
157 "snapshot too large: {} bytes (max {MAX_SYNC_BYTES})",
158 bytes.len()
159 ));
160 }
161 let snap: Self =
162 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid snapshot: {e}"))?;
163 if snap.entries.len() > MAX_ENTRIES_PER_MESSAGE {
164 return Err(format!(
165 "too many entries in snapshot: {} (max {MAX_ENTRIES_PER_MESSAGE})",
166 snap.entries.len()
167 ));
168 }
169 Ok(snap)
170 }
171}
172
173pub fn entries_missing(oplog: &OpLog, remote_offer: &SyncOffer) -> SyncPayload {
186 let remote_heads_set: HashSet<Hash> = remote_offer.heads.iter().copied().collect();
187
188 let all_entries = oplog.entries_since(None);
190
191 let our_heads: HashSet<Hash> = oplog.heads().into_iter().collect();
193 if our_heads.is_subset(&remote_heads_set) {
194 let need = compute_need(oplog, &remote_offer.heads);
197 return SyncPayload {
198 entries: vec![],
199 need,
200 };
201 }
202
203 let mut send_set: HashSet<Hash> = HashSet::new();
205 for entry in &all_entries {
206 if !remote_offer.bloom.contains(&entry.hash) {
207 send_set.insert(entry.hash);
208 }
209 }
210
211 for &head in &our_heads {
217 if !remote_heads_set.contains(&head) {
218 send_set.insert(head);
219 }
220 }
221
222 let mut changed = true;
226 while changed {
227 changed = false;
228 for entry in &all_entries {
229 if !send_set.contains(&entry.hash) {
230 continue;
231 }
232 for parent_hash in &entry.next {
233 if !send_set.contains(parent_hash)
237 && !remote_heads_set.contains(parent_hash)
238 && oplog.get(parent_hash).is_some()
239 {
240 send_set.insert(*parent_hash);
243 changed = true;
244 }
245 }
246 }
247 }
248
249 let missing: Vec<Entry> = all_entries
251 .into_iter()
252 .filter(|e| send_set.contains(&e.hash))
253 .cloned()
254 .collect();
255
256 let need = compute_need(oplog, &remote_offer.heads);
258
259 SyncPayload {
260 entries: missing,
261 need,
262 }
263}
264
265fn compute_need(oplog: &OpLog, remote_heads: &[Hash]) -> Vec<Hash> {
267 remote_heads
268 .iter()
269 .filter(|h| oplog.get(h).is_none())
270 .copied()
271 .collect()
272}
273
274pub fn merge_entries(oplog: &mut OpLog, entries: &[Entry]) -> Result<usize, String> {
283 let mut inserted = 0;
284 let mut remaining: Vec<&Entry> = entries.iter().collect();
285 let mut max_passes = remaining.len() + 1;
286
287 while !remaining.is_empty() && max_passes > 0 {
288 let mut next_remaining = Vec::new();
289 for entry in &remaining {
290 match oplog.append((*entry).clone()) {
291 Ok(true) => {
292 inserted += 1;
293 }
294 Ok(false) => {
295 }
297 Err(crate::oplog::OpLogError::MissingParent(_)) => {
298 next_remaining.push(*entry);
300 }
301 Err(crate::oplog::OpLogError::InvalidHash) => {
302 return Err(format!(
303 "invalid hash for entry {}",
304 hex::encode(entry.hash)
305 ));
306 }
307 }
308 }
309 if next_remaining.len() == remaining.len() {
310 return Err(format!(
312 "{} entries have unresolvable parents",
313 remaining.len()
314 ));
315 }
316 remaining = next_remaining;
317 max_passes -= 1;
318 }
319
320 Ok(inserted)
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: a single `entity` node type and no edge types.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Parentless entry by `author` that defines `test_ontology()`; used as the log root.
    fn genesis(author: &str) -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new(author),
            author,
        )
    }

    /// `AddNode` op creating an `entity` node whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry for `op` whose parents are `next`, with a clock built by
    /// `LamportClock::with_values(author, clock_time, 0)`.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64, author: &str) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values(author, clock_time, 0),
            author,
        )
    }

    /// An offer advertises the log's head, covers every entry in its bloom filter, and
    /// carries the supplied clock values through unchanged.
    #[test]
    fn sync_offer_from_oplog() {
        let g = genesis("inst-a");
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        log.append(e1.clone()).unwrap();

        let offer = SyncOffer::from_oplog(&log, 2, 0);
        assert_eq!(offer.heads, vec![e1.hash]);
        assert!(offer.bloom.contains(&g.hash));
        assert!(offer.bloom.contains(&e1.hash));
        assert_eq!(offer.physical_ms, 2);
        assert_eq!(offer.logical, 0);
    }

    /// `to_bytes` followed by `from_bytes` preserves heads, clock values and bloom contents.
    #[test]
    fn sync_offer_serialization_roundtrip() {
        let g = genesis("inst-a");
        let log = OpLog::new(g.clone());
        let offer = SyncOffer::from_oplog(&log, 1, 0);

        let bytes = offer.to_bytes();
        let restored = SyncOffer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.heads, offer.heads);
        assert_eq!(restored.physical_ms, offer.physical_ms);
        assert_eq!(restored.logical, offer.logical);
        assert!(restored.bloom.contains(&g.hash));
    }

    /// A holds g <- e1 <- e2 and B holds g <- e1: only e2 is sent, and A needs nothing.
    #[test]
    fn entries_missing_detects_delta() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
        assert!(payload.need.is_empty());
    }

    /// Identical logs produce an empty payload in both directions.
    #[test]
    fn entries_missing_nothing_when_in_sync() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert!(payload.entries.is_empty());
        assert!(payload.need.is_empty());
    }

    /// When only the remote has a new entry, the local side lists that head in `need`.
    #[test]
    fn entries_missing_need_list_for_remote_only() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-b");

        let log_a = OpLog::new(g.clone());

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.need.len(), 1);
        assert_eq!(payload.need[0], e1.hash);
    }

    /// Entries the remote's bloom filter already covers (g, e1) are not resent.
    #[test]
    fn entries_missing_bloom_reduces_transfer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
    }

    /// Merging an in-order chain into a log holding only genesis inserts both entries.
    #[test]
    fn merge_entries_basic() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e1.clone(), e2.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert!(log_b.get(&e1.hash).is_some());
        assert!(log_b.get(&e2.hash).is_some());
    }

    /// A child listed before its parent is still merged.
    #[test]
    fn merge_entries_out_of_order() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e2.clone(), e1.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
    }

    /// Re-merging an entry the log already has counts as zero insertions.
    #[test]
    fn merge_entries_duplicates_ignored() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let merged = merge_entries(&mut log_b, &[e1.clone()]).unwrap();
        assert_eq!(merged, 0);
        assert_eq!(log_b.len(), 2);
    }

    /// Changing `author` after construction leaves a stale `hash`. The test expects
    /// `merge_entries` to surface the resulting `InvalidHash` as an "invalid hash" error.
    #[test]
    fn merge_entries_rejects_invalid_hash() {
        let g = genesis("inst-a");
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        bad.author = "tampered".into();
        let mut log_b = OpLog::new(g.clone());
        let result = merge_entries(&mut log_b, &[bad]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("invalid hash"));
    }

    /// A snapshot holds every entry and survives a byte round trip in the same order.
    #[test]
    fn snapshot_roundtrip() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log = OpLog::new(g.clone());
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log);
        assert_eq!(snapshot.entries.len(), 3);

        let bytes = snapshot.to_bytes();
        let restored = Snapshot::from_bytes(&bytes).unwrap();
        assert_eq!(restored.entries.len(), 3);
        assert_eq!(restored.entries[0].hash, g.hash);
        assert_eq!(restored.entries[1].hash, e1.hash);
        assert_eq!(restored.entries[2].hash, e2.hash);
    }

    /// A fresh peer rebuilds an identical log by rooting `OpLog::new` at the snapshot's
    /// first entry and merging the rest.
    #[test]
    fn snapshot_can_bootstrap_new_peer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log_a);

        let genesis_entry = &snapshot.entries[0];
        let mut log_b = OpLog::new(genesis_entry.clone());
        let remaining = &snapshot.entries[1..];
        let merged = merge_entries(&mut log_b, remaining).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_b.heads(), log_a.heads());
    }

    /// One-way sync: B offers, A replies, B merges, and both end with the same heads.
    #[test]
    fn full_sync_roundtrip_a_to_b() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);

        let payload = entries_missing(&log_a, &offer_b);

        let merged = merge_entries(&mut log_b, &payload.entries).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_a.heads(), log_b.heads());
    }

    /// Divergent siblings (a1 on A, b1 on B) converge after syncing in both directions.
    /// Heads are compared as sets, so their order is not asserted.
    #[test]
    fn full_sync_bidirectional() {
        let g = genesis("inst-a");

        let a1 = make_entry(add_node_op("a1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(a1.clone()).unwrap();

        let b1 = make_entry(add_node_op("b1"), vec![g.hash], 2, "inst-b");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(b1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload_a_to_b = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload_a_to_b.entries).unwrap();

        let offer_a = SyncOffer::from_oplog(&log_a, 2, 0);
        let payload_b_to_a = entries_missing(&log_b, &offer_a);
        merge_entries(&mut log_a, &payload_b_to_a.entries).unwrap();

        assert_eq!(log_a.len(), 3);
        assert_eq!(log_b.len(), 3);

        let heads_a: HashSet<Hash> = log_a.heads().into_iter().collect();
        let heads_b: HashSet<Hash> = log_b.heads().into_iter().collect();
        assert_eq!(heads_a, heads_b);
        assert!(heads_a.contains(&a1.hash));
        assert!(heads_a.contains(&b1.hash));
    }

    /// Simulates a bloom false positive: the fake offer's filter claims e2, but its heads
    /// stop at e1. Our head e2 must still be sent.
    #[test]
    fn entries_missing_forces_heads_despite_bloom_fp() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut bloom = BloomFilter::new(128, 0.01);
        bloom.insert(&g.hash);
        bloom.insert(&e1.hash);
        // Deliberately include e2 even though the "remote" does not have it.
        bloom.insert(&e2.hash);
        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![e1.hash],
            bloom,
            physical_ms: 2,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);

        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e2.hash),
            "head entry must be sent even when bloom falsely contains it"
        );
    }

    /// The fake remote holds only genesis, but its bloom claims every entry. Forcing the
    /// head e3 plus the ancestor closure must still deliver e1 and e2.
    #[test]
    fn entries_missing_forces_heads_with_ancestor_closure() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        log_a.append(e3.clone()).unwrap();

        // Every hash "matches", simulating false positives for e1..e3.
        let mut bloom = BloomFilter::new(128, 0.01);
        for entry in log_a.entries_since(None) {
            bloom.insert(&entry.hash);
        }

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![g.hash],
            bloom,
            physical_ms: 1,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();

        assert!(
            sent_hashes.contains(&e1.hash),
            "e1 must be recovered by ancestor closure"
        );
        assert!(
            sent_hashes.contains(&e2.hash),
            "e2 must be recovered by ancestor closure"
        );
        assert!(sent_hashes.contains(&e3.hash), "e3 must be forced as head");
    }

    /// After one successful sync, a second round sends and merges nothing.
    #[test]
    fn sync_is_idempotent() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(log_b.len(), 2);

        let offer_b2 = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload2 = entries_missing(&log_a, &offer_b2);
        assert!(payload2.entries.is_empty());
        let merged2 = merge_entries(&mut log_b, &payload2.entries).unwrap();
        assert_eq!(merged2, 0);
        assert_eq!(log_b.len(), 2);
    }
}