// nodedb_types/sync/wire/frame.rs
/// One-byte tag identifying the kind of message a sync frame carries.
///
/// `#[repr(u8)]` pins every variant to the explicit code listed below, so
/// `variant as u8` yields that code and [`SyncMessageType::from_u8`] is the
/// inverse mapping. The code space is sparse: values that are not listed
/// here (for example `0x13` or `0x51`) are not valid message types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyncMessageType {
    // 0x0_: handshake
    Handshake = 0x01,
    HandshakeAck = 0x02,

    // 0x1_: deltas and collection purge
    DeltaPush = 0x10,
    DeltaAck = 0x11,
    DeltaReject = 0x12,
    CollectionPurged = 0x14,

    // 0x2_: shapes
    ShapeSubscribe = 0x20,
    ShapeSnapshot = 0x21,
    ShapeDelta = 0x22,
    ShapeUnsubscribe = 0x23,

    // 0x3_: vector clock
    VectorClockSync = 0x30,

    // 0x4_: timeseries
    TimeseriesPush = 0x40,
    TimeseriesAck = 0x41,

    // 0x5_: resync / throttle
    ResyncRequest = 0x50,
    Throttle = 0x52,

    // 0x6_: token refresh
    TokenRefresh = 0x60,
    TokenRefreshAck = 0x61,

    // 0x7_: definitions
    DefinitionSync = 0x70,

    // 0x8_: presence
    PresenceUpdate = 0x80,
    PresenceBroadcast = 0x81,
    PresenceLeave = 0x82,

    // 0xFF: ping/pong
    PingPong = 0xFF,
}

impl SyncMessageType {
    /// Maps a raw type code back to its variant.
    ///
    /// Returns `None` for any byte that is not one of the codes assigned in
    /// the enum definition.
    pub fn from_u8(v: u8) -> Option<Self> {
        let kind = match v {
            0x01 => Self::Handshake,
            0x02 => Self::HandshakeAck,
            0x10 => Self::DeltaPush,
            0x11 => Self::DeltaAck,
            0x12 => Self::DeltaReject,
            0x14 => Self::CollectionPurged,
            0x20 => Self::ShapeSubscribe,
            0x21 => Self::ShapeSnapshot,
            0x22 => Self::ShapeDelta,
            0x23 => Self::ShapeUnsubscribe,
            0x30 => Self::VectorClockSync,
            0x40 => Self::TimeseriesPush,
            0x41 => Self::TimeseriesAck,
            0x50 => Self::ResyncRequest,
            0x52 => Self::Throttle,
            0x60 => Self::TokenRefresh,
            0x61 => Self::TokenRefreshAck,
            0x70 => Self::DefinitionSync,
            0x80 => Self::PresenceUpdate,
            0x81 => Self::PresenceBroadcast,
            0x82 => Self::PresenceLeave,
            0xFF => Self::PingPong,
            // Anything else is an unassigned code.
            _ => return None,
        };
        Some(kind)
    }
}
78
79#[derive(Clone)]
84pub struct SyncFrame {
85 pub msg_type: SyncMessageType,
86 pub body: Vec<u8>,
87}
88
89impl SyncFrame {
90 pub const HEADER_SIZE: usize = 5;
91
92 pub fn to_bytes(&self) -> Vec<u8> {
94 let len = self.body.len() as u32;
95 let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
96 buf.push(self.msg_type as u8);
97 buf.extend_from_slice(&len.to_le_bytes());
98 buf.extend_from_slice(&self.body);
99 buf
100 }
101
102 pub fn from_bytes(data: &[u8]) -> Option<Self> {
106 if data.len() < Self::HEADER_SIZE {
107 return None;
108 }
109 let msg_type = SyncMessageType::from_u8(data[0])?;
110 let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
111 if data.len() < Self::HEADER_SIZE + len {
112 return None;
113 }
114 let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
115 Some(Self { msg_type, body })
116 }
117
118 pub fn new_msgpack<T: zerompk::ToMessagePack>(
120 msg_type: SyncMessageType,
121 value: &T,
122 ) -> Option<Self> {
123 let body = zerompk::to_msgpack_vec(value).ok()?;
124 Some(Self { msg_type, body })
125 }
126
127 pub fn encode_or_empty<T: zerompk::ToMessagePack>(
130 msg_type: SyncMessageType,
131 value: &T,
132 ) -> Self {
133 Self::new_msgpack(msg_type, value).unwrap_or(Self {
134 msg_type,
135 body: Vec::new(),
136 })
137 }
138
139 pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
141 zerompk::from_msgpack(&self.body).ok()
142 }
143}