calimero_node_primitives/sync/wire.rs
1//! Wire protocol types for sync stream communication.
2//!
3//! This module contains the message types used for all sync protocol
4//! communication over network streams:
5//!
//! - [`StreamMessage`]: Top-level message wrapper (`Init`, `Message`, or `OpaqueError`)
7//! - [`InitPayload`]: Initial request types (blob share, key share, delta, snapshot, etc.)
8//! - [`MessagePayload`]: Response and follow-up message types
9//!
10//! # Protocol Flow
11//!
//! ```text
//! Initiator                            Responder
//! │                                            │
//! │ ── StreamMessage::Init { payload } ──────► │
//! │                                            │
//! │ ◄── StreamMessage::Message { payload } ──  │
//! │                                            │
//! │        ... (continue as needed) ...        │
//! └────────────────────────────────────────────┘
//! ```
22//!
23//! # Adding New Protocols
24//!
25//! To add a new sync protocol's wire messages:
26//! 1. Add request variant to [`InitPayload`]
27//! 2. Add response variant(s) to [`MessagePayload`]
28//! 3. Update re-exports in `sync.rs`
29
30use std::borrow::Cow;
31
32use borsh::{BorshDeserialize, BorshSerialize};
33use calimero_crypto::Nonce;
34use calimero_primitives::blobs::BlobId;
35use calimero_primitives::context::ContextId;
36use calimero_primitives::hash::Hash;
37use calimero_primitives::identity::{PrivateKey, PublicKey};
38
39use super::hash_comparison::TreeNode;
40use super::levelwise::LevelNode;
41use super::snapshot::SnapshotError;
42
/// Upper bound on the `max_depth` a peer may ask for in a `TreeNodeRequest`.
///
/// Capping the depth keeps a malicious peer from forcing an expensive deep
/// tree traversal. Nothing in this module enforces the bound: request
/// handlers are expected to compare the received depth against this value
/// before doing any work.
pub const MAX_TREE_REQUEST_DEPTH: u8 = 16;
48
49// =============================================================================
50// Stream Message Wrapper
51// =============================================================================
52
/// Top-level message for sync stream communication.
///
/// All sync protocol messages are wrapped in this enum:
/// - [`StreamMessage::Init`] opens an exchange and carries the context, the
///   sender's identity and the request itself ([`InitPayload`]).
/// - [`StreamMessage::Message`] carries responses and follow-ups
///   ([`MessagePayload`]) together with a sequence number.
/// - [`StreamMessage::OpaqueError`] signals a failure without any detail.
///
/// `Init` and `Message` each carry a `next_nonce`, the nonce announced for
/// the next message (replay protection).
///
/// The lifetime `'a` is the lifetime of [`MessagePayload`], whose
/// byte-carrying variants hold `Cow<'a, [u8]>` and may therefore borrow
/// their data instead of owning it.
///
/// NOTE(review): the Borsh encoding is derived, so the declaration order of
/// variants and fields is part of the wire format — do not reorder them
/// without treating it as a protocol change.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub enum StreamMessage<'a> {
    /// Initial message to start a sync operation.
    Init {
        /// Context being synchronized.
        context_id: ContextId,
        /// Identity of the sending party.
        party_id: PublicKey,
        /// The specific request payload.
        payload: InitPayload,
        /// Nonce for the next message.
        next_nonce: Nonce,
    },
    /// Follow-up message in an ongoing sync operation.
    Message {
        /// Sequence number for ordering.
        ///
        /// # Wire Format Change
        ///
        /// Changed from `usize` to `u64` for cross-platform portability.
        /// This is a breaking wire format change - nodes must be upgraded
        /// together to avoid deserialization failures.
        sequence_id: u64,
        /// The message payload.
        payload: MessagePayload<'a>,
        /// Nonce for the next message.
        next_nonce: Nonce,
    },
    /// Opaque error - reveals nothing about node state.
    ///
    /// Used when something goes wrong but we don't want to leak
    /// information to potentially malicious peers. The variant has no
    /// fields, so no error detail can be attached to it.
    OpaqueError,
}
93
94// =============================================================================
95// Init Payload (Requests)
96// =============================================================================
97
/// Initial request payloads for various sync protocols.
///
/// Each variant represents a different type of sync request that can
/// be initiated by a node. A value of this type is carried in the
/// `payload` field of [`StreamMessage::Init`].
///
/// NOTE(review): several variants carry their own `context_id` although
/// [`StreamMessage::Init`] already has one; whether the two must match is
/// not visible in this module — confirm against the handlers.
///
/// NOTE(review): the Borsh encoding is derived, so variant and field order
/// are part of the wire format — append new variants at the end.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum InitPayload {
    /// Request to share a blob.
    BlobShare {
        /// ID of the blob to share.
        blob_id: BlobId,
    },

    /// Request to share encryption keys.
    ///
    /// Carries no data of its own.
    KeyShare,

    /// Request a specific delta by ID (for DAG gap filling).
    DeltaRequest {
        /// Context for the delta.
        context_id: ContextId,
        /// ID of the specific delta to request (32 raw bytes).
        delta_id: [u8; 32],
    },

    /// Request peer's current DAG heads for catchup.
    DagHeadsRequest {
        /// Context to get DAG heads for.
        context_id: ContextId,
    },

    /// Request snapshot boundary negotiation.
    SnapshotBoundaryRequest {
        /// Context for snapshot sync.
        context_id: ContextId,
        /// Optional requested cutoff timestamp.
        ///
        /// NOTE(review): the unit is not stated here; the matching response
        /// documents its timestamp as nanoseconds since epoch — presumably
        /// the same unit, confirm.
        requested_cutoff_timestamp: Option<u64>,
    },

    /// Request to stream snapshot pages.
    SnapshotStreamRequest {
        /// Context for snapshot sync.
        context_id: ContextId,
        /// Root hash that was negotiated in boundary request.
        boundary_root_hash: Hash,
        /// Maximum pages per response.
        page_limit: u16,
        /// Maximum bytes per response.
        byte_limit: u32,
        /// Resume cursor from previous page (for pagination).
        ///
        /// `None` when there is no previous page to resume from.
        resume_cursor: Option<Vec<u8>>,
    },

    /// Request tree node(s) for HashComparison sync (CIP §4).
    ///
    /// Used by the HashComparison protocol to request subtrees from a peer
    /// for Merkle tree comparison.
    TreeNodeRequest {
        /// Context being synchronized.
        context_id: ContextId,
        /// ID of the node to request (root hash or entity ID).
        node_id: [u8; 32],
        /// Maximum depth to traverse from this node.
        /// None means only the requested node, Some(1) includes immediate children.
        ///
        /// The type itself accepts any `u8`; handlers are expected to
        /// validate the value against [`MAX_TREE_REQUEST_DEPTH`].
        max_depth: Option<u8>,
    },

    /// Request nodes at a specific level for LevelWise sync (CIP Appendix B).
    ///
    /// Used by the LevelWise protocol for breadth-first tree synchronization,
    /// optimized for wide, shallow trees (depth ≤ 2).
    LevelWiseRequest {
        /// Context being synchronized.
        context_id: ContextId,
        /// Level to request (0 = root's children, 1 = grandchildren, etc.).
        level: u32,
        /// Parent IDs to fetch children for.
        /// - `None` = fetch all nodes at this level
        /// - `Some(ids)` = fetch only children of specified parents
        parent_ids: Option<Vec<[u8; 32]>>,
    },
}
178
179// =============================================================================
180// Message Payload (Responses)
181// =============================================================================
182
/// Response and follow-up message payloads.
///
/// Each variant represents a different type of response or continuation
/// message in a sync protocol exchange. A value of this type is carried in
/// the `payload` field of [`StreamMessage::Message`].
///
/// The lifetime `'a` belongs to the `Cow<'a, [u8]>` fields (`chunk`,
/// `delta`, `payload`), which let a sender serialize borrowed bytes without
/// copying them first.
///
/// NOTE(review): the Borsh encoding is derived, so variant and field order
/// are part of the wire format — append new variants at the end.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub enum MessagePayload<'a> {
    /// Blob data chunk.
    BlobShare {
        /// Chunk of blob data.
        chunk: Cow<'a, [u8]>,
    },

    /// Encryption key share.
    ///
    /// NOTE(review): this variant carries a [`PrivateKey`] and the enum
    /// derives `Debug`; confirm that `PrivateKey`'s `Debug` output does not
    /// expose key material in logs.
    KeyShare {
        /// The sender's private key for the context.
        sender_key: PrivateKey,
    },

    /// Response to DeltaRequest containing the requested delta.
    DeltaResponse {
        /// The serialized delta data.
        delta: Cow<'a, [u8]>,
    },

    /// Delta not found response.
    DeltaNotFound,

    /// Response to DagHeadsRequest containing peer's current heads and root hash.
    DagHeadsResponse {
        /// Current DAG head hashes.
        dag_heads: Vec<[u8; 32]>,
        /// Current root hash.
        root_hash: Hash,
    },

    /// Challenge to prove ownership of claimed identity.
    Challenge {
        /// Random challenge bytes.
        challenge: [u8; 32],
    },

    /// Response to challenge with signature (Ed25519 signature is 64 bytes).
    ChallengeResponse {
        /// Signature proving identity ownership.
        signature: [u8; 64],
    },

    /// Response to SnapshotBoundaryRequest.
    SnapshotBoundaryResponse {
        /// Authoritative boundary timestamp (nanoseconds since epoch).
        boundary_timestamp: u64,
        /// Root hash for the boundary state.
        boundary_root_hash: Hash,
        /// Peer's DAG heads at the boundary.
        dag_heads: Vec<[u8; 32]>,
    },

    /// A page of snapshot data.
    SnapshotPage {
        /// Compressed payload data.
        payload: Cow<'a, [u8]>,
        /// Uncompressed length for validation.
        uncompressed_len: u32,
        /// Cursor for resuming (None if complete).
        cursor: Option<Vec<u8>>,
        /// Total page count.
        page_count: u64,
        /// Pages sent so far.
        sent_count: u64,
    },

    /// Snapshot sync error.
    SnapshotError {
        /// The error that occurred.
        error: SnapshotError,
    },

    /// Response to TreeNodeRequest for HashComparison sync (CIP §4).
    ///
    /// Contains tree nodes from the requested subtree for Merkle comparison.
    TreeNodeResponse {
        /// Tree nodes in the requested subtree.
        ///
        /// For a request with max_depth=0: contains just the requested node.
        /// For max_depth=1: contains the node and its immediate children.
        ///
        /// NOTE(review): the request documents "only the requested node" as
        /// `max_depth: None`; confirm that `Some(0)` is treated the same way.
        nodes: Vec<TreeNode>,
        /// True if the requested node was not found.
        not_found: bool,
    },

    /// Response to LevelWiseRequest for LevelWise sync (CIP Appendix B).
    ///
    /// Contains all nodes at the requested level for breadth-first comparison.
    LevelWiseResponse {
        /// Level these nodes are at.
        level: u32,
        /// Nodes at this level.
        ///
        /// Each node includes:
        /// - `id` and `hash` for comparison
        /// - `parent_id` for tree structure
        /// - `leaf_data` if this is a leaf (includes full entity data for CRDT merge)
        nodes: Vec<LevelNode>,
        /// Whether there are more levels below this one.
        has_more_levels: bool,
    },
}
290
291// =============================================================================
292// Tests
293// =============================================================================
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `value` with Borsh and decodes the resulting bytes back into
    /// a fresh value of the same type.
    ///
    /// Panics with `"serialize"` or `"deserialize"` when the respective
    /// direction fails.
    fn roundtrip<T>(value: &T) -> T
    where
        T: BorshSerialize + BorshDeserialize,
    {
        let bytes = borsh::to_vec(value).expect("serialize");
        borsh::from_slice(&bytes).expect("deserialize")
    }

    // =========================================================================
    // HashComparison Wire Protocol Tests
    // =========================================================================

    /// A `TreeNodeRequest` keeps all of its fields across a Borsh round trip.
    #[test]
    fn test_init_payload_tree_node_request() {
        let original = InitPayload::TreeNodeRequest {
            context_id: ContextId::from([1u8; 32]),
            node_id: [2u8; 32],
            max_depth: Some(1),
        };

        if let InitPayload::TreeNodeRequest {
            context_id,
            node_id,
            max_depth,
        } = roundtrip(&original)
        {
            assert_eq!(*context_id.as_ref(), [1u8; 32]);
            assert_eq!(node_id, [2u8; 32]);
            assert_eq!(max_depth, Some(1));
        } else {
            panic!("wrong variant");
        }
    }

    /// A `TreeNodeResponse` holding one leaf node round-trips with the node
    /// count and the `not_found` flag intact.
    #[test]
    fn test_message_payload_tree_node_response() {
        use crate::sync::hash_comparison::{CrdtType, LeafMetadata, TreeLeafData, TreeNode};

        let metadata = LeafMetadata::new(CrdtType::lww_register("test"), 100, [0u8; 32]);
        let leaf = TreeLeafData::new([10u8; 32], vec![1, 2, 3], metadata);

        let original = MessagePayload::TreeNodeResponse {
            nodes: vec![TreeNode::leaf([1u8; 32], [2u8; 32], leaf)],
            not_found: false,
        };

        if let MessagePayload::TreeNodeResponse { nodes, not_found } = roundtrip(&original) {
            assert_eq!(nodes.len(), 1);
            assert!(!not_found);
        } else {
            panic!("wrong variant");
        }
    }

    /// An empty `TreeNodeResponse` flagged as not found round-trips unchanged.
    #[test]
    fn test_message_payload_tree_node_response_not_found() {
        let original = MessagePayload::TreeNodeResponse {
            nodes: vec![],
            not_found: true,
        };

        if let MessagePayload::TreeNodeResponse { nodes, not_found } = roundtrip(&original) {
            assert!(nodes.is_empty());
            assert!(not_found);
        } else {
            panic!("wrong variant");
        }
    }

    // =========================================================================
    // LevelWise Wire Protocol Tests
    // =========================================================================

    /// A `LevelWiseRequest` without parent filter round-trips with
    /// `parent_ids` still `None`.
    #[test]
    fn test_init_payload_levelwise_request_full_level() {
        let original = InitPayload::LevelWiseRequest {
            context_id: ContextId::from([1u8; 32]),
            level: 0,
            parent_ids: None,
        };

        if let InitPayload::LevelWiseRequest {
            context_id,
            level,
            parent_ids,
        } = roundtrip(&original)
        {
            assert_eq!(*context_id.as_ref(), [1u8; 32]);
            assert_eq!(level, 0);
            assert!(parent_ids.is_none());
        } else {
            panic!("wrong variant");
        }
    }

    /// A `LevelWiseRequest` with explicit parents round-trips with the same
    /// parent IDs in the same order.
    #[test]
    fn test_init_payload_levelwise_request_with_parents() {
        let parents = vec![[10u8; 32], [20u8; 32], [30u8; 32]];
        let original = InitPayload::LevelWiseRequest {
            context_id: ContextId::from([2u8; 32]),
            level: 1,
            parent_ids: Some(parents.clone()),
        };

        if let InitPayload::LevelWiseRequest {
            context_id,
            level,
            parent_ids,
        } = roundtrip(&original)
        {
            assert_eq!(*context_id.as_ref(), [2u8; 32]);
            assert_eq!(level, 1);
            assert_eq!(parent_ids, Some(parents));
        } else {
            panic!("wrong variant");
        }
    }

    /// A `LevelWiseResponse` made only of internal nodes round-trips with
    /// both nodes still reported as internal.
    #[test]
    fn test_message_payload_levelwise_response_internal_nodes() {
        use crate::sync::levelwise::LevelNode;

        let original = MessagePayload::LevelWiseResponse {
            level: 0,
            nodes: vec![
                LevelNode::internal([1u8; 32], [10u8; 32], None),
                LevelNode::internal([2u8; 32], [20u8; 32], None),
            ],
            has_more_levels: true,
        };

        if let MessagePayload::LevelWiseResponse {
            level,
            nodes: level_nodes,
            has_more_levels,
        } = roundtrip(&original)
        {
            assert_eq!(level, 0);
            assert_eq!(level_nodes.len(), 2);
            assert!(has_more_levels);
            assert!(level_nodes[0].is_internal());
            assert!(level_nodes[1].is_internal());
        } else {
            panic!("wrong variant");
        }
    }

    /// A `LevelWiseResponse` mixing an internal node and a leaf round-trips
    /// with node kinds and the leaf's `parent_id` preserved.
    #[test]
    fn test_message_payload_levelwise_response_with_leaves() {
        use crate::sync::hash_comparison::{CrdtType, LeafMetadata, TreeLeafData};
        use crate::sync::levelwise::LevelNode;

        let leaf = TreeLeafData::new(
            [5u8; 32],
            vec![1, 2, 3, 4],
            LeafMetadata::new(CrdtType::lww_register("test"), 100, [0u8; 32]),
        );

        let original = MessagePayload::LevelWiseResponse {
            level: 1,
            nodes: vec![
                LevelNode::internal([1u8; 32], [10u8; 32], None),
                LevelNode::leaf([2u8; 32], [20u8; 32], Some([1u8; 32]), leaf),
            ],
            has_more_levels: false,
        };

        if let MessagePayload::LevelWiseResponse {
            level,
            nodes: level_nodes,
            has_more_levels,
        } = roundtrip(&original)
        {
            assert_eq!(level, 1);
            assert_eq!(level_nodes.len(), 2);
            assert!(!has_more_levels);
            assert!(level_nodes[0].is_internal());
            assert!(level_nodes[1].is_leaf());
            assert_eq!(level_nodes[1].parent_id, Some([1u8; 32]));
        } else {
            panic!("wrong variant");
        }
    }

    /// A `LevelWiseResponse` with no nodes round-trips as an empty level.
    #[test]
    fn test_message_payload_levelwise_response_empty() {
        let original = MessagePayload::LevelWiseResponse {
            level: 2,
            nodes: vec![],
            has_more_levels: false,
        };

        if let MessagePayload::LevelWiseResponse {
            level,
            nodes,
            has_more_levels,
        } = roundtrip(&original)
        {
            assert_eq!(level, 2);
            assert!(nodes.is_empty());
            assert!(!has_more_levels);
        } else {
            panic!("wrong variant");
        }
    }
}