1use serde::{Deserialize, Serialize};
2use std::collections::{HashSet, VecDeque};
3
4use crate::bloom::BloomFilter;
5use crate::entry::{Entry, Hash};
6use crate::oplog::OpLog;
7
/// Hard upper bound on the serialized size of any single sync message,
/// enforced before deserialization to bound memory use on hostile input.
const MAX_SYNC_BYTES: usize = 64 * 1024 * 1024;
/// Hard upper bound on the number of entries accepted in one payload/snapshot.
const MAX_ENTRIES_PER_MESSAGE: usize = 100_000;

/// Highest sync protocol version this peer understands; offers with a greater
/// version are rejected in `SyncOffer::from_bytes`.
pub const PROTOCOL_VERSION: u32 = 1;
20
/// Opening message of a sync exchange: advertises the sender's DAG heads, a
/// bloom filter summarizing every entry hash it holds, and its clock values.
///
/// NOTE: field order matters for MessagePack (positional) encoding — do not
/// reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOffer {
    /// Sender's protocol version; `#[serde(default)]` makes offers from
    /// pre-versioning peers decode as version 0.
    #[serde(default)]
    pub protocol_version: u32,
    /// Current head hashes of the sender's oplog DAG.
    pub heads: Vec<Hash>,
    /// Probabilistic set of all entry hashes the sender has (may yield false
    /// positives, never false negatives).
    pub bloom: BloomFilter,
    /// Physical component of the sender's clock, in milliseconds.
    pub physical_ms: u64,
    /// Logical component of the sender's clock.
    pub logical: u32,
}
36
/// Response to a `SyncOffer`: the entries the responder believes the peer is
/// missing, plus the offered heads the responder itself lacks and wants back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPayload {
    /// Entries to transfer to the peer.
    pub entries: Vec<Entry>,
    /// Head hashes from the peer's offer that are absent from our oplog.
    pub need: Vec<Hash>,
}
48
/// Full dump of an oplog, used to bootstrap a brand-new peer (tests expect
/// the genesis entry first, in oplog iteration order).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Every entry in the source oplog.
    pub entries: Vec<Entry>,
}
59
60impl SyncOffer {
61 pub fn from_oplog(oplog: &OpLog, physical_ms: u64, logical: u32) -> Self {
65 let all = oplog.entries_since(None);
66 let count = all.len().max(128);
69 let mut bloom = BloomFilter::new(count, 0.01);
70 for entry in &all {
71 bloom.insert(&entry.hash);
72 }
73 Self {
74 protocol_version: PROTOCOL_VERSION,
75 heads: oplog.heads(),
76 bloom,
77 physical_ms,
78 logical,
79 }
80 }
81
82 pub fn to_bytes(&self) -> Vec<u8> {
84 rmp_serde::to_vec(self).expect("sync offer serialization should not fail")
85 }
86
87 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
90 if bytes.len() > MAX_SYNC_BYTES {
91 return Err(format!(
92 "sync offer too large: {} bytes (max {MAX_SYNC_BYTES})",
93 bytes.len()
94 ));
95 }
96 let offer: Self =
97 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync offer: {e}"))?;
98 if offer.protocol_version > PROTOCOL_VERSION {
100 return Err(format!(
101 "unsupported protocol version {} (this peer supports up to {})",
102 offer.protocol_version, PROTOCOL_VERSION
103 ));
104 }
105 offer
106 .bloom
107 .validate()
108 .map_err(|e| format!("invalid bloom filter in sync offer: {e}"))?;
109 Ok(offer)
110 }
111}
112
113impl SyncPayload {
114 pub fn to_bytes(&self) -> Vec<u8> {
116 rmp_serde::to_vec(self).expect("sync payload serialization should not fail")
117 }
118
119 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
122 if bytes.len() > MAX_SYNC_BYTES {
123 return Err(format!(
124 "sync payload too large: {} bytes (max {MAX_SYNC_BYTES})",
125 bytes.len()
126 ));
127 }
128 let payload: Self =
129 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync payload: {e}"))?;
130 if payload.entries.len() > MAX_ENTRIES_PER_MESSAGE {
131 return Err(format!(
132 "too many entries in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
133 payload.entries.len()
134 ));
135 }
136 Ok(payload)
137 }
138}
139
140impl Snapshot {
141 pub fn from_oplog(oplog: &OpLog) -> Self {
143 let entries: Vec<Entry> = oplog.entries_since(None).into_iter().cloned().collect();
144 Self { entries }
145 }
146
147 pub fn to_bytes(&self) -> Vec<u8> {
149 rmp_serde::to_vec(self).expect("snapshot serialization should not fail")
150 }
151
152 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
155 if bytes.len() > MAX_SYNC_BYTES {
156 return Err(format!(
157 "snapshot too large: {} bytes (max {MAX_SYNC_BYTES})",
158 bytes.len()
159 ));
160 }
161 let snap: Self =
162 rmp_serde::from_slice(bytes).map_err(|e| format!("invalid snapshot: {e}"))?;
163 if snap.entries.len() > MAX_ENTRIES_PER_MESSAGE {
164 return Err(format!(
165 "too many entries in snapshot: {} (max {MAX_ENTRIES_PER_MESSAGE})",
166 snap.entries.len()
167 ));
168 }
169 Ok(snap)
170 }
171}
172
173pub fn entries_missing(oplog: &OpLog, remote_offer: &SyncOffer) -> SyncPayload {
186 let remote_heads_set: HashSet<Hash> = remote_offer.heads.iter().copied().collect();
187
188 let all_entries = oplog.entries_since(None);
190
191 let our_heads: HashSet<Hash> = oplog.heads().into_iter().collect();
193 if our_heads.is_subset(&remote_heads_set) {
194 let need = compute_need(oplog, &remote_offer.heads);
197 return SyncPayload {
198 entries: vec![],
199 need,
200 };
201 }
202
203 let mut send_set: HashSet<Hash> = HashSet::new();
205 for entry in &all_entries {
206 if !remote_offer.bloom.contains(&entry.hash) {
207 send_set.insert(entry.hash);
208 }
209 }
210
211 for &head in &our_heads {
217 if !remote_heads_set.contains(&head) {
218 send_set.insert(head);
219 }
220 }
221
222 {
231 let mut queue: VecDeque<Hash> = send_set.iter().copied().collect();
232 while let Some(hash) = queue.pop_front() {
233 if let Some(entry) = oplog.get(&hash) {
234 for parent_hash in &entry.next {
235 if !send_set.contains(parent_hash)
236 && !remote_heads_set.contains(parent_hash)
237 && oplog.get(parent_hash).is_some()
238 {
239 send_set.insert(*parent_hash);
240 queue.push_back(*parent_hash);
241 }
242 }
243 }
244 }
245 }
246
247 let missing: Vec<Entry> = all_entries
249 .into_iter()
250 .filter(|e| send_set.contains(&e.hash))
251 .cloned()
252 .collect();
253
254 let need = compute_need(oplog, &remote_offer.heads);
256
257 SyncPayload {
258 entries: missing,
259 need,
260 }
261}
262
263fn compute_need(oplog: &OpLog, remote_heads: &[Hash]) -> Vec<Hash> {
265 remote_heads
266 .iter()
267 .filter(|h| oplog.get(h).is_none())
268 .copied()
269 .collect()
270}
271
272pub fn merge_entries(oplog: &mut OpLog, entries: &[Entry]) -> Result<usize, String> {
281 let mut inserted = 0;
282 let mut remaining: Vec<&Entry> = entries.iter().collect();
283 let mut max_passes = remaining.len() + 1;
284
285 while !remaining.is_empty() && max_passes > 0 {
286 let mut next_remaining = Vec::new();
287 for entry in &remaining {
288 match oplog.append((*entry).clone()) {
289 Ok(true) => {
290 inserted += 1;
291 }
292 Ok(false) => {
293 }
295 Err(crate::oplog::OpLogError::MissingParent(_)) => {
296 next_remaining.push(*entry);
298 }
299 Err(crate::oplog::OpLogError::InvalidHash) => {
300 return Err(format!(
301 "invalid hash for entry {}",
302 hex::encode(entry.hash)
303 ));
304 }
305 }
306 }
307 if next_remaining.len() == remaining.len() {
308 return Err(format!(
310 "{} entries have unresolvable parents",
311 remaining.len()
312 ));
313 }
314 remaining = next_remaining;
315 max_passes -= 1;
316 }
317
318 Ok(inserted)
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single "entity" node type.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Genesis entry (DefineOntology, no parents) authored by `author`.
    fn genesis(author: &str) -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new(author),
            author,
        )
    }

    /// An AddNode op whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry with the given parents and an explicit Lamport time.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64, author: &str) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values(author, clock_time, 0),
            author,
        )
    }

    #[test]
    fn sync_offer_from_oplog() {
        let g = genesis("inst-a");
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        log.append(e1.clone()).unwrap();

        let offer = SyncOffer::from_oplog(&log, 2, 0);
        assert_eq!(offer.heads, vec![e1.hash]);
        assert!(offer.bloom.contains(&g.hash));
        assert!(offer.bloom.contains(&e1.hash));
        assert_eq!(offer.physical_ms, 2);
        assert_eq!(offer.logical, 0);
    }

    #[test]
    fn sync_offer_serialization_roundtrip() {
        let g = genesis("inst-a");
        let log = OpLog::new(g.clone());
        let offer = SyncOffer::from_oplog(&log, 1, 0);

        let bytes = offer.to_bytes();
        let restored = SyncOffer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.heads, offer.heads);
        assert_eq!(restored.physical_ms, offer.physical_ms);
        assert_eq!(restored.logical, offer.logical);
        assert!(restored.bloom.contains(&g.hash));
    }

    #[test]
    fn entries_missing_detects_delta() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        // B is one entry behind A.
        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_nothing_when_in_sync() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert!(payload.entries.is_empty());
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_need_list_for_remote_only() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-b");

        // A only has genesis; B has an extra head A should request back.
        let log_a = OpLog::new(g.clone());

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.need.len(), 1);
        assert_eq!(payload.need[0], e1.hash);
    }

    #[test]
    fn entries_missing_bloom_reduces_transfer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        // Only the single missing entry is sent, not the whole log.
        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
    }

    #[test]
    fn merge_entries_basic() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e1.clone(), e2.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert!(log_b.get(&e1.hash).is_some());
        assert!(log_b.get(&e2.hash).is_some());
    }

    #[test]
    fn merge_entries_out_of_order() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        // Child before parent: the retry pass must resolve it.
        let mut log_b = OpLog::new(g.clone());
        let merged = merge_entries(&mut log_b, &[e2.clone(), e1.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
    }

    #[test]
    fn merge_entries_duplicates_ignored() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let merged = merge_entries(&mut log_b, &[e1.clone()]).unwrap();
        assert_eq!(merged, 0);
        assert_eq!(log_b.len(), 2);
    }

    #[test]
    fn merge_entries_rejects_invalid_hash() {
        let g = genesis("inst-a");
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        // Tampering with a field after construction invalidates the hash.
        bad.author = "tampered".into();

        let mut log_b = OpLog::new(g.clone());
        let result = merge_entries(&mut log_b, &[bad]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("invalid hash"));
    }

    #[test]
    fn snapshot_roundtrip() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log = OpLog::new(g.clone());
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log);
        assert_eq!(snapshot.entries.len(), 3);

        let bytes = snapshot.to_bytes();
        let restored = Snapshot::from_bytes(&bytes).unwrap();
        assert_eq!(restored.entries.len(), 3);
        assert_eq!(restored.entries[0].hash, g.hash);
        assert_eq!(restored.entries[1].hash, e1.hash);
        assert_eq!(restored.entries[2].hash, e2.hash);
    }

    #[test]
    fn snapshot_can_bootstrap_new_peer() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log_a);

        // A new peer seeds its log from the snapshot's genesis, then merges
        // the remaining entries.
        let genesis_entry = &snapshot.entries[0];
        let mut log_b = OpLog::new(genesis_entry.clone());
        let remaining = &snapshot.entries[1..];
        let merged = merge_entries(&mut log_b, remaining).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_b.heads(), log_a.heads());
    }

    #[test]
    fn full_sync_roundtrip_a_to_b() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        let merged = merge_entries(&mut log_b, &payload.entries).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_a.heads(), log_b.heads());
    }

    #[test]
    fn full_sync_bidirectional() {
        let g = genesis("inst-a");

        // Divergent histories: each peer has one entry the other lacks.
        let a1 = make_entry(add_node_op("a1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(a1.clone()).unwrap();

        let b1 = make_entry(add_node_op("b1"), vec![g.hash], 2, "inst-b");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(b1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload_a_to_b = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload_a_to_b.entries).unwrap();

        let offer_a = SyncOffer::from_oplog(&log_a, 2, 0);
        let payload_b_to_a = entries_missing(&log_b, &offer_a);
        merge_entries(&mut log_a, &payload_b_to_a.entries).unwrap();

        assert_eq!(log_a.len(), 3);
        assert_eq!(log_b.len(), 3);

        // Both converge to the same two-head frontier.
        let heads_a: HashSet<Hash> = log_a.heads().into_iter().collect();
        let heads_b: HashSet<Hash> = log_b.heads().into_iter().collect();
        assert_eq!(heads_a, heads_b);
        assert!(heads_a.contains(&a1.hash));
        assert!(heads_a.contains(&b1.hash));
    }

    #[test]
    fn entries_missing_forces_heads_despite_bloom_fp() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        // Simulate a bloom false positive: the filter claims e2 even though
        // the offering peer's heads stop at e1.
        let mut bloom = BloomFilter::new(128, 0.01);
        bloom.insert(&g.hash);
        bloom.insert(&e1.hash);
        bloom.insert(&e2.hash);

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![e1.hash],
            bloom,
            physical_ms: 2,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);

        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e2.hash),
            "head entry must be sent even when bloom falsely contains it"
        );
    }

    #[test]
    fn entries_missing_forces_heads_with_ancestor_closure() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        log_a.append(e3.clone()).unwrap();

        // Pathological offer: bloom claims everything, but heads say the
        // remote only has genesis.
        let mut bloom = BloomFilter::new(128, 0.01);
        for entry in log_a.entries_since(None) {
            bloom.insert(&entry.hash);
        }

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![g.hash],
            bloom,
            physical_ms: 1,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();

        assert!(
            sent_hashes.contains(&e1.hash),
            "e1 must be recovered by ancestor closure"
        );
        assert!(
            sent_hashes.contains(&e2.hash),
            "e2 must be recovered by ancestor closure"
        );
        assert!(sent_hashes.contains(&e3.hash), "e3 must be forced as head");
    }

    #[test]
    fn sync_is_idempotent() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(log_b.len(), 2);

        // Second round after convergence transfers nothing.
        let offer_b2 = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload2 = entries_missing(&log_a, &offer_b2);
        assert!(payload2.entries.is_empty());
        let merged2 = merge_entries(&mut log_b, &payload2.entries).unwrap();
        assert_eq!(merged2, 0);
        assert_eq!(log_b.len(), 2);
    }
}