zamsync_network/protocol/
codec.rs1use super::frame;
2use std::io::{Read, Write};
3use zamsync_core::{SyncMessage, ZamError, ZamResult};
4
5pub fn encode(msg: &SyncMessage, writer: &mut impl Write) -> ZamResult<()> {
6 let bytes =
7 rkyv::to_bytes::<_, 1024>(msg).map_err(|e| ZamError::Serialization(e.to_string()))?;
8 frame::write_frame(writer, &bytes)
9}
10
11pub fn decode(reader: &mut impl Read) -> ZamResult<SyncMessage> {
12 let bytes = frame::read_frame(reader)?;
13 rkyv::from_bytes::<SyncMessage>(&bytes).map_err(|e| ZamError::Serialization(format!("{}", e)))
14}
15
16#[cfg(test)]
17mod tests {
18 use super::*;
19 use std::io::Cursor;
20 use zamsync_core::{NodeId, VersionVector};
21
22 #[test]
23 fn test_handshake_roundtrip() {
24 let msg = SyncMessage::Handshake {
25 node_id: NodeId(42),
26 vv: VersionVector::new(),
27 };
28
29 let mut buf = Vec::new();
30 encode(&msg, &mut buf).unwrap();
31
32 let mut cursor = Cursor::new(&buf);
33 let decoded = decode(&mut cursor).unwrap();
34
35 match decoded {
36 SyncMessage::Handshake { node_id, .. } => assert_eq!(node_id.0, 42),
37 _ => panic!("unexpected message type"),
38 }
39 }
40
41 #[test]
42 fn test_pull_request_roundtrip() {
43 use zamsync_core::SequenceNumber;
44 let msg = SyncMessage::PullRequest {
45 origin_node: NodeId(1),
46 start_seq: SequenceNumber(100),
47 limit: 50,
48 };
49
50 let mut buf = Vec::new();
51 encode(&msg, &mut buf).unwrap();
52
53 let mut cursor = Cursor::new(&buf);
54 let decoded = decode(&mut cursor).unwrap();
55
56 match decoded {
57 SyncMessage::PullRequest {
58 origin_node,
59 start_seq,
60 limit,
61 } => {
62 assert_eq!(origin_node.0, 1);
63 assert_eq!(start_seq.0, 100);
64 assert_eq!(limit, 50);
65 }
66 _ => panic!("unexpected message type"),
67 }
68 }
69
70 #[test]
71 fn test_event_batch_roundtrip() {
72 use zamsync_core::{Event, Hlc, SequenceNumber};
73 let event = Event {
74 origin_node: NodeId(3),
75 seq: SequenceNumber(7),
76 hlc: Hlc::new(9999, 0),
77 event_type: 2,
78 payload: b"payload".to_vec(),
79 };
80 let msg = SyncMessage::EventBatch {
81 origin_node: NodeId(3),
82 events: vec![event],
83 };
84
85 let mut buf = Vec::new();
86 encode(&msg, &mut buf).unwrap();
87
88 let mut cursor = Cursor::new(&buf);
89 let decoded = decode(&mut cursor).unwrap();
90
91 match decoded {
92 SyncMessage::EventBatch {
93 origin_node,
94 events,
95 } => {
96 assert_eq!(origin_node.0, 3);
97 assert_eq!(events.len(), 1);
98 assert_eq!(events[0].payload, b"payload");
99 }
100 _ => panic!("unexpected message type"),
101 }
102 }
103}