Skip to main content

calimero_node_primitives/sync/
wire.rs

1//! Wire protocol types for sync stream communication.
2//!
3//! This module contains the message types used for all sync protocol
4//! communication over network streams:
5//!
6//! - [`StreamMessage`]: Top-level message wrapper (Init or Message)
7//! - [`InitPayload`]: Initial request types (blob share, key share, delta, snapshot, etc.)
8//! - [`MessagePayload`]: Response and follow-up message types
9//!
10//! # Protocol Flow
11//!
12//! ```text
13//! Initiator                              Responder
14//! │                                            │
15//! │ ── StreamMessage::Init { payload } ──────► │
16//! │                                            │
17//! │ ◄── StreamMessage::Message { payload } ── │
18//! │                                            │
19//! │ ... (continue as needed) ...               │
20//! └────────────────────────────────────────────┘
21//! ```
22//!
23//! # Adding New Protocols
24//!
25//! To add a new sync protocol's wire messages:
26//! 1. Add request variant to [`InitPayload`]
27//! 2. Add response variant(s) to [`MessagePayload`]
28//! 3. Update re-exports in `sync.rs`
29
30use std::borrow::Cow;
31
32use borsh::{BorshDeserialize, BorshSerialize};
33use calimero_crypto::Nonce;
34use calimero_primitives::blobs::BlobId;
35use calimero_primitives::context::ContextId;
36use calimero_primitives::hash::Hash;
37use calimero_primitives::identity::{PrivateKey, PublicKey};
38
39use super::hash_comparison::TreeNode;
40use super::levelwise::LevelNode;
41use super::snapshot::SnapshotError;
42
43/// Maximum depth allowed in TreeNodeRequest.
44///
45/// Prevents malicious peers from requesting expensive deep traversals.
46/// Handlers should validate against this limit before processing.
47pub const MAX_TREE_REQUEST_DEPTH: u8 = 16;
48
49// =============================================================================
50// Stream Message Wrapper
51// =============================================================================
52
53/// Top-level message for sync stream communication.
54///
55/// All sync protocol messages are wrapped in this enum, which provides:
56/// - Context and identity information (in Init)
57/// - Sequence tracking (in Message)
58/// - Nonce for replay protection
59#[derive(Debug, BorshSerialize, BorshDeserialize)]
60pub enum StreamMessage<'a> {
61    /// Initial message to start a sync operation.
62    Init {
63        /// Context being synchronized.
64        context_id: ContextId,
65        /// Identity of the sending party.
66        party_id: PublicKey,
67        /// The specific request payload.
68        payload: InitPayload,
69        /// Nonce for the next message.
70        next_nonce: Nonce,
71    },
72    /// Follow-up message in an ongoing sync operation.
73    Message {
74        /// Sequence number for ordering.
75        ///
76        /// # Wire Format Change
77        ///
78        /// Changed from `usize` to `u64` for cross-platform portability.
79        /// This is a breaking wire format change - nodes must be upgraded
80        /// together to avoid deserialization failures.
81        sequence_id: u64,
82        /// The message payload.
83        payload: MessagePayload<'a>,
84        /// Nonce for the next message.
85        next_nonce: Nonce,
86    },
87    /// Opaque error - reveals nothing about node state.
88    ///
89    /// Used when something goes wrong but we don't want to leak
90    /// information to potentially malicious peers.
91    OpaqueError,
92}
93
94// =============================================================================
95// Init Payload (Requests)
96// =============================================================================
97
98/// Initial request payloads for various sync protocols.
99///
100/// Each variant represents a different type of sync request that can
101/// be initiated by a node.
102#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
103pub enum InitPayload {
104    /// Request to share a blob.
105    BlobShare {
106        /// ID of the blob to share.
107        blob_id: BlobId,
108    },
109
110    /// Request to share encryption keys.
111    KeyShare,
112
113    /// Request a specific delta by ID (for DAG gap filling).
114    DeltaRequest {
115        /// Context for the delta.
116        context_id: ContextId,
117        /// ID of the specific delta to request.
118        delta_id: [u8; 32],
119    },
120
121    /// Request peer's current DAG heads for catchup.
122    DagHeadsRequest {
123        /// Context to get DAG heads for.
124        context_id: ContextId,
125    },
126
127    /// Request snapshot boundary negotiation.
128    SnapshotBoundaryRequest {
129        /// Context for snapshot sync.
130        context_id: ContextId,
131        /// Optional requested cutoff timestamp.
132        requested_cutoff_timestamp: Option<u64>,
133    },
134
135    /// Request to stream snapshot pages.
136    SnapshotStreamRequest {
137        /// Context for snapshot sync.
138        context_id: ContextId,
139        /// Root hash that was negotiated in boundary request.
140        boundary_root_hash: Hash,
141        /// Maximum pages per response.
142        page_limit: u16,
143        /// Maximum bytes per response.
144        byte_limit: u32,
145        /// Resume cursor from previous page (for pagination).
146        resume_cursor: Option<Vec<u8>>,
147    },
148
149    /// Request tree node(s) for HashComparison sync (CIP §4).
150    ///
151    /// Used by the HashComparison protocol to request subtrees from a peer
152    /// for Merkle tree comparison.
153    TreeNodeRequest {
154        /// Context being synchronized.
155        context_id: ContextId,
156        /// ID of the node to request (root hash or entity ID).
157        node_id: [u8; 32],
158        /// Maximum depth to traverse from this node.
159        /// None means only the requested node, Some(1) includes immediate children.
160        max_depth: Option<u8>,
161    },
162
163    /// Request nodes at a specific level for LevelWise sync (CIP Appendix B).
164    ///
165    /// Used by the LevelWise protocol for breadth-first tree synchronization,
166    /// optimized for wide, shallow trees (depth ≤ 2).
167    LevelWiseRequest {
168        /// Context being synchronized.
169        context_id: ContextId,
170        /// Level to request (0 = root's children, 1 = grandchildren, etc.).
171        level: u32,
172        /// Parent IDs to fetch children for.
173        /// - `None` = fetch all nodes at this level
174        /// - `Some(ids)` = fetch only children of specified parents
175        parent_ids: Option<Vec<[u8; 32]>>,
176    },
177}
178
179// =============================================================================
180// Message Payload (Responses)
181// =============================================================================
182
183/// Response and follow-up message payloads.
184///
185/// Each variant represents a different type of response or continuation
186/// message in a sync protocol exchange.
187#[derive(Debug, BorshSerialize, BorshDeserialize)]
188pub enum MessagePayload<'a> {
189    /// Blob data chunk.
190    BlobShare {
191        /// Chunk of blob data.
192        chunk: Cow<'a, [u8]>,
193    },
194
195    /// Encryption key share.
196    KeyShare {
197        /// The sender's private key for the context.
198        sender_key: PrivateKey,
199    },
200
201    /// Response to DeltaRequest containing the requested delta.
202    DeltaResponse {
203        /// The serialized delta data.
204        delta: Cow<'a, [u8]>,
205    },
206
207    /// Delta not found response.
208    DeltaNotFound,
209
210    /// Response to DagHeadsRequest containing peer's current heads and root hash.
211    DagHeadsResponse {
212        /// Current DAG head hashes.
213        dag_heads: Vec<[u8; 32]>,
214        /// Current root hash.
215        root_hash: Hash,
216    },
217
218    /// Challenge to prove ownership of claimed identity.
219    Challenge {
220        /// Random challenge bytes.
221        challenge: [u8; 32],
222    },
223
224    /// Response to challenge with signature (Ed25519 signature is 64 bytes).
225    ChallengeResponse {
226        /// Signature proving identity ownership.
227        signature: [u8; 64],
228    },
229
230    /// Response to SnapshotBoundaryRequest.
231    SnapshotBoundaryResponse {
232        /// Authoritative boundary timestamp (nanoseconds since epoch).
233        boundary_timestamp: u64,
234        /// Root hash for the boundary state.
235        boundary_root_hash: Hash,
236        /// Peer's DAG heads at the boundary.
237        dag_heads: Vec<[u8; 32]>,
238    },
239
240    /// A page of snapshot data.
241    SnapshotPage {
242        /// Compressed payload data.
243        payload: Cow<'a, [u8]>,
244        /// Uncompressed length for validation.
245        uncompressed_len: u32,
246        /// Cursor for resuming (None if complete).
247        cursor: Option<Vec<u8>>,
248        /// Total page count.
249        page_count: u64,
250        /// Pages sent so far.
251        sent_count: u64,
252    },
253
254    /// Snapshot sync error.
255    SnapshotError {
256        /// The error that occurred.
257        error: SnapshotError,
258    },
259
260    /// Response to TreeNodeRequest for HashComparison sync (CIP §4).
261    ///
262    /// Contains tree nodes from the requested subtree for Merkle comparison.
263    TreeNodeResponse {
264        /// Tree nodes in the requested subtree.
265        ///
266        /// For a request with max_depth=0: contains just the requested node.
267        /// For max_depth=1: contains the node and its immediate children.
268        nodes: Vec<TreeNode>,
269        /// True if the requested node was not found.
270        not_found: bool,
271    },
272
273    /// Response to LevelWiseRequest for LevelWise sync (CIP Appendix B).
274    ///
275    /// Contains all nodes at the requested level for breadth-first comparison.
276    LevelWiseResponse {
277        /// Level these nodes are at.
278        level: u32,
279        /// Nodes at this level.
280        ///
281        /// Each node includes:
282        /// - `id` and `hash` for comparison
283        /// - `parent_id` for tree structure
284        /// - `leaf_data` if this is a leaf (includes full entity data for CRDT merge)
285        nodes: Vec<LevelNode>,
286        /// Whether there are more levels below this one.
287        has_more_levels: bool,
288    },
289}
290
291// =============================================================================
292// Tests
293// =============================================================================
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298
299    #[test]
300    fn test_init_payload_tree_node_request() {
301        let request = InitPayload::TreeNodeRequest {
302            context_id: ContextId::from([1u8; 32]),
303            node_id: [2u8; 32],
304            max_depth: Some(1),
305        };
306
307        let encoded = borsh::to_vec(&request).expect("serialize");
308        let decoded: InitPayload = borsh::from_slice(&encoded).expect("deserialize");
309
310        match decoded {
311            InitPayload::TreeNodeRequest {
312                context_id,
313                node_id,
314                max_depth,
315            } => {
316                assert_eq!(*context_id.as_ref(), [1u8; 32]);
317                assert_eq!(node_id, [2u8; 32]);
318                assert_eq!(max_depth, Some(1));
319            }
320            _ => panic!("wrong variant"),
321        }
322    }
323
324    #[test]
325    fn test_message_payload_tree_node_response() {
326        use crate::sync::hash_comparison::{LeafMetadata, TreeLeafData, TreeNode};
327
328        let leaf_data = TreeLeafData::new(
329            [10u8; 32],
330            vec![1, 2, 3],
331            LeafMetadata::new(
332                crate::sync::hash_comparison::CrdtType::lww_register("test"),
333                100,
334                [0u8; 32],
335            ),
336        );
337        let node = TreeNode::leaf([1u8; 32], [2u8; 32], leaf_data);
338
339        let response = MessagePayload::TreeNodeResponse {
340            nodes: vec![node],
341            not_found: false,
342        };
343
344        let encoded = borsh::to_vec(&response).expect("serialize");
345        let decoded: MessagePayload = borsh::from_slice(&encoded).expect("deserialize");
346
347        match decoded {
348            MessagePayload::TreeNodeResponse { nodes, not_found } => {
349                assert_eq!(nodes.len(), 1);
350                assert!(!not_found);
351            }
352            _ => panic!("wrong variant"),
353        }
354    }
355
356    #[test]
357    fn test_message_payload_tree_node_response_not_found() {
358        let response = MessagePayload::TreeNodeResponse {
359            nodes: vec![],
360            not_found: true,
361        };
362
363        let encoded = borsh::to_vec(&response).expect("serialize");
364        let decoded: MessagePayload = borsh::from_slice(&encoded).expect("deserialize");
365
366        match decoded {
367            MessagePayload::TreeNodeResponse { nodes, not_found } => {
368                assert!(nodes.is_empty());
369                assert!(not_found);
370            }
371            _ => panic!("wrong variant"),
372        }
373    }
374
375    // =========================================================================
376    // LevelWise Wire Protocol Tests
377    // =========================================================================
378
379    #[test]
380    fn test_init_payload_levelwise_request_full_level() {
381        let request = InitPayload::LevelWiseRequest {
382            context_id: ContextId::from([1u8; 32]),
383            level: 0,
384            parent_ids: None,
385        };
386
387        let encoded = borsh::to_vec(&request).expect("serialize");
388        let decoded: InitPayload = borsh::from_slice(&encoded).expect("deserialize");
389
390        match decoded {
391            InitPayload::LevelWiseRequest {
392                context_id,
393                level,
394                parent_ids,
395            } => {
396                assert_eq!(*context_id.as_ref(), [1u8; 32]);
397                assert_eq!(level, 0);
398                assert!(parent_ids.is_none());
399            }
400            _ => panic!("wrong variant"),
401        }
402    }
403
404    #[test]
405    fn test_init_payload_levelwise_request_with_parents() {
406        let parents = vec![[10u8; 32], [20u8; 32], [30u8; 32]];
407        let request = InitPayload::LevelWiseRequest {
408            context_id: ContextId::from([2u8; 32]),
409            level: 1,
410            parent_ids: Some(parents.clone()),
411        };
412
413        let encoded = borsh::to_vec(&request).expect("serialize");
414        let decoded: InitPayload = borsh::from_slice(&encoded).expect("deserialize");
415
416        match decoded {
417            InitPayload::LevelWiseRequest {
418                context_id,
419                level,
420                parent_ids,
421            } => {
422                assert_eq!(*context_id.as_ref(), [2u8; 32]);
423                assert_eq!(level, 1);
424                assert_eq!(parent_ids, Some(parents));
425            }
426            _ => panic!("wrong variant"),
427        }
428    }
429
430    #[test]
431    fn test_message_payload_levelwise_response_internal_nodes() {
432        use crate::sync::levelwise::LevelNode;
433
434        let nodes = vec![
435            LevelNode::internal([1u8; 32], [10u8; 32], None),
436            LevelNode::internal([2u8; 32], [20u8; 32], None),
437        ];
438
439        let response = MessagePayload::LevelWiseResponse {
440            level: 0,
441            nodes: nodes.clone(),
442            has_more_levels: true,
443        };
444
445        let encoded = borsh::to_vec(&response).expect("serialize");
446        let decoded: MessagePayload = borsh::from_slice(&encoded).expect("deserialize");
447
448        match decoded {
449            MessagePayload::LevelWiseResponse {
450                level,
451                nodes: decoded_nodes,
452                has_more_levels,
453            } => {
454                assert_eq!(level, 0);
455                assert_eq!(decoded_nodes.len(), 2);
456                assert!(has_more_levels);
457                assert!(decoded_nodes[0].is_internal());
458                assert!(decoded_nodes[1].is_internal());
459            }
460            _ => panic!("wrong variant"),
461        }
462    }
463
464    #[test]
465    fn test_message_payload_levelwise_response_with_leaves() {
466        use crate::sync::hash_comparison::{CrdtType, LeafMetadata, TreeLeafData};
467        use crate::sync::levelwise::LevelNode;
468
469        let metadata = LeafMetadata::new(CrdtType::lww_register("test"), 100, [0u8; 32]);
470        let leaf_data = TreeLeafData::new([5u8; 32], vec![1, 2, 3, 4], metadata);
471
472        let nodes = vec![
473            LevelNode::internal([1u8; 32], [10u8; 32], None),
474            LevelNode::leaf([2u8; 32], [20u8; 32], Some([1u8; 32]), leaf_data),
475        ];
476
477        let response = MessagePayload::LevelWiseResponse {
478            level: 1,
479            nodes,
480            has_more_levels: false,
481        };
482
483        let encoded = borsh::to_vec(&response).expect("serialize");
484        let decoded: MessagePayload = borsh::from_slice(&encoded).expect("deserialize");
485
486        match decoded {
487            MessagePayload::LevelWiseResponse {
488                level,
489                nodes: decoded_nodes,
490                has_more_levels,
491            } => {
492                assert_eq!(level, 1);
493                assert_eq!(decoded_nodes.len(), 2);
494                assert!(!has_more_levels);
495                assert!(decoded_nodes[0].is_internal());
496                assert!(decoded_nodes[1].is_leaf());
497                assert_eq!(decoded_nodes[1].parent_id, Some([1u8; 32]));
498            }
499            _ => panic!("wrong variant"),
500        }
501    }
502
503    #[test]
504    fn test_message_payload_levelwise_response_empty() {
505        let response = MessagePayload::LevelWiseResponse {
506            level: 2,
507            nodes: vec![],
508            has_more_levels: false,
509        };
510
511        let encoded = borsh::to_vec(&response).expect("serialize");
512        let decoded: MessagePayload = borsh::from_slice(&encoded).expect("deserialize");
513
514        match decoded {
515            MessagePayload::LevelWiseResponse {
516                level,
517                nodes,
518                has_more_levels,
519            } => {
520                assert_eq!(level, 2);
521                assert!(nodes.is_empty());
522                assert!(!has_more_levels);
523            }
524            _ => panic!("wrong variant"),
525        }
526    }
527}