Skip to main content

zamsync_network/protocol/
codec.rs

1use super::frame;
2use std::io::{Read, Write};
3use zamsync_core::{SyncMessage, ZamError, ZamResult};
4
5pub fn encode(msg: &SyncMessage, writer: &mut impl Write) -> ZamResult<()> {
6    let bytes =
7        rkyv::to_bytes::<_, 1024>(msg).map_err(|e| ZamError::Serialization(e.to_string()))?;
8    frame::write_frame(writer, &bytes)
9}
10
11pub fn decode(reader: &mut impl Read) -> ZamResult<SyncMessage> {
12    let bytes = frame::read_frame(reader)?;
13    rkyv::from_bytes::<SyncMessage>(&bytes).map_err(|e| ZamError::Serialization(format!("{}", e)))
14}
15
16#[cfg(test)]
17mod tests {
18    use super::*;
19    use std::io::Cursor;
20    use zamsync_core::{NodeId, VersionVector};
21
22    #[test]
23    fn test_handshake_roundtrip() {
24        let msg = SyncMessage::Handshake {
25            node_id: NodeId(42),
26            vv: VersionVector::new(),
27        };
28
29        let mut buf = Vec::new();
30        encode(&msg, &mut buf).unwrap();
31
32        let mut cursor = Cursor::new(&buf);
33        let decoded = decode(&mut cursor).unwrap();
34
35        match decoded {
36            SyncMessage::Handshake { node_id, .. } => assert_eq!(node_id.0, 42),
37            _ => panic!("unexpected message type"),
38        }
39    }
40
41    #[test]
42    fn test_pull_request_roundtrip() {
43        use zamsync_core::SequenceNumber;
44        let msg = SyncMessage::PullRequest {
45            origin_node: NodeId(1),
46            start_seq: SequenceNumber(100),
47            limit: 50,
48        };
49
50        let mut buf = Vec::new();
51        encode(&msg, &mut buf).unwrap();
52
53        let mut cursor = Cursor::new(&buf);
54        let decoded = decode(&mut cursor).unwrap();
55
56        match decoded {
57            SyncMessage::PullRequest {
58                origin_node,
59                start_seq,
60                limit,
61            } => {
62                assert_eq!(origin_node.0, 1);
63                assert_eq!(start_seq.0, 100);
64                assert_eq!(limit, 50);
65            }
66            _ => panic!("unexpected message type"),
67        }
68    }
69
70    #[test]
71    fn test_event_batch_roundtrip() {
72        use zamsync_core::{Event, Hlc, SequenceNumber};
73        let event = Event {
74            origin_node: NodeId(3),
75            seq: SequenceNumber(7),
76            hlc: Hlc::new(9999, 0),
77            event_type: 2,
78            payload: b"payload".to_vec(),
79        };
80        let msg = SyncMessage::EventBatch {
81            origin_node: NodeId(3),
82            events: vec![event],
83        };
84
85        let mut buf = Vec::new();
86        encode(&msg, &mut buf).unwrap();
87
88        let mut cursor = Cursor::new(&buf);
89        let decoded = decode(&mut cursor).unwrap();
90
91        match decoded {
92            SyncMessage::EventBatch {
93                origin_node,
94                events,
95            } => {
96                assert_eq!(origin_node.0, 3);
97                assert_eq!(events.len(), 1);
98                assert_eq!(events[0].payload, b"payload");
99            }
100            _ => panic!("unexpected message type"),
101        }
102    }
103}