nodedb_types/sync/wire/
frame.rs1#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7#[repr(u8)]
8#[non_exhaustive]
9pub enum SyncMessageType {
10 Handshake = 0x01,
11 HandshakeAck = 0x02,
12 DeltaPush = 0x10,
13 DeltaAck = 0x11,
14 DeltaReject = 0x12,
15 CollectionPurged = 0x14,
21 ShapeSubscribe = 0x20,
22 ShapeSnapshot = 0x21,
23 ShapeDelta = 0x22,
24 ShapeUnsubscribe = 0x23,
25 VectorClockSync = 0x30,
26 TimeseriesPush = 0x40,
28 TimeseriesAck = 0x41,
30 ResyncRequest = 0x50,
33 Throttle = 0x52,
36 TokenRefresh = 0x60,
38 TokenRefreshAck = 0x61,
40 DefinitionSync = 0x70,
43 PresenceUpdate = 0x80,
45 PresenceBroadcast = 0x81,
47 PresenceLeave = 0x82,
49 ArrayDelta = 0x90,
51 ArrayDeltaBatch = 0x91,
53 ArraySnapshot = 0x92,
55 ArraySnapshotChunk = 0x93,
57 ArraySchema = 0x94,
59 ArrayAck = 0x95,
61 ArrayReject = 0x96,
63 ArrayCatchupRequest = 0x97,
65 PingPong = 0xFF,
66}
67
68impl SyncMessageType {
69 pub fn from_u8(v: u8) -> Option<Self> {
70 match v {
71 0x01 => Some(Self::Handshake),
72 0x02 => Some(Self::HandshakeAck),
73 0x10 => Some(Self::DeltaPush),
74 0x11 => Some(Self::DeltaAck),
75 0x12 => Some(Self::DeltaReject),
76 0x14 => Some(Self::CollectionPurged),
77 0x20 => Some(Self::ShapeSubscribe),
78 0x21 => Some(Self::ShapeSnapshot),
79 0x22 => Some(Self::ShapeDelta),
80 0x23 => Some(Self::ShapeUnsubscribe),
81 0x30 => Some(Self::VectorClockSync),
82 0x40 => Some(Self::TimeseriesPush),
83 0x41 => Some(Self::TimeseriesAck),
84 0x50 => Some(Self::ResyncRequest),
85 0x52 => Some(Self::Throttle),
86 0x60 => Some(Self::TokenRefresh),
87 0x61 => Some(Self::TokenRefreshAck),
88 0x70 => Some(Self::DefinitionSync),
89 0x80 => Some(Self::PresenceUpdate),
90 0x81 => Some(Self::PresenceBroadcast),
91 0x82 => Some(Self::PresenceLeave),
92 0x90 => Some(Self::ArrayDelta),
93 0x91 => Some(Self::ArrayDeltaBatch),
94 0x92 => Some(Self::ArraySnapshot),
95 0x93 => Some(Self::ArraySnapshotChunk),
96 0x94 => Some(Self::ArraySchema),
97 0x95 => Some(Self::ArrayAck),
98 0x96 => Some(Self::ArrayReject),
99 0x97 => Some(Self::ArrayCatchupRequest),
100 0xFF => Some(Self::PingPong),
101 _ => None,
102 }
103 }
104}
105
106#[non_exhaustive]
114#[derive(Clone)]
115pub struct SyncFrame {
116 pub msg_type: SyncMessageType,
117 pub body: Vec<u8>,
118}
119
120impl SyncFrame {
121 pub const HEADER_SIZE: usize = 5;
122
123 pub fn to_bytes(&self) -> Vec<u8> {
125 let len = self.body.len() as u32;
126 let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
127 buf.push(self.msg_type as u8);
128 buf.extend_from_slice(&len.to_le_bytes());
129 buf.extend_from_slice(&self.body);
130 buf
131 }
132
133 pub fn from_bytes(data: &[u8]) -> Option<Self> {
137 if data.len() < Self::HEADER_SIZE {
138 return None;
139 }
140 let msg_type = SyncMessageType::from_u8(data[0])?;
141 let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
142 if data.len() < Self::HEADER_SIZE + len {
143 return None;
144 }
145 let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
146 Some(Self { msg_type, body })
147 }
148
149 pub fn new_msgpack<T: zerompk::ToMessagePack>(
151 msg_type: SyncMessageType,
152 value: &T,
153 ) -> Option<Self> {
154 let body = zerompk::to_msgpack_vec(value).ok()?;
155 Some(Self { msg_type, body })
156 }
157
158 pub fn try_encode<T: zerompk::ToMessagePack>(
164 msg_type: SyncMessageType,
165 value: &T,
166 ) -> Option<Self> {
167 match zerompk::to_msgpack_vec(value) {
168 Ok(body) => Some(Self { msg_type, body }),
169 Err(e) => {
170 tracing::error!(
171 msg_type = msg_type as u8,
172 error = %e,
173 "failed to encode sync frame body; dropping response"
174 );
175 None
176 }
177 }
178 }
179
180 pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
182 zerompk::from_msgpack(&self.body).ok()
183 }
184}