// silk/sync.rs — peer-to-peer sync protocol: offers, delta payloads,
// snapshots, and merge logic for the op-log replication layer.
1use serde::{Deserialize, Serialize};
2use std::collections::{HashSet, VecDeque};
3
4use crate::bloom::BloomFilter;
5use crate::entry::{Entry, Hash};
6use crate::oplog::OpLog;
7
/// S-03: Maximum byte size for sync messages (64 MB).
/// Checked against the raw byte length *before* deserialization, so
/// oversized frames are rejected without paying the decode cost.
const MAX_SYNC_BYTES: usize = 64 * 1024 * 1024;
/// S-03: Maximum entries in a single sync payload or snapshot.
/// Checked *after* deserialization against the decoded entry count.
const MAX_ENTRIES_PER_MESSAGE: usize = 100_000;
12
13/// A sync offer — sent by a peer to advertise its state.
14///
15/// Contains the peer's current DAG heads and a bloom filter of all
16/// entry hashes it holds. The recipient uses this to compute which
17/// entries the peer is missing and needs to receive.
18/// Current protocol version. Incremented on breaking wire format changes.
19pub const PROTOCOL_VERSION: u32 = 1;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SyncOffer {
23    /// Protocol version — peers reject offers with unknown versions.
24    /// Defaults to 0 for backward compat with pre-versioned offers.
25    #[serde(default)]
26    pub protocol_version: u32,
27    /// Current DAG heads of the offering peer.
28    pub heads: Vec<Hash>,
29    /// Bloom filter containing all entry hashes the peer has.
30    pub bloom: BloomFilter,
31    /// Physical time (ms) of the offering peer's clock.
32    pub physical_ms: u64,
33    /// Logical counter of the offering peer's clock.
34    pub logical: u32,
35}
36
/// A sync response — entries the recipient should merge.
///
/// Contains the entries the peer is missing (not in their bloom filter
/// and not reachable from their heads).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPayload {
    /// Entries the peer is missing, in topological (causal) order.
    /// Parents always precede children so `merge_entries` can usually
    /// apply them in a single pass.
    pub entries: Vec<Entry>,
    /// Hashes the sender still needs (explicit request for false-positive resolution).
    /// These are remote heads absent from the sender's own op log.
    pub need: Vec<Hash>,
}
48
/// A full snapshot — for bootstrapping new peers.
///
/// Contains every entry in the op log, serialized in topological order.
/// New peers deserialize this, rebuild their op log and materialized graph,
/// then switch to delta sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// All entries in topological (causal) order; entries[0] is genesis.
    pub entries: Vec<Entry>,
}
59
60impl SyncOffer {
61    /// Build a sync offer from an op log.
62    ///
63    /// Constructs a bloom filter of all entry hashes and captures current heads.
64    pub fn from_oplog(oplog: &OpLog, physical_ms: u64, logical: u32) -> Self {
65        let all = oplog.entries_since(None);
66        // Use a minimum of 128 expected items so the bloom filter has enough
67        // bits to avoid false positives with very small entry sets.
68        let count = all.len().max(128);
69        let mut bloom = BloomFilter::new(count, 0.01);
70        for entry in &all {
71            bloom.insert(&entry.hash);
72        }
73        Self {
74            protocol_version: PROTOCOL_VERSION,
75            heads: oplog.heads(),
76            bloom,
77            physical_ms,
78            logical,
79        }
80    }
81
82    /// Serialize to MessagePack bytes.
83    pub fn to_bytes(&self) -> Vec<u8> {
84        rmp_serde::to_vec(self).expect("sync offer serialization should not fail")
85    }
86
87    /// Deserialize from MessagePack bytes.
88    /// S-03: validates byte length. S-05: validates bloom filter dimensions.
89    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
90        if bytes.len() > MAX_SYNC_BYTES {
91            return Err(format!(
92                "sync offer too large: {} bytes (max {MAX_SYNC_BYTES})",
93                bytes.len()
94            ));
95        }
96        let offer: Self =
97            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync offer: {e}"))?;
98        // Protocol version check — reject offers from incompatible future versions
99        if offer.protocol_version > PROTOCOL_VERSION {
100            return Err(format!(
101                "unsupported protocol version {} (this peer supports up to {})",
102                offer.protocol_version, PROTOCOL_VERSION
103            ));
104        }
105        offer
106            .bloom
107            .validate()
108            .map_err(|e| format!("invalid bloom filter in sync offer: {e}"))?;
109        Ok(offer)
110    }
111}
112
113impl SyncPayload {
114    /// Serialize to MessagePack bytes.
115    pub fn to_bytes(&self) -> Vec<u8> {
116        rmp_serde::to_vec(self).expect("sync payload serialization should not fail")
117    }
118
119    /// Deserialize from MessagePack bytes.
120    /// S-03: validates byte length and entry count.
121    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
122        if bytes.len() > MAX_SYNC_BYTES {
123            return Err(format!(
124                "sync payload too large: {} bytes (max {MAX_SYNC_BYTES})",
125                bytes.len()
126            ));
127        }
128        let payload: Self =
129            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid sync payload: {e}"))?;
130        if payload.entries.len() > MAX_ENTRIES_PER_MESSAGE {
131            return Err(format!(
132                "too many entries in payload: {} (max {MAX_ENTRIES_PER_MESSAGE})",
133                payload.entries.len()
134            ));
135        }
136        Ok(payload)
137    }
138}
139
140impl Snapshot {
141    /// Build a full snapshot from an op log.
142    pub fn from_oplog(oplog: &OpLog) -> Self {
143        let entries: Vec<Entry> = oplog.entries_since(None).into_iter().cloned().collect();
144        Self { entries }
145    }
146
147    /// Serialize to MessagePack bytes.
148    pub fn to_bytes(&self) -> Vec<u8> {
149        rmp_serde::to_vec(self).expect("snapshot serialization should not fail")
150    }
151
152    /// Deserialize from MessagePack bytes.
153    /// S-03: validates byte length and entry count.
154    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
155        if bytes.len() > MAX_SYNC_BYTES {
156            return Err(format!(
157                "snapshot too large: {} bytes (max {MAX_SYNC_BYTES})",
158                bytes.len()
159            ));
160        }
161        let snap: Self =
162            rmp_serde::from_slice(bytes).map_err(|e| format!("invalid snapshot: {e}"))?;
163        if snap.entries.len() > MAX_ENTRIES_PER_MESSAGE {
164            return Err(format!(
165                "too many entries in snapshot: {} (max {MAX_ENTRIES_PER_MESSAGE})",
166                snap.entries.len()
167            ));
168        }
169        Ok(snap)
170    }
171}
172
173/// Compute the entries that a remote peer is missing.
174///
175/// Given a remote peer's sync offer (heads + bloom filter), determine which
176/// entries from our local op log the peer doesn't have and should receive.
177///
178/// Uses bloom filter for fast "probably has it" checks. Entries definitely
179/// in the bloom filter are skipped; entries not in the bloom are included.
180///
181/// To prevent false positives from breaking causal chains, the payload
182/// includes the transitive closure of ancestors for every missing entry.
183/// If a parent was false-positively skipped by the bloom filter, it gets
184/// included anyway because a descendant needs it.
185pub fn entries_missing(oplog: &OpLog, remote_offer: &SyncOffer) -> SyncPayload {
186    let remote_heads_set: HashSet<Hash> = remote_offer.heads.iter().copied().collect();
187
188    // Get all our entries.
189    let all_entries = oplog.entries_since(None);
190
191    // Check if remote already has all our heads.
192    let our_heads: HashSet<Hash> = oplog.heads().into_iter().collect();
193    if our_heads.is_subset(&remote_heads_set) {
194        // Remote is up-to-date (or ahead). Nothing to send.
195        // But we might need entries from them.
196        let need = compute_need(oplog, &remote_offer.heads);
197        return SyncPayload {
198            entries: vec![],
199            need,
200        };
201    }
202
203    // Phase 1: collect entries the bloom says the remote doesn't have.
204    let mut send_set: HashSet<Hash> = HashSet::new();
205    for entry in &all_entries {
206        if !remote_offer.bloom.contains(&entry.hash) {
207            send_set.insert(entry.hash);
208        }
209    }
210
211    // Phase 1.5: Force our heads into send_set if the remote doesn't have
212    // them as heads. Bloom filter false positives on head entries (DAG tips)
213    // cannot be recovered by Phase 2's ancestor closure because no descendant
214    // exists in send_set to trigger the walk. Forcing heads guarantees they
215    // are always sent, and Phase 2 then pulls in their full causal chain.
216    for &head in &our_heads {
217        if !remote_heads_set.contains(&head) {
218            send_set.insert(head);
219        }
220    }
221
222    // Phase 2: ancestor closure — for each entry we're sending, ensure
223    // all parents are either in the remote's heads OR in our send set.
224    // This recovers bloom filter false positives that would break causal chains.
225    //
226    // EXP-01 fix: BFS queue instead of O(n × depth) nested loop.
227    // The old code iterated ALL entries per pass, needing O(depth) passes.
228    // For a 900-entry linear chain: 900 × 1000 = 900K iterations.
229    // BFS processes each entry at most once: O(|send_set| + |ancestors|).
230    {
231        let mut queue: VecDeque<Hash> = send_set.iter().copied().collect();
232        while let Some(hash) = queue.pop_front() {
233            if let Some(entry) = oplog.get(&hash) {
234                for parent_hash in &entry.next {
235                    if !send_set.contains(parent_hash)
236                        && !remote_heads_set.contains(parent_hash)
237                        && oplog.get(parent_hash).is_some()
238                    {
239                        send_set.insert(*parent_hash);
240                        queue.push_back(*parent_hash);
241                    }
242                }
243            }
244        }
245    }
246
247    // Build the payload in topological order.
248    let missing: Vec<Entry> = all_entries
249        .into_iter()
250        .filter(|e| send_set.contains(&e.hash))
251        .cloned()
252        .collect();
253
254    // Compute what we need from the remote.
255    let need = compute_need(oplog, &remote_offer.heads);
256
257    SyncPayload {
258        entries: missing,
259        need,
260    }
261}
262
263/// Compute which remote heads we don't have (we need them).
264fn compute_need(oplog: &OpLog, remote_heads: &[Hash]) -> Vec<Hash> {
265    remote_heads
266        .iter()
267        .filter(|h| oplog.get(h).is_none())
268        .copied()
269        .collect()
270}
271
272/// Merge remote entries into a local op log.
273///
274/// Entries are validated (hash verification, parent existence) and appended.
275/// Returns the number of new entries successfully merged.
276///
277/// Entries should be in topological order (parents before children).
278/// If an entry's parents haven't arrived yet, it's retried after processing
279/// the rest of the batch (handles minor ordering issues).
280pub fn merge_entries(oplog: &mut OpLog, entries: &[Entry]) -> Result<usize, String> {
281    let mut inserted = 0;
282    let mut remaining: Vec<&Entry> = entries.iter().collect();
283    let mut max_passes = remaining.len() + 1;
284
285    while !remaining.is_empty() && max_passes > 0 {
286        let mut next_remaining = Vec::new();
287        for entry in &remaining {
288            match oplog.append((*entry).clone()) {
289                Ok(true) => {
290                    inserted += 1;
291                }
292                Ok(false) => {
293                    // Duplicate — already have it, skip.
294                }
295                Err(crate::oplog::OpLogError::MissingParent(_)) => {
296                    // Parent not yet available — retry later in the batch.
297                    next_remaining.push(*entry);
298                }
299                Err(crate::oplog::OpLogError::InvalidHash) => {
300                    return Err(format!(
301                        "invalid hash for entry {}",
302                        hex::encode(entry.hash)
303                    ));
304                }
305            }
306        }
307        if next_remaining.len() == remaining.len() {
308            // No progress — remaining entries have unresolvable parents.
309            return Err(format!(
310                "{} entries have unresolvable parents",
311                remaining.len()
312            ));
313        }
314        remaining = next_remaining;
315        max_passes -= 1;
316    }
317
318    Ok(inserted)
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    // Minimal ontology with a single "entity" node type — just enough for
    // the AddNode ops the fixtures below use.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    // Parentless DefineOntology entry used as the DAG root in every test.
    fn genesis(author: &str) -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new(author),
            author,
        )
    }

    // AddNode op whose id doubles as its label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    // Entry with explicit parents and a fixed clock (logical counter 0),
    // so hashes are deterministic across test runs.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64, author: &str) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values(author, clock_time, 0),
            author,
        )
    }

    // -- SyncOffer tests --

    #[test]
    fn sync_offer_from_oplog() {
        let g = genesis("inst-a");
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        log.append(e1.clone()).unwrap();

        let offer = SyncOffer::from_oplog(&log, 2, 0);
        assert_eq!(offer.heads, vec![e1.hash]);
        assert!(offer.bloom.contains(&g.hash));
        assert!(offer.bloom.contains(&e1.hash));
        assert_eq!(offer.physical_ms, 2);
        assert_eq!(offer.logical, 0);
    }

    #[test]
    fn sync_offer_serialization_roundtrip() {
        let g = genesis("inst-a");
        let log = OpLog::new(g.clone());
        let offer = SyncOffer::from_oplog(&log, 1, 0);

        let bytes = offer.to_bytes();
        let restored = SyncOffer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.heads, offer.heads);
        assert_eq!(restored.physical_ms, offer.physical_ms);
        assert_eq!(restored.logical, offer.logical);
        assert!(restored.bloom.contains(&g.hash));
    }

    // -- entries_missing tests --

    #[test]
    fn entries_missing_detects_delta() {
        // Log A has: genesis → n1 → n2
        // Log B has: genesis → n1
        // B's offer should cause A to send n2.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_nothing_when_in_sync() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        assert!(payload.entries.is_empty());
        assert!(payload.need.is_empty());
    }

    #[test]
    fn entries_missing_need_list_for_remote_only() {
        // A has genesis only. B has genesis → n1.
        // A should report that it needs B's head.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-b");

        let log_a = OpLog::new(g.clone());

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        // A may send genesis because it can't verify B has it (B's head n1
        // isn't in A's oplog). This is safe — merge ignores duplicates.
        // The essential assertion: A needs B's head (e1).
        assert_eq!(payload.need.len(), 1);
        assert_eq!(payload.need[0], e1.hash);
    }

    #[test]
    fn entries_missing_bloom_reduces_transfer() {
        // Both have genesis + n1. A also has n2.
        // B's bloom should contain genesis + n1, so only n2 is sent.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload = entries_missing(&log_a, &offer_b);

        // Only n2 should be sent (genesis and n1 are in B's bloom).
        assert_eq!(payload.entries.len(), 1);
        assert_eq!(payload.entries[0].hash, e2.hash);
    }

    // -- merge_entries tests --

    #[test]
    fn merge_entries_basic() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        // B doesn't have e1 or e2 yet.
        let merged = merge_entries(&mut log_b, &[e1.clone(), e2.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3); // genesis + 2
        assert!(log_b.get(&e1.hash).is_some());
        assert!(log_b.get(&e2.hash).is_some());
    }

    #[test]
    fn merge_entries_out_of_order() {
        // Entries arrive child-first — merge should handle re-ordering
        // via its multi-pass retry loop.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        // Send e2 before e1 — e2 depends on e1.
        let merged = merge_entries(&mut log_b, &[e2.clone(), e1.clone()]).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
    }

    #[test]
    fn merge_entries_duplicates_ignored() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_b = OpLog::new(g.clone());
        log_b.append(e1.clone()).unwrap();

        // Merge same entry again — should be idempotent.
        let merged = merge_entries(&mut log_b, &[e1.clone()]).unwrap();
        assert_eq!(merged, 0);
        assert_eq!(log_b.len(), 2);
    }

    #[test]
    fn merge_entries_rejects_invalid_hash() {
        let g = genesis("inst-a");
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        bad.author = "tampered".into(); // hash no longer valid

        let mut log_b = OpLog::new(g.clone());
        let result = merge_entries(&mut log_b, &[bad]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("invalid hash"));
    }

    // -- Snapshot tests --

    #[test]
    fn snapshot_roundtrip() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log = OpLog::new(g.clone());
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log);
        assert_eq!(snapshot.entries.len(), 3);

        let bytes = snapshot.to_bytes();
        let restored = Snapshot::from_bytes(&bytes).unwrap();
        assert_eq!(restored.entries.len(), 3);
        assert_eq!(restored.entries[0].hash, g.hash);
        assert_eq!(restored.entries[1].hash, e1.hash);
        assert_eq!(restored.entries[2].hash, e2.hash);
    }

    #[test]
    fn snapshot_can_bootstrap_new_peer() {
        // Create log A with some entries, snapshot it, load into fresh log B.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let snapshot = Snapshot::from_oplog(&log_a);

        // Bootstrap a new peer from the snapshot: entry 0 is genesis,
        // the rest merge on top.
        let genesis_entry = &snapshot.entries[0];
        let mut log_b = OpLog::new(genesis_entry.clone());
        let remaining = &snapshot.entries[1..];
        let merged = merge_entries(&mut log_b, remaining).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_b.heads(), log_a.heads());
    }

    // -- Full sync protocol round-trip --

    #[test]
    fn full_sync_roundtrip_a_to_b() {
        // A has entries B doesn't. Sync A → B.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        // Step 1: B generates offer.
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);

        // Step 2: A computes what B is missing.
        let payload = entries_missing(&log_a, &offer_b);

        // Step 3: B merges the entries.
        let merged = merge_entries(&mut log_b, &payload.entries).unwrap();

        assert_eq!(merged, 2);
        assert_eq!(log_b.len(), 3);
        assert_eq!(log_a.heads(), log_b.heads());
    }

    #[test]
    fn full_sync_bidirectional() {
        // A and B both have unique entries. After bidirectional sync, both converge.
        let g = genesis("inst-a");

        // A: genesis → a1
        let a1 = make_entry(add_node_op("a1"), vec![g.hash], 2, "inst-a");
        let mut log_a = OpLog::new(g.clone());
        log_a.append(a1.clone()).unwrap();

        // B: genesis → b1
        let b1 = make_entry(add_node_op("b1"), vec![g.hash], 2, "inst-b");
        let mut log_b = OpLog::new(g.clone());
        log_b.append(b1.clone()).unwrap();

        // Sync A → B.
        let offer_b = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload_a_to_b = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload_a_to_b.entries).unwrap();

        // Sync B → A.
        let offer_a = SyncOffer::from_oplog(&log_a, 2, 0);
        let payload_b_to_a = entries_missing(&log_b, &offer_a);
        merge_entries(&mut log_a, &payload_b_to_a.entries).unwrap();

        // Both should have genesis + a1 + b1 = 3 entries.
        assert_eq!(log_a.len(), 3);
        assert_eq!(log_b.len(), 3);

        // Both should have the same heads (a1 and b1 — fork).
        let heads_a: HashSet<Hash> = log_a.heads().into_iter().collect();
        let heads_b: HashSet<Hash> = log_b.heads().into_iter().collect();
        assert_eq!(heads_a, heads_b);
        assert!(heads_a.contains(&a1.hash));
        assert!(heads_a.contains(&b1.hash));
    }

    #[test]
    fn entries_missing_forces_heads_despite_bloom_fp() {
        // D-027 fix: if our head is falsely contained in the remote's bloom,
        // it must still be included in the payload (it's not in the remote's
        // heads, so they don't have it).
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();

        // Craft a fake offer where the bloom claims to have ALL of A's entries
        // (simulating false positives), but remote heads are only [e1].
        let mut bloom = BloomFilter::new(128, 0.01);
        bloom.insert(&g.hash);
        bloom.insert(&e1.hash);
        bloom.insert(&e2.hash); // FP: bloom claims remote has e2

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![e1.hash], // remote doesn't actually have e2
            bloom,
            physical_ms: 2,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);

        // e2 must be included — it's our head and not in remote's heads.
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();
        assert!(
            sent_hashes.contains(&e2.hash),
            "head entry must be sent even when bloom falsely contains it"
        );
    }

    #[test]
    fn entries_missing_forces_heads_with_ancestor_closure() {
        // When the bloom FPs both a head AND its parent, the ancestor closure
        // (Phase 2) should recover the parent after Phase 1.5 forces the head.
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3, "inst-a");
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();
        log_a.append(e2.clone()).unwrap();
        log_a.append(e3.clone()).unwrap();

        // Bloom FPs everything. Remote only has genesis.
        let mut bloom = BloomFilter::new(128, 0.01);
        for entry in log_a.entries_since(None) {
            bloom.insert(&entry.hash);
        }

        let fake_offer = SyncOffer {
            protocol_version: PROTOCOL_VERSION,
            heads: vec![g.hash],
            bloom,
            physical_ms: 1,
            logical: 0,
        };

        let payload = entries_missing(&log_a, &fake_offer);
        let sent_hashes: HashSet<Hash> = payload.entries.iter().map(|e| e.hash).collect();

        // All non-genesis entries must be sent: e1, e2, e3.
        // Phase 1.5 forces e3 (our head), Phase 2 recovers e2 and e1.
        assert!(
            sent_hashes.contains(&e1.hash),
            "e1 must be recovered by ancestor closure"
        );
        assert!(
            sent_hashes.contains(&e2.hash),
            "e2 must be recovered by ancestor closure"
        );
        assert!(sent_hashes.contains(&e3.hash), "e3 must be forced as head");
    }

    #[test]
    fn sync_is_idempotent() {
        let g = genesis("inst-a");
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2, "inst-a");

        let mut log_a = OpLog::new(g.clone());
        log_a.append(e1.clone()).unwrap();

        let mut log_b = OpLog::new(g.clone());

        // Sync once.
        let offer_b = SyncOffer::from_oplog(&log_b, 1, 0);
        let payload = entries_missing(&log_a, &offer_b);
        merge_entries(&mut log_b, &payload.entries).unwrap();
        assert_eq!(log_b.len(), 2);

        // Sync again — should be a no-op.
        let offer_b2 = SyncOffer::from_oplog(&log_b, 2, 0);
        let payload2 = entries_missing(&log_a, &offer_b2);
        assert!(payload2.entries.is_empty());
        let merged2 = merge_entries(&mut log_b, &payload2.entries).unwrap();
        assert_eq!(merged2, 0);
        assert_eq!(log_b.len(), 2);
    }
}