Skip to main content

calimero_node_primitives/sync/
protocol.rs

1//! Sync protocol types and selection logic (CIP §2.3 - Protocol Selection Rules).
2//!
3//! Defines the available sync protocols and the logic to select the optimal one.
4
5use borsh::{BorshDeserialize, BorshSerialize};
6
7use super::handshake::{SyncCapabilities, SyncHandshake};
8use super::levelwise::should_use_levelwise;
9
10// =============================================================================
11// Protocol Kind (Discriminant-only)
12// =============================================================================
13
14/// Protocol capability identifier for sync negotiation.
15///
16/// This is a discriminant-only enum used for advertising which sync protocols
17/// a node supports. Unlike [`SyncProtocol`], this does not carry protocol-specific
18/// data, making it suitable for capability comparison with `contains()` and equality.
19///
20/// See CIP §2 - Sync Handshake Protocol.
21///
22/// **IMPORTANT**: Keep variants in sync with [`SyncProtocol`].
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, BorshSerialize, BorshDeserialize)]
24pub enum SyncProtocolKind {
25    /// No sync needed - root hashes already match.
26    None,
27    /// Delta-based sync via DAG traversal.
28    DeltaSync,
29    /// Hash-based Merkle tree comparison.
30    HashComparison,
31    /// Full state snapshot transfer.
32    Snapshot,
33    /// Bloom filter-based quick diff.
34    BloomFilter,
35    /// Subtree prefetch for deep localized changes.
36    SubtreePrefetch,
37    /// Level-wise sync for wide shallow trees.
38    LevelWise,
39}
40
41// =============================================================================
42// Protocol (With Data)
43// =============================================================================
44
45/// Sync protocol selection for negotiation.
46///
47/// Each variant represents a different synchronization strategy with different
48/// trade-offs in terms of bandwidth, latency, and computational overhead.
49///
50/// See CIP §1 - Sync Protocol Types.
51#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize)]
52pub enum SyncProtocol {
53    /// No sync needed - root hashes already match.
54    None,
55
56    /// Delta-based sync via DAG traversal.
57    ///
58    /// Best for: Small gaps, real-time updates.
59    DeltaSync {
60        /// Delta IDs that the requester is missing.
61        missing_delta_ids: Vec<[u8; 32]>,
62    },
63
64    /// Hash-based Merkle tree comparison.
65    ///
66    /// Best for: General-purpose catch-up, 10-50% divergence.
67    HashComparison {
68        /// Root hash to compare against.
69        root_hash: [u8; 32],
70        /// Subtree roots that differ (if known).
71        divergent_subtrees: Vec<[u8; 32]>,
72    },
73
74    /// Full state snapshot transfer.
75    ///
76    /// **CRITICAL**: Only valid for fresh nodes (Invariant I5).
77    /// Initialized nodes MUST use state-based sync with CRDT merge instead.
78    Snapshot {
79        /// Whether the snapshot is compressed.
80        compressed: bool,
81        /// Whether the responder guarantees snapshot is verifiable.
82        verified: bool,
83    },
84
85    /// Bloom filter-based quick diff.
86    ///
87    /// Best for: Large trees with small diff (<10% divergence).
88    BloomFilter {
89        /// Size of the bloom filter in bits.
90        filter_size: u64,
91        /// Expected false positive rate (0.0 to 1.0).
92        ///
93        /// **Note**: Validation of bounds is performed when constructing the actual
94        /// bloom filter, not at protocol negotiation time. Invalid values will cause
95        /// filter construction to fail gracefully.
96        false_positive_rate: f64,
97    },
98
99    /// Subtree prefetch for deep localized changes.
100    ///
101    /// Best for: Deep hierarchies with localized changes.
102    SubtreePrefetch {
103        /// Root IDs of subtrees to prefetch.
104        subtree_roots: Vec<[u8; 32]>,
105    },
106
107    /// Level-wise sync for wide shallow trees.
108    ///
109    /// Best for: Trees with depth ≤ 2 and many children.
110    LevelWise {
111        /// Maximum depth to sync.
112        max_depth: u32,
113    },
114}
115
116impl Default for SyncProtocol {
117    fn default() -> Self {
118        Self::None
119    }
120}
121
122impl SyncProtocol {
123    /// Returns the protocol kind (discriminant) for this protocol.
124    ///
125    /// Useful for capability matching where the protocol-specific data is irrelevant.
126    #[must_use]
127    pub fn kind(&self) -> SyncProtocolKind {
128        SyncProtocolKind::from(self)
129    }
130}
131
132impl From<&SyncProtocol> for SyncProtocolKind {
133    fn from(protocol: &SyncProtocol) -> Self {
134        match protocol {
135            SyncProtocol::None => Self::None,
136            SyncProtocol::DeltaSync { .. } => Self::DeltaSync,
137            SyncProtocol::HashComparison { .. } => Self::HashComparison,
138            SyncProtocol::Snapshot { .. } => Self::Snapshot,
139            SyncProtocol::BloomFilter { .. } => Self::BloomFilter,
140            SyncProtocol::SubtreePrefetch { .. } => Self::SubtreePrefetch,
141            SyncProtocol::LevelWise { .. } => Self::LevelWise,
142        }
143    }
144}
145
146// =============================================================================
147// Protocol Selection
148// =============================================================================
149
150/// Result of protocol selection with reasoning.
151#[derive(Clone, Debug)]
152pub struct ProtocolSelection {
153    /// The selected protocol.
154    pub protocol: SyncProtocol,
155    /// Human-readable reason for the selection (for logging).
156    pub reason: &'static str,
157}
158
159/// Calculate the divergence ratio between two handshakes.
160///
161/// Formula: `|local.entity_count - remote.entity_count| / max(remote.entity_count, 1)`
162///
163/// Returns a value in [0.0, ∞), where:
164/// - 0.0 = identical entity counts
165/// - 1.0 = 100% divergence (e.g., local=0, remote=100)
166/// - >1.0 = local has more entities than remote
167#[must_use]
168pub fn calculate_divergence(local: &SyncHandshake, remote: &SyncHandshake) -> f64 {
169    // Use abs_diff to avoid overflow when entity_count exceeds i64::MAX
170    let diff = local.entity_count.abs_diff(remote.entity_count);
171    let denominator = remote.entity_count.max(1);
172    diff as f64 / denominator as f64
173}
174
175/// Select the optimal sync protocol based on handshake information.
176///
177/// Implements the decision table from CIP §2.3:
178///
179/// | # | Condition | Selected Protocol |
180/// |---|-----------|-------------------|
181/// | 1 | `root_hash` match | `None` |
182/// | 2 | `!has_state` (fresh node) | `Snapshot` |
183/// | 3 | `has_state` AND divergence >50% | `HashComparison` |
184/// | 4 | `max_depth` >3 AND divergence <20% | `SubtreePrefetch` |
185/// | 5 | `entity_count` >50 AND divergence <10% | `BloomFilter` |
186/// | 6 | `max_depth` 1-2 AND avg children/level >10 | `LevelWise` |
187/// | 7 | (default) | `HashComparison` |
188///
189/// **CRITICAL (Invariant I5)**: Snapshot is NEVER selected for initialized nodes.
190/// This prevents silent data loss from overwriting local CRDT state.
191#[must_use]
192pub fn select_protocol(local: &SyncHandshake, remote: &SyncHandshake) -> ProtocolSelection {
193    // Rule 1: Already synced - no action needed
194    if local.root_hash == remote.root_hash {
195        return ProtocolSelection {
196            protocol: SyncProtocol::None,
197            reason: "root hashes match, already in sync",
198        };
199    }
200
201    // Check version compatibility first
202    if !local.is_version_compatible(remote) {
203        // Version mismatch - fall back to HashComparison as safest option
204        return ProtocolSelection {
205            protocol: SyncProtocol::HashComparison {
206                root_hash: remote.root_hash,
207                divergent_subtrees: vec![],
208            },
209            reason: "version mismatch, using safe fallback",
210        };
211    }
212
213    // Rule 2: Fresh node - Snapshot allowed
214    // CRITICAL: This is the ONLY case where Snapshot is permitted
215    if !local.has_state {
216        return ProtocolSelection {
217            protocol: SyncProtocol::Snapshot {
218                compressed: remote.entity_count > 100,
219                verified: true,
220            },
221            reason: "fresh node bootstrap via snapshot",
222        };
223    }
224
225    // From here on, local HAS state - NEVER use Snapshot (Invariant I5)
226    let divergence = calculate_divergence(local, remote);
227
228    // Rule 3: Large divergence (>50%) - use HashComparison with CRDT merge
229    if divergence > 0.5 {
230        return ProtocolSelection {
231            protocol: SyncProtocol::HashComparison {
232                root_hash: remote.root_hash,
233                divergent_subtrees: vec![],
234            },
235            reason: "high divergence (>50%), using hash comparison with CRDT merge",
236        };
237    }
238
239    // Rule 4: Deep tree with localized changes
240    if remote.max_depth > 3 && divergence < 0.2 {
241        return ProtocolSelection {
242            protocol: SyncProtocol::SubtreePrefetch {
243                subtree_roots: vec![], // Will be populated during sync
244            },
245            reason: "deep tree with low divergence, using subtree prefetch",
246        };
247    }
248
249    // Rule 5: Large tree with small diff
250    if remote.entity_count > 50 && divergence < 0.1 {
251        return ProtocolSelection {
252            protocol: SyncProtocol::BloomFilter {
253                // ~10 bits per entity, max 10k; saturating to avoid overflow on huge counts
254                filter_size: remote.entity_count.saturating_mul(10).min(10_000),
255                false_positive_rate: 0.01,
256            },
257            reason: "large tree with small divergence, using bloom filter",
258        };
259    }
260
261    // Rule 6: Wide shallow tree (depth 1-2 with many children per level)
262    // Delegate to the canonical heuristic in levelwise module
263    let max_depth_usize = remote.max_depth as usize;
264    let avg_children_per_level = if remote.max_depth > 0 {
265        (remote.entity_count / u64::from(remote.max_depth)) as usize
266    } else {
267        0
268    };
269    if should_use_levelwise(max_depth_usize, avg_children_per_level) {
270        return ProtocolSelection {
271            protocol: SyncProtocol::LevelWise {
272                max_depth: remote.max_depth,
273            },
274            reason: "wide shallow tree, using level-wise sync",
275        };
276    }
277
278    // Rule 7: Default fallback
279    ProtocolSelection {
280        protocol: SyncProtocol::HashComparison {
281            root_hash: remote.root_hash,
282            divergent_subtrees: vec![],
283        },
284        reason: "default: using hash comparison",
285    }
286}
287
288/// Check if a protocol kind is supported by the remote peer.
289#[must_use]
290pub fn is_protocol_supported(protocol: &SyncProtocol, capabilities: &SyncCapabilities) -> bool {
291    capabilities.supported_protocols.contains(&protocol.kind())
292}
293
294/// Select protocol with fallback if preferred is not supported.
295///
296/// Tries the preferred protocol first, then falls back through the decision
297/// table until a mutually supported protocol is found.
298#[must_use]
299pub fn select_protocol_with_fallback(
300    local: &SyncHandshake,
301    remote: &SyncHandshake,
302    remote_capabilities: &SyncCapabilities,
303) -> ProtocolSelection {
304    let preferred = select_protocol(local, remote);
305
306    // Check if preferred protocol is supported
307    if is_protocol_supported(&preferred.protocol, remote_capabilities) {
308        return preferred;
309    }
310
311    // Fallback: HashComparison is always safe for initialized nodes
312    if local.has_state {
313        let fallback = SyncProtocol::HashComparison {
314            root_hash: remote.root_hash,
315            divergent_subtrees: vec![],
316        };
317        if is_protocol_supported(&fallback, remote_capabilities) {
318            return ProtocolSelection {
319                protocol: fallback,
320                reason: "fallback to hash comparison (preferred not supported)",
321            };
322        }
323    }
324
325    // Last resort: None (will need manual intervention)
326    ProtocolSelection {
327        protocol: SyncProtocol::None,
328        reason: "no mutually supported protocol found",
329    }
330}
331
332// =============================================================================
333// Tests
334// =============================================================================
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339    use crate::sync::handshake::SYNC_PROTOCOL_VERSION;
340
341    #[test]
342    fn test_sync_protocol_roundtrip() {
343        let protocols = vec![
344            SyncProtocol::None,
345            SyncProtocol::DeltaSync {
346                missing_delta_ids: vec![[1; 32], [2; 32]],
347            },
348            SyncProtocol::HashComparison {
349                root_hash: [3; 32],
350                divergent_subtrees: vec![[4; 32]],
351            },
352            SyncProtocol::Snapshot {
353                compressed: true,
354                verified: false,
355            },
356            SyncProtocol::BloomFilter {
357                filter_size: 1024,
358                false_positive_rate: 0.01,
359            },
360            SyncProtocol::SubtreePrefetch {
361                subtree_roots: vec![[5; 32], [6; 32]],
362            },
363            SyncProtocol::LevelWise { max_depth: 3 },
364        ];
365
366        for protocol in protocols {
367            let encoded = borsh::to_vec(&protocol).expect("serialize");
368            let decoded: SyncProtocol = borsh::from_slice(&encoded).expect("deserialize");
369            assert_eq!(protocol, decoded);
370        }
371    }
372
373    #[test]
374    fn test_sync_protocol_kind_roundtrip() {
375        let kinds = vec![
376            SyncProtocolKind::None,
377            SyncProtocolKind::DeltaSync,
378            SyncProtocolKind::HashComparison,
379            SyncProtocolKind::Snapshot,
380            SyncProtocolKind::BloomFilter,
381            SyncProtocolKind::SubtreePrefetch,
382            SyncProtocolKind::LevelWise,
383        ];
384
385        for kind in kinds {
386            let encoded = borsh::to_vec(&kind).expect("serialize");
387            let decoded: SyncProtocolKind = borsh::from_slice(&encoded).expect("deserialize");
388            assert_eq!(kind, decoded);
389        }
390    }
391
392    #[test]
393    fn test_sync_protocol_kind_conversion() {
394        // Test kind() method and From trait
395        assert_eq!(SyncProtocol::None.kind(), SyncProtocolKind::None);
396        assert_eq!(
397            SyncProtocol::DeltaSync {
398                missing_delta_ids: vec![[1; 32]]
399            }
400            .kind(),
401            SyncProtocolKind::DeltaSync
402        );
403        assert_eq!(
404            SyncProtocol::HashComparison {
405                root_hash: [2; 32],
406                divergent_subtrees: vec![]
407            }
408            .kind(),
409            SyncProtocolKind::HashComparison
410        );
411        assert_eq!(
412            SyncProtocol::Snapshot {
413                compressed: true,
414                verified: true
415            }
416            .kind(),
417            SyncProtocolKind::Snapshot
418        );
419        assert_eq!(
420            SyncProtocol::BloomFilter {
421                filter_size: 1024,
422                false_positive_rate: 0.01
423            }
424            .kind(),
425            SyncProtocolKind::BloomFilter
426        );
427        assert_eq!(
428            SyncProtocol::SubtreePrefetch {
429                subtree_roots: vec![]
430            }
431            .kind(),
432            SyncProtocolKind::SubtreePrefetch
433        );
434        assert_eq!(
435            SyncProtocol::LevelWise { max_depth: 5 }.kind(),
436            SyncProtocolKind::LevelWise
437        );
438
439        // Test From trait directly
440        let protocol = SyncProtocol::HashComparison {
441            root_hash: [3; 32],
442            divergent_subtrees: vec![],
443        };
444        let kind: SyncProtocolKind = (&protocol).into();
445        assert_eq!(kind, SyncProtocolKind::HashComparison);
446    }
447
448    #[test]
449    fn test_calculate_divergence() {
450        // Same counts
451        let local = SyncHandshake::new([1; 32], 100, 5, vec![]);
452        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]);
453        assert!((calculate_divergence(&local, &remote) - 0.0).abs() < f64::EPSILON);
454
455        // 50% divergence
456        let local = SyncHandshake::new([1; 32], 50, 5, vec![]);
457        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]);
458        assert!((calculate_divergence(&local, &remote) - 0.5).abs() < f64::EPSILON);
459
460        // 100% divergence (local empty)
461        let local = SyncHandshake::new([1; 32], 0, 0, vec![]);
462        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]);
463        assert!((calculate_divergence(&local, &remote) - 1.0).abs() < f64::EPSILON);
464
465        // Handles zero remote count
466        let local = SyncHandshake::new([1; 32], 100, 5, vec![]);
467        let remote = SyncHandshake::new([2; 32], 0, 0, vec![]);
468        assert!((calculate_divergence(&local, &remote) - 100.0).abs() < f64::EPSILON);
469    }
470
471    #[test]
472    fn test_select_protocol_rule1_already_synced() {
473        let local = SyncHandshake::new([42; 32], 100, 5, vec![]);
474        let remote = SyncHandshake::new([42; 32], 200, 3, vec![]); // Same root hash
475
476        let selection = select_protocol(&local, &remote);
477        assert!(matches!(selection.protocol, SyncProtocol::None));
478        assert!(selection.reason.contains("already in sync"));
479    }
480
481    #[test]
482    fn test_select_protocol_rule2_fresh_node_gets_snapshot() {
483        let local = SyncHandshake::new([0; 32], 0, 0, vec![]); // Fresh node
484        let remote = SyncHandshake::new([42; 32], 200, 5, vec![]);
485
486        let selection = select_protocol(&local, &remote);
487        assert!(matches!(selection.protocol, SyncProtocol::Snapshot { .. }));
488        assert!(selection.reason.contains("fresh node"));
489    }
490
491    #[test]
492    fn test_select_protocol_rule3_initialized_node_never_gets_snapshot() {
493        // CRITICAL TEST for Invariant I5
494        let local = SyncHandshake::new([1; 32], 1, 1, vec![]); // Has state!
495        let remote = SyncHandshake::new([42; 32], 200, 5, vec![]);
496
497        let selection = select_protocol(&local, &remote);
498        // Even with high divergence, should NOT get Snapshot
499        assert!(!matches!(selection.protocol, SyncProtocol::Snapshot { .. }));
500    }
501
502    #[test]
503    fn test_select_protocol_rule3_high_divergence_uses_hash_comparison() {
504        let local = SyncHandshake::new([1; 32], 10, 2, vec![]); // Has state
505        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]); // 90% divergence
506
507        let selection = select_protocol(&local, &remote);
508        assert!(matches!(
509            selection.protocol,
510            SyncProtocol::HashComparison { .. }
511        ));
512        assert!(selection.reason.contains("divergence"));
513    }
514
515    #[test]
516    fn test_select_protocol_rule4_deep_tree_uses_subtree_prefetch() {
517        let local = SyncHandshake::new([1; 32], 90, 5, vec![]); // ~10% divergence
518        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]); // depth > 3
519
520        let selection = select_protocol(&local, &remote);
521        assert!(matches!(
522            selection.protocol,
523            SyncProtocol::SubtreePrefetch { .. }
524        ));
525        assert!(selection.reason.contains("subtree"));
526    }
527
528    #[test]
529    fn test_select_protocol_rule5_large_tree_small_diff_uses_bloom() {
530        let local = SyncHandshake::new([1; 32], 95, 2, vec![]); // ~5% divergence
531        let remote = SyncHandshake::new([2; 32], 100, 2, vec![]); // entity_count > 50
532
533        let selection = select_protocol(&local, &remote);
534        assert!(matches!(
535            selection.protocol,
536            SyncProtocol::BloomFilter { .. }
537        ));
538        assert!(selection.reason.contains("bloom"));
539    }
540
541    #[test]
542    fn test_select_protocol_rule6_wide_shallow_uses_levelwise() {
543        // Wide shallow tree: max_depth <= 2, many children per level, entity_count <= 50
544        // (to avoid triggering bloom filter rule first)
545        let local = SyncHandshake::new([1; 32], 40, 2, vec![]);
546        let remote = SyncHandshake::new([2; 32], 40, 2, vec![]);
547
548        let selection = select_protocol(&local, &remote);
549        assert!(matches!(selection.protocol, SyncProtocol::LevelWise { .. }));
550        assert!(selection.reason.contains("level"));
551    }
552
553    #[test]
554    fn test_select_protocol_rule7_default_uses_hash_comparison() {
555        // Create conditions that don't match any specific rule
556        let local = SyncHandshake::new([1; 32], 30, 2, vec![]); // ~25% divergence
557        let remote = SyncHandshake::new([2; 32], 40, 3, vec![]); // depth=3, not >3
558
559        let selection = select_protocol(&local, &remote);
560        assert!(matches!(
561            selection.protocol,
562            SyncProtocol::HashComparison { .. }
563        ));
564        assert!(selection.reason.contains("default"));
565    }
566
567    #[test]
568    fn test_select_protocol_version_mismatch_uses_safe_fallback() {
569        let local = SyncHandshake::new([1; 32], 100, 5, vec![]);
570        let mut remote = SyncHandshake::new([2; 32], 100, 5, vec![]);
571        remote.version = SYNC_PROTOCOL_VERSION + 1; // Incompatible version
572
573        let selection = select_protocol(&local, &remote);
574        assert!(matches!(
575            selection.protocol,
576            SyncProtocol::HashComparison { .. }
577        ));
578        assert!(selection.reason.contains("version mismatch"));
579    }
580
581    #[test]
582    fn test_is_protocol_supported() {
583        let caps = SyncCapabilities::default();
584
585        // Supported (in default list)
586        assert!(is_protocol_supported(&SyncProtocol::None, &caps));
587        assert!(is_protocol_supported(
588            &SyncProtocol::HashComparison {
589                root_hash: [0; 32],
590                divergent_subtrees: vec![]
591            },
592            &caps
593        ));
594
595        // Not supported (not in default list)
596        assert!(!is_protocol_supported(
597            &SyncProtocol::SubtreePrefetch {
598                subtree_roots: vec![]
599            },
600            &caps
601        ));
602
603        // LevelWise IS supported in default list
604        assert!(is_protocol_supported(
605            &SyncProtocol::LevelWise { max_depth: 2 },
606            &caps
607        ));
608    }
609
610    #[test]
611    fn test_select_protocol_with_fallback() {
612        let local = SyncHandshake::new([1; 32], 90, 5, vec![]); // Would prefer SubtreePrefetch
613        let remote = SyncHandshake::new([2; 32], 100, 5, vec![]);
614        let caps = SyncCapabilities::default(); // Doesn't support SubtreePrefetch
615
616        let selection = select_protocol_with_fallback(&local, &remote, &caps);
617
618        // Should fall back to HashComparison since SubtreePrefetch not supported
619        assert!(matches!(
620            selection.protocol,
621            SyncProtocol::HashComparison { .. }
622        ));
623        assert!(selection.reason.contains("fallback"));
624    }
625}