//! sync.rs — peer-to-peer sync protocol: offers, delta payloads, and
//! full snapshots for bootstrapping (silk crate).
1use serde::{Deserialize, Serialize};
2use std::collections::HashSet;
3
4use crate::bloom::BloomFilter;
5use crate::entry::{Entry, Hash};
6use crate::oplog::OpLog;
7
/// S-03: Maximum byte size for sync messages (64 MB).
/// Enforced in every `from_bytes` below, before deserialization, to bound
/// work done on untrusted input.
const MAX_SYNC_BYTES: usize = 64 * 1024 * 1024;
/// S-03: Maximum entries in a single sync payload or snapshot.
/// Checked after decoding in `SyncPayload::from_bytes` and `Snapshot::from_bytes`.
const MAX_ENTRIES_PER_MESSAGE: usize = 100_000;
12
13/// A sync offer — sent by a peer to advertise its state.
14///
15/// Contains the peer's current DAG heads and a bloom filter of all
16/// entry hashes it holds. The recipient uses this to compute which
17/// entries the peer is missing and needs to receive.
18/// Current protocol version. Incremented on breaking wire format changes.
19pub const PROTOCOL_VERSION: u32 = 1;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SyncOffer {
23    /// Protocol version — peers reject offers with unknown versions.
24    /// Defaults to 0 for backward compat with pre-versioned offers.
25    #[serde(default)]
26    pub protocol_version: u32,
27    /// Current DAG heads of the offering peer.
28    pub heads: Vec<Hash>,
29    /// Bloom filter containing all entry hashes the peer has.
30    pub bloom: BloomFilter,
31    /// Physical time (ms) of the offering peer's clock.
32    pub physical_ms: u64,
33    /// Logical counter of the offering peer's clock.
34    pub logical: u32,
35}
36
/// A sync response — entries the recipient should merge.
///
/// Contains the entries the peer is missing (not in their bloom filter
/// and not reachable from their heads).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPayload {
    /// Entries the peer is missing, in topological (causal) order.
    /// Parents precede children so `merge_entries` can apply them in one pass.
    pub entries: Vec<Entry>,
    /// Hashes the sender still needs (explicit request for false-positive resolution).
    /// Computed from the remote heads we don't hold — see `compute_need`.
    pub need: Vec<Hash>,
}
48
/// A full snapshot — for bootstrapping new peers.
///
/// Contains every entry in the op log, serialized in topological order.
/// New peers deserialize this, rebuild their op log and materialized graph,
/// then switch to delta sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// All entries in topological (causal) order — entry 0 is the genesis.
    pub entries: Vec<Entry>,
}
59
60impl SyncOffer {
61    /// Build a sync offer from an op log.
62    ///
63    /// Constructs a bloom filter of all entry hashes and captures current heads.
64    pub fn from_oplog(oplog: &OpLog, physical_ms: u64, logical: u32) -> Self {
65        let all = oplog.entries_since(None);
66        // Use a minimum of 128 expected items so the bloom filter has enough
67        // bits to avoid false positives with very small entry sets.
68        let count = all.len().max(128);
69        let mut bloom = BloomFilter::new(count, 0.01);
70        for entry in &all {
71            bloom.insert(&entry.hash);
72        }
73        Self {
74            protocol_version: PROTOCOL_VERSION,
75            heads: oplog.heads(),
76            bloom,
77            physical_ms,
78            logical,
79        }
80    }
81
82    /// Serialize to MessagePack bytes.
83    pub fn to_bytes(&self) -> Vec<u8> {
84        rmp_serde::to_vec(self).expect("sync offer serialization should not fail")
85    }
86
87    /// Deserialize from MessagePack bytes.
88    /// S-03: validates byte length. S-05: validates bloom filter dimensions.
89    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
90        if bytes.len() > MAX_SYNC_BYTES {
91            return Err(format!(
92                "sync offer too large: {} bytes (max {MAX_SYNC_BYTES})",
93                bytes.len()
94            ));
95        }
96        let offer: Self =
97            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync offer: {e}"))?;
98        // Protocol version check — reject offers from incompatible future versions
99        if offer.protocol_version > PROTOCOL_VERSION {
100            return Err(format!(
101                "unsupported protocol version {} (this peer supports up to {})",
102                offer.protocol_version, PROTOCOL_VERSION
103            ));
104        }
105        offer
106            .bloom
107            .validate()
108            .map_err(|e| format!("invalid bloom filter in sync offer: {e}"))?;
109        Ok(offer)
110    }
111}
112
113impl SyncPayload {
114    /// Serialize to MessagePack bytes.
115    pub fn to_bytes(&self) -> Vec<u8> {
116        rmp_serde::to_vec(self).expect("sync payload serialization should not fail")
117    }
118
119    /// Deserialize from MessagePack bytes.
120    /// S-03: validates byte length and entry count.
121    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
122        if bytes.len() > MAX_SYNC_BYTES {
123            return Err(format!(
124                "sync payload too large: {} bytes (max {MAX_SYNC_BYTES})",
125                bytes.len()
126            ));
127        }
128        let payload: Self =
129            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync payload: {e}"))?;
130        if payload.entries.len() > MAX_ENTRIES_PER_MESSAGE {
131            return Err(format!(
132                "too many entries in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
133                payload.entries.len()
134            ));
135        }
136        Ok(payload)
137    }
138}
139
140impl Snapshot {
141    /// Build a full snapshot from an op log.
142    pub fn from_oplog(oplog: &OpLog) -> Self {
143        let entries: Vec<Entry> = oplog.entries_since(None).into_iter().cloned().collect();
144        Self { entries }
145    }
146
147    /// Serialize to MessagePack bytes.
148    pub fn to_bytes(&self) -> Vec<u8> {
149        rmp_serde::to_vec(self).expect("snapshot serialization should not fail")
150    }
151
152    /// Deserialize from MessagePack bytes.
153    /// S-03: validates byte length and entry count.
154    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
155        if bytes.len() > MAX_SYNC_BYTES {
156            return Err(format!(
157                "snapshot too large: {} bytes (max {MAX_SYNC_BYTES})",
158                bytes.len()
159            ));
160        }
161        let snap: Self =
162            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid snapshot: {e}"))?;
163        if snap.entries.len() > MAX_ENTRIES_PER_MESSAGE {
164            return Err(format!(
165                "too many entries in snapshot: {} (max {MAX_ENTRIES_PER_MESSAGE})",
166                snap.entries.len()
167            ));
168        }
169        Ok(snap)
170    }
171}
172
173/// Compute the entries that a remote peer is missing.
174///
175/// Given a remote peer's sync offer (heads + bloom filter), determine which
176/// entries from our local op log the peer doesn't have and should receive.
177///
178/// Uses bloom filter for fast "probably has it" checks. Entries definitely
179/// in the bloom filter are skipped; entries not in the bloom are included.
180///
181/// To prevent false positives from breaking causal chains, the payload
182/// includes the transitive closure of ancestors for every missing entry.
183/// If a parent was false-positively skipped by the bloom filter, it gets
184/// included anyway because a descendant needs it.
185pub fn entries_missing(oplog: &OpLog, remote_offer: &SyncOffer) -> SyncPayload {
186    let remote_heads_set: HashSet<Hash> = remote_offer.heads.iter().copied().collect();
187
188    // Get all our entries.
189    let all_entries = oplog.entries_since(None);
190
191    // Check if remote already has all our heads.
192    let our_heads: HashSet<Hash> = oplog.heads().into_iter().collect();
193    if our_heads.is_subset(&remote_heads_set) {
194        // Remote is up-to-date (or ahead). Nothing to send.
195        // But we might need entries from them.
196        let need = compute_need(oplog, &remote_offer.heads);
197        return SyncPayload {
198            entries: vec![],
199            need,
200        };
201    }
202
203    // Phase 1: collect entries the bloom says the remote doesn't have.
204    let mut send_set: HashSet<Hash> = HashSet::new();
205    for entry in &all_entries {
206        if !remote_offer.bloom.contains(&entry.hash) {
207            send_set.insert(entry.hash);
208        }
209    }
210
211    // Phase 1.5: Force our heads into send_set if the remote doesn't have
212    // them as heads. Bloom filter false positives on head entries (DAG tips)
213    // cannot be recovered by Phase 2's ancestor closure because no descendant
214    // exists in send_set to trigger the walk. Forcing heads guarantees they
215    // are always sent, and Phase 2 then pulls in their full causal chain.
216    for &head in &our_heads {
217        if !remote_heads_set.contains(&head) {
218            send_set.insert(head);
219        }
220    }
221
222    // Phase 2: ancestor closure — for each entry we're sending, ensure
223    // all parents are either in the remote's bloom OR in our send set.
224    // This resolves false positives that would break causal chains.
225    let mut changed = true;
226    while changed {
227        changed = false;
228        for entry in &all_entries {
229            if !send_set.contains(&entry.hash) {
230                continue;
231            }
232            for parent_hash in &entry.next {
233                // If the parent is not already being sent and is not in the
234                // remote's heads (which they definitely have), check if they
235                // might need it.
236                if !send_set.contains(parent_hash)
237                    && !remote_heads_set.contains(parent_hash)
238                    && oplog.get(parent_hash).is_some()
239                {
240                    // The parent might have been a bloom false positive.
241                    // Include it to be safe.
242                    send_set.insert(*parent_hash);
243                    changed = true;
244                }
245            }
246        }
247    }
248
249    // Build the payload in topological order.
250    let missing: Vec<Entry> = all_entries
251        .into_iter()
252        .filter(|e| send_set.contains(&e.hash))
253        .cloned()
254        .collect();
255
256    // Compute what we need from the remote.
257    let need = compute_need(oplog, &remote_offer.heads);
258
259    SyncPayload {
260        entries: missing,
261        need,
262    }
263}
264
265/// Compute which remote heads we don't have (we need them).
266fn compute_need(oplog: &OpLog, remote_heads: &[Hash]) -> Vec<Hash> {
267    remote_heads
268        .iter()
269        .filter(|h| oplog.get(h).is_none())
270        .copied()
271        .collect()
272}
273
274/// Merge remote entries into a local op log.
275///
276/// Entries are validated (hash verification, parent existence) and appended.
277/// Returns the number of new entries successfully merged.
278///
279/// Entries should be in topological order (parents before children).
280/// If an entry's parents haven't arrived yet, it's retried after processing
281/// the rest of the batch (handles minor ordering issues).
282pub fn merge_entries(oplog: &mut OpLog, entries: &[Entry]) -> Result<usize, String> {
283    let mut inserted = 0;
284    let mut remaining: Vec<&Entry> = entries.iter().collect();
285    let mut max_passes = remaining.len() + 1;
286
287    while !remaining.is_empty() && max_passes > 0 {
288        let mut next_remaining = Vec::new();
289        for entry in &remaining {
290            match oplog.append((*entry).clone()) {
291                Ok(true) => {
292                    inserted += 1;
293                }
294                Ok(false) => {
295                    // Duplicate — already have it, skip.
296                }
297                Err(crate::oplog::OpLogError::MissingParent(_)) => {
298                    // Parent not yet available — retry later in the batch.
299                    next_remaining.push(*entry);
300                }
301                Err(crate::oplog::OpLogError::InvalidHash) => {
302                    return Err(format!(
303                        "invalid hash for entry {}",
304                        hex::encode(entry.hash)
305                    ));
306                }
307            }
308        }
309        if next_remaining.len() == remaining.len() {
310            // No progress — remaining entries have unresolvable parents.
311            return Err(format!(
312                "{} entries have unresolvable parents",
313                remaining.len()
314            ));
315        }
316        remaining = next_remaining;
317        max_passes -= 1;
318    }
319
320    Ok(inserted)
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single "entity" node type and no edge types.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Genesis entry: a DefineOntology op with no parents, authored by `author`.
    fn genesis(author: &str) -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new(author),
            author,
        )
    }

    /// AddNode op where the node id doubles as its label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Build an entry with explicit parents and a fixed Lamport clock value.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64, author: &str) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values(author, clock_time, 0),
            author,
        )
    }

    // -- SyncOffer tests --

    #[test]
    fn sync_offer_from_oplog() {
        let g = genesis("inst-a");
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        log.append(e1.clone()).unwrap();

        let offer = SyncOffer::from_oplog(&log, 2, 0);
        assert_eq!(offer.heads, vec![e1.hash]);
        assert!(offer.bloom.contains(&g.hash));
        assert!(offer.bloom.contains(&e1.hash));
        assert_eq!(offer.physical_ms, 2);
        assert_eq!(offer.logical, 0);
    }

    #[test]
    fn sync_offer_serialization_roundtrip() {
        let g = genesis("inst-a");
        let log = OpLog::new(g.clone());
        let offer = SyncOffer::from_oplog(&log, 1, 0);

        let bytes = offer.to_bytes();
        let restored = SyncOffer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.heads, offer.heads);
        assert_eq!(restored.physical_ms, offer.physical_ms);
        assert_eq!(restored.logical, offer.logical);
        assert!(restored.bloom.contains(&g.hash));
    }

    // -- entries_missing tests --

    #[test]
    fn entries_missing_detects_delta() {
        // Log A has: genesis → n1 → n2
        // Log B has: genesis → n1
        // B's offer should cause A to send n2.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_nothing_when_in_sync() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert!(payload.entries.is_empty());
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_need_list_for_remote_only() {
        // A has genesis only. B has genesis → n1.
        // A should report that it needs B's head.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-b");

        let log_a = OpLog::new(g.clone());

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        // A may send genesis because it can't verify B has it (B's head n1
        // isn't in A's oplog). This is safe — merge ignores duplicates.
        // The essential assertion: A needs B's head (e1).
        assert_eq!(payload.need.len(), 1);
        assert_eq!(payload.need[0], e1.hash);
    }

    #[test]
    fn entries_missing_bloom_reduces_transfer() {
        // Both have genesis + n1. A also has n2.
        // B's bloom should contain genesis + n1, so only n2 is sent.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        // Only n2 should be sent (genesis and n1 are in B's bloom).
        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
    }

    // -- merge_entries tests --

    #[test]
    fn merge_entries_basic() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        // B doesn't have e1 or e2 yet.
        let merged = merge_entries(&mut log_b, &[e1.clone(), e2.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3); // genesis + 2
        assert!(log_b.get(&e1.hash).is_some());
        assert!(log_b.get(&e2.hash).is_some());
    }

    #[test]
    fn merge_entries_out_of_order() {
        // Entries arrive child-first — merge should handle re-ordering.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        // Send e2 before e1 — e2 depends on e1.
        let merged = merge_entries(&mut log_b, &[e2.clone(), e1.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
    }

    #[test]
    fn merge_entries_duplicates_ignored() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        // Merge same entry again — should be idempotent.
        let merged = merge_entries(&mut log_b, &[e1.clone()]).unwrap();
        assert_eq!(merged, 0);
        assert_eq!(log_b.len(), 2);
    }

    #[test]
    fn merge_entries_rejects_invalid_hash() {
        let g = genesis("inst-a");
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        bad.author = "tampered".into(); // hash no longer valid

        let mut log_b = OpLog::new(g.clone());
        let result = merge_entries(&mut log_b, &[bad]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("invalid hash"));
    }

    // -- Snapshot tests --

    #[test]
    fn snapshot_roundtrip() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log = OpLog::new(g.clone());
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log);
        assert_eq!(snapshot.entries.len(), 3);

        let bytes = snapshot.to_bytes();
        let restored = Snapshot::from_bytes(&bytes).unwrap();
        assert_eq!(restored.entries.len(), 3);
        assert_eq!(restored.entries[0].hash, g.hash);
        assert_eq!(restored.entries[1].hash, e1.hash);
        assert_eq!(restored.entries[2].hash, e2.hash);
    }

    #[test]
    fn snapshot_can_bootstrap_new_peer() {
        // Create log A with some entries, snapshot it, load into fresh log B.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log_a);

        // Bootstrap a new peer from the snapshot (entry 0 is the genesis).
        let genesis_entry = &snapshot.entries[0];
        let mut log_b = OpLog::new(genesis_entry.clone());
        let remaining = &snapshot.entries[1..];
        let merged = merge_entries(&mut log_b, remaining).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_b.heads(), log_a.heads());
    }

    // -- Full sync protocol round-trip --

    #[test]
    fn full_sync_roundtrip_a_to_b() {
        // A has entries B doesn't. Sync A → B.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        // Step 1: B generates offer.
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);

        // Step 2: A computes what B is missing.
        let payload = entries_missing(&log_a, &offer_b);

        // Step 3: B merges the entries.
        let merged = merge_entries(&mut log_b, &payload.entries).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_a.heads(), log_b.heads());
    }

    #[test]
    fn full_sync_bidirectional() {
        // A and B both have unique entries. After bidirectional sync, both converge.
        let g = genesis("inst-a");

        // A: genesis → a1
        let a1 = make_entry(add_node_op("a1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(a1.clone()).unwrap();

        // B: genesis → b1
        let b1 = make_entry(add_node_op("b1"), vec![g.hash], 2, "inst-b");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(b1.clone()).unwrap();

        // Sync A → B.
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload_a_to_b = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload_a_to_b.entries).unwrap();

        // Sync B → A.
        let offer_a = SyncOffer::from_oplog(&log_a, 2, 0);
        let payload_b_to_a = entries_missing(&log_b, &offer_a);
        merge_entries(&mut log_a, &payload_b_to_a.entries).unwrap();

        // Both should have genesis + a1 + b1 = 3 entries.
        assert_eq!(log_a.len(), 3);
        assert_eq!(log_b.len(), 3);

        // Both should have the same heads (a1 and b1 — fork).
        let heads_a: HashSet<Hash> = log_a.heads().into_iter().collect();
        let heads_b: HashSet<Hash> = log_b.heads().into_iter().collect();
        assert_eq!(heads_a, heads_b);
        assert!(heads_a.contains(&a1.hash));
        assert!(heads_a.contains(&b1.hash));
    }

    #[test]
    fn entries_missing_forces_heads_despite_bloom_fp() {
        // D-027 fix: if our head is falsely contained in the remote's bloom,
        // it must still be included in the payload (it's not in the remote's
        // heads, so they don't have it).
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        // Craft a fake offer where the bloom claims to have ALL of A's entries
        // (simulating false positives), but remote heads are only [e1].
        let mut bloom = BloomFilter::new(128, 0.01);
        bloom.insert(&g.hash);
        bloom.insert(&e1.hash);
        bloom.insert(&e2.hash); // FP: bloom claims remote has e2

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![e1.hash], // remote doesn't actually have e2
            bloom,
            physical_ms: 2,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);

        // e2 must be included — it's our head and not in remote's heads.
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e2.hash),
            "head entry must be sent even when bloom falsely contains it"
        );
    }

    #[test]
    fn entries_missing_forces_heads_with_ancestor_closure() {
        // When the bloom FPs both a head AND its parent, the ancestor closure
        // (Phase 2) should recover the parent after Phase 1.5 forces the head.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        log_a.append(e3.clone()).unwrap();

        // Bloom FPs everything. Remote only has genesis.
        let mut bloom = BloomFilter::new(128, 0.01);
        for entry in log_a.entries_since(None) {
            bloom.insert(&entry.hash);
        }

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![g.hash],
            bloom,
            physical_ms: 1,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();

        // All non-genesis entries must be sent: e1, e2, e3.
        // Phase 1.5 forces e3 (our head), Phase 2 recovers e2 and e1.
        assert!(
            sent_hashes.contains(&e1.hash),
            "e1 must be recovered by ancestor closure"
        );
        assert!(
            sent_hashes.contains(&e2.hash),
            "e2 must be recovered by ancestor closure"
        );
        assert!(sent_hashes.contains(&e3.hash), "e3 must be forced as head");
    }

    #[test]
    fn sync_is_idempotent() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        // Sync once.
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(log_b.len(), 2);

        // Sync again — should be a no-op.
        let offer_b2 = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload2 = entries_missing(&log_a, &offer_b2);
        assert!(payload2.entries.is_empty());
        let merged2 = merge_entries(&mut log_b, &payload2.entries).unwrap();
        assert_eq!(merged2, 0);
        assert_eq!(log_b.len(), 2);
    }
}
775}