hive_btle/
hive_lite_sync.rs1use crate::registry::{AppOperation, DocumentType};
51use hive_lite::{CannedMessageAckEvent, NodeId as HiveLiteNodeId};
52
53pub const CANNED_MESSAGE_TYPE_ID: u8 = 0xC0;
57
58pub mod op_code {
60 pub const FULL_STATE: u8 = 0x00;
62 pub const ACK_UPDATE: u8 = 0x01;
64}
65
66#[derive(Debug, Clone)]
71pub struct CannedMessageDocument {
72 inner: CannedMessageAckEvent,
73}
74
75impl CannedMessageDocument {
76 pub fn new(event: CannedMessageAckEvent) -> Self {
78 Self { inner: event }
79 }
80
81 pub fn inner(&self) -> &CannedMessageAckEvent {
83 &self.inner
84 }
85
86 pub fn inner_mut(&mut self) -> &mut CannedMessageAckEvent {
88 &mut self.inner
89 }
90
91 pub fn into_inner(self) -> CannedMessageAckEvent {
93 self.inner
94 }
95
96 pub fn ack(&mut self, node_id: u32, ack_timestamp: u64) -> bool {
100 self.inner.ack(HiveLiteNodeId::new(node_id), ack_timestamp)
101 }
102
103 pub fn has_acked(&self, node_id: u32) -> bool {
105 self.inner.has_acked(HiveLiteNodeId::new(node_id))
106 }
107
108 pub fn ack_count(&self) -> usize {
110 self.inner.ack_count()
111 }
112
113 pub fn source_node(&self) -> u32 {
115 self.inner.source_node.as_u32()
116 }
117
118 pub fn timestamp(&self) -> u64 {
120 self.inner.timestamp
121 }
122
123 pub fn message_code(&self) -> u8 {
125 self.inner.message.as_u8()
126 }
127}
128
129impl From<CannedMessageAckEvent> for CannedMessageDocument {
130 fn from(event: CannedMessageAckEvent) -> Self {
131 Self::new(event)
132 }
133}
134
135impl From<CannedMessageDocument> for CannedMessageAckEvent {
136 fn from(doc: CannedMessageDocument) -> Self {
137 doc.into_inner()
138 }
139}
140
141impl DocumentType for CannedMessageDocument {
142 const TYPE_ID: u8 = CANNED_MESSAGE_TYPE_ID;
143 const TYPE_NAME: &'static str = "CannedMessage";
144
145 fn identity(&self) -> (u32, u64) {
146 (self.inner.source_node.as_u32(), self.inner.timestamp)
147 }
148
149 fn encode(&self) -> Vec<u8> {
150 let full = self.inner.encode();
153 if full.len() > 1 {
154 full[1..].to_vec()
155 } else {
156 Vec::new()
157 }
158 }
159
160 fn decode(data: &[u8]) -> Option<Self> {
161 let mut with_marker = Vec::with_capacity(1 + data.len());
164 with_marker.push(0xAF);
165 with_marker.extend_from_slice(data);
166 CannedMessageAckEvent::decode(&with_marker).map(Self::new)
167 }
168
169 fn merge(&mut self, other: &Self) -> bool {
170 self.inner.merge(&other.inner)
171 }
172
173 fn to_delta_op(&self) -> Option<AppOperation> {
174 let (source, doc_timestamp) = self.identity();
178
179 let sync_timestamp =
184 (doc_timestamp & 0x0000_FFFF_FFFF_FFFF) | ((self.inner.ack_count() as u64) << 48);
185
186 Some(
187 AppOperation::new(Self::TYPE_ID, op_code::FULL_STATE, source, sync_timestamp)
188 .with_payload(self.encode()),
189 )
190 }
191
192 fn apply_delta_op(&mut self, op: &AppOperation) -> bool {
193 if op.type_id != Self::TYPE_ID {
194 return false;
195 }
196
197 match op.op_code {
198 op_code::ACK_UPDATE => {
199 if op.payload.len() < 2 {
202 return false;
203 }
204
205 let num_acks = u16::from_le_bytes([op.payload[0], op.payload[1]]) as usize;
206 let expected_len = 2 + num_acks * 12;
207 if op.payload.len() < expected_len {
208 return false;
209 }
210
211 let mut changed = false;
212 let mut offset = 2;
213 for _ in 0..num_acks {
214 let acker_node = u32::from_le_bytes([
215 op.payload[offset],
216 op.payload[offset + 1],
217 op.payload[offset + 2],
218 op.payload[offset + 3],
219 ]);
220 let ack_ts = u64::from_le_bytes([
221 op.payload[offset + 4],
222 op.payload[offset + 5],
223 op.payload[offset + 6],
224 op.payload[offset + 7],
225 op.payload[offset + 8],
226 op.payload[offset + 9],
227 op.payload[offset + 10],
228 op.payload[offset + 11],
229 ]);
230 offset += 12;
231
232 if self.inner.ack(HiveLiteNodeId::new(acker_node), ack_ts) {
233 changed = true;
234 }
235 }
236
237 changed
238 }
239 op_code::FULL_STATE => {
240 if let Some(other) = Self::decode(&op.payload) {
242 self.inner.merge(&other.inner)
243 } else {
244 false
245 }
246 }
247 _ => false,
248 }
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use hive_lite::CannedMessage;
256
257 #[test]
258 fn test_document_identity() {
259 let event = CannedMessageAckEvent::new(
260 CannedMessage::CheckIn,
261 HiveLiteNodeId::new(0x12345678),
262 None,
263 1706234567000,
264 );
265 let doc = CannedMessageDocument::new(event);
266
267 assert_eq!(doc.identity(), (0x12345678, 1706234567000));
268 }
269
270 #[test]
271 fn test_document_encode_decode() {
272 let event = CannedMessageAckEvent::new(
273 CannedMessage::Emergency,
274 HiveLiteNodeId::new(0xAABBCCDD),
275 Some(HiveLiteNodeId::new(0x11223344)),
276 1000,
277 );
278 let doc = CannedMessageDocument::new(event);
279
280 let encoded = doc.encode();
281 let decoded = CannedMessageDocument::decode(&encoded).unwrap();
282
283 assert_eq!(decoded.identity(), doc.identity());
284 assert_eq!(decoded.message_code(), doc.message_code());
285 }
286
287 #[test]
288 fn test_document_merge() {
289 let source = HiveLiteNodeId::new(0x111);
290 let acker = HiveLiteNodeId::new(0x222);
291
292 let mut doc1 = CannedMessageDocument::new(CannedMessageAckEvent::new(
293 CannedMessage::Alert,
294 source,
295 None,
296 1000,
297 ));
298
299 let mut event2 = CannedMessageAckEvent::new(CannedMessage::Alert, source, None, 1000);
300 event2.ack(acker, 1500);
301 let doc2 = CannedMessageDocument::new(event2);
302
303 assert!(doc1.merge(&doc2));
305 assert!(doc1.has_acked(acker.as_u32()));
306 assert_eq!(doc1.ack_count(), 2);
307
308 assert!(!doc1.merge(&doc2));
310 }
311
312 #[test]
313 fn test_delta_op_encode_decode() {
314 let source = HiveLiteNodeId::new(0x12345678);
315 let acker1 = HiveLiteNodeId::new(0xAAAA);
316 let acker2 = HiveLiteNodeId::new(0xBBBB);
317
318 let mut event = CannedMessageAckEvent::new(CannedMessage::NeedSupport, source, None, 2000);
319 event.ack(acker1, 2100);
320 event.ack(acker2, 2200);
321
322 let doc = CannedMessageDocument::new(event);
323 let op = doc.to_delta_op().unwrap();
324
325 assert_eq!(op.type_id, CANNED_MESSAGE_TYPE_ID);
326 assert_eq!(op.op_code, op_code::FULL_STATE);
327 assert_eq!(op.source_node, 0x12345678);
328
329 let expected_timestamp = 2000u64 | (3u64 << 48);
332 assert_eq!(op.timestamp, expected_timestamp);
333
334 let doc_timestamp = op.timestamp & 0x0000_FFFF_FFFF_FFFF;
336 assert_eq!(doc_timestamp, 2000);
337
338 let mut fresh = CannedMessageDocument::new(CannedMessageAckEvent::new(
340 CannedMessage::NeedSupport,
341 source,
342 None,
343 2000,
344 ));
345
346 assert!(fresh.apply_delta_op(&op));
348 assert!(fresh.has_acked(acker1.as_u32()));
349 assert!(fresh.has_acked(acker2.as_u32()));
350 assert_eq!(fresh.ack_count(), 3); }
352
353 #[test]
354 fn test_type_constants() {
355 assert_eq!(CannedMessageDocument::TYPE_ID, 0xC0);
356 assert_eq!(CannedMessageDocument::TYPE_NAME, "CannedMessage");
357 }
358
359 #[test]
360 fn test_from_conversions() {
361 let event = CannedMessageAckEvent::new(
362 CannedMessage::Moving,
363 HiveLiteNodeId::new(0x999),
364 None,
365 5000,
366 );
367
368 let doc: CannedMessageDocument = event.clone().into();
369 assert_eq!(doc.source_node(), 0x999);
370
371 let recovered: CannedMessageAckEvent = doc.into();
372 assert_eq!(recovered.source_node, HiveLiteNodeId::new(0x999));
373 }
374}