Skip to main content

hive_btle/
hive_lite_sync.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Integration between hive-btle and hive-lite.
17//!
18//! This module provides [`DocumentType`] implementations for hive-lite types,
19//! enabling them to sync through the hive-btle mesh using the extensible
20//! document registry.
21//!
22//! ## Usage
23//!
24//! Enable the `hive-lite-sync` feature in your `Cargo.toml`:
25//!
26//! ```toml
27//! [dependencies]
28//! hive-btle = { version = "0.1", features = ["hive-lite-sync"] }
29//! ```
30//!
31//! Then register the CannedMessage type with the mesh:
32//!
33//! ```ignore
34//! use hive_btle::{HiveMesh, DocumentRegistry};
35//! use hive_btle::hive_lite_sync::CannedMessageDocument;
36//!
37//! // Register the CannedMessage document type
38//! mesh.document_registry().register::<CannedMessageDocument>();
39//!
40//! // Store a canned message for sync
41//! let event = hive_lite::CannedMessageAckEvent::new(
42//!     hive_lite::CannedMessage::CheckIn,
43//!     hive_lite::NodeId::new(my_node_id),
44//!     None,
45//!     timestamp_ms,
46//! );
47//! mesh.store_document(CannedMessageDocument::from(event));
48//! ```
49
50use crate::registry::{AppOperation, DocumentType};
51use hive_lite::{CannedMessageAckEvent, NodeId as HiveLiteNodeId};
52
53/// App-layer type ID for CannedMessage documents.
54///
55/// Uses 0xC0, the first app-layer slot.
56pub const CANNED_MESSAGE_TYPE_ID: u8 = 0xC0;
57
58/// Delta operation codes for CannedMessage.
59pub mod op_code {
60    /// Full state update (used when delta not available).
61    pub const FULL_STATE: u8 = 0x00;
62    /// ACK update only (efficient delta for ACK additions).
63    pub const ACK_UPDATE: u8 = 0x01;
64}
65
66/// Wrapper around [`CannedMessageAckEvent`] that implements [`DocumentType`].
67///
68/// This enables CannedMessage events to sync through the hive-btle mesh
69/// using the extensible document registry.
70#[derive(Debug, Clone)]
71pub struct CannedMessageDocument {
72    inner: CannedMessageAckEvent,
73}
74
75impl CannedMessageDocument {
76    /// Create a new document from a CannedMessageAckEvent.
77    pub fn new(event: CannedMessageAckEvent) -> Self {
78        Self { inner: event }
79    }
80
81    /// Get a reference to the inner event.
82    pub fn inner(&self) -> &CannedMessageAckEvent {
83        &self.inner
84    }
85
86    /// Get a mutable reference to the inner event.
87    pub fn inner_mut(&mut self) -> &mut CannedMessageAckEvent {
88        &mut self.inner
89    }
90
91    /// Consume and return the inner event.
92    pub fn into_inner(self) -> CannedMessageAckEvent {
93        self.inner
94    }
95
96    /// Record an ACK from a node.
97    ///
98    /// Delegates to [`CannedMessageAckEvent::ack`].
99    pub fn ack(&mut self, node_id: u32, ack_timestamp: u64) -> bool {
100        self.inner.ack(HiveLiteNodeId::new(node_id), ack_timestamp)
101    }
102
103    /// Check if a node has acknowledged.
104    pub fn has_acked(&self, node_id: u32) -> bool {
105        self.inner.has_acked(HiveLiteNodeId::new(node_id))
106    }
107
108    /// Get the number of ACKs.
109    pub fn ack_count(&self) -> usize {
110        self.inner.ack_count()
111    }
112
113    /// Get the source node ID.
114    pub fn source_node(&self) -> u32 {
115        self.inner.source_node.as_u32()
116    }
117
118    /// Get the timestamp.
119    pub fn timestamp(&self) -> u64 {
120        self.inner.timestamp
121    }
122
123    /// Get the message code.
124    pub fn message_code(&self) -> u8 {
125        self.inner.message.as_u8()
126    }
127}
128
129impl From<CannedMessageAckEvent> for CannedMessageDocument {
130    fn from(event: CannedMessageAckEvent) -> Self {
131        Self::new(event)
132    }
133}
134
135impl From<CannedMessageDocument> for CannedMessageAckEvent {
136    fn from(doc: CannedMessageDocument) -> Self {
137        doc.into_inner()
138    }
139}
140
141impl DocumentType for CannedMessageDocument {
142    const TYPE_ID: u8 = CANNED_MESSAGE_TYPE_ID;
143    const TYPE_NAME: &'static str = "CannedMessage";
144
145    fn identity(&self) -> (u32, u64) {
146        (self.inner.source_node.as_u32(), self.inner.timestamp)
147    }
148
149    fn encode(&self) -> Vec<u8> {
150        // Convert from heapless::Vec to std::Vec, skipping the 0xAF marker
151        // since the document registry adds its own type header (0xC0)
152        let full = self.inner.encode();
153        if full.len() > 1 {
154            full[1..].to_vec()
155        } else {
156            Vec::new()
157        }
158    }
159
160    fn decode(data: &[u8]) -> Option<Self> {
161        // Prepend the 0xAF marker that hive-lite expects
162        // (it was stripped when we encoded for the registry)
163        let mut with_marker = Vec::with_capacity(1 + data.len());
164        with_marker.push(0xAF);
165        with_marker.extend_from_slice(data);
166        CannedMessageAckEvent::decode(&with_marker).map(Self::new)
167    }
168
169    fn merge(&mut self, other: &Self) -> bool {
170        self.inner.merge(&other.inner)
171    }
172
173    fn to_delta_op(&self) -> Option<AppOperation> {
174        // Send full document state for reliable sync.
175        // The delta encoder filters by (key, timestamp) - we encode ack_count
176        // in the upper bits so changes trigger re-sync.
177        let (source, doc_timestamp) = self.identity();
178
179        // Combine document timestamp with ack_count for versioning:
180        // - Lower 48 bits: original document timestamp
181        // - Upper 16 bits: ack_count (version indicator)
182        // This ensures the delta encoder re-sends when ACKs change.
183        let sync_timestamp =
184            (doc_timestamp & 0x0000_FFFF_FFFF_FFFF) | ((self.inner.ack_count() as u64) << 48);
185
186        Some(
187            AppOperation::new(Self::TYPE_ID, op_code::FULL_STATE, source, sync_timestamp)
188                .with_payload(self.encode()),
189        )
190    }
191
192    fn apply_delta_op(&mut self, op: &AppOperation) -> bool {
193        if op.type_id != Self::TYPE_ID {
194            return false;
195        }
196
197        match op.op_code {
198            op_code::ACK_UPDATE => {
199                // Parse ACK entries from payload
200                // Format: [num_acks: 2B] [entries: 12B each]
201                if op.payload.len() < 2 {
202                    return false;
203                }
204
205                let num_acks = u16::from_le_bytes([op.payload[0], op.payload[1]]) as usize;
206                let expected_len = 2 + num_acks * 12;
207                if op.payload.len() < expected_len {
208                    return false;
209                }
210
211                let mut changed = false;
212                let mut offset = 2;
213                for _ in 0..num_acks {
214                    let acker_node = u32::from_le_bytes([
215                        op.payload[offset],
216                        op.payload[offset + 1],
217                        op.payload[offset + 2],
218                        op.payload[offset + 3],
219                    ]);
220                    let ack_ts = u64::from_le_bytes([
221                        op.payload[offset + 4],
222                        op.payload[offset + 5],
223                        op.payload[offset + 6],
224                        op.payload[offset + 7],
225                        op.payload[offset + 8],
226                        op.payload[offset + 9],
227                        op.payload[offset + 10],
228                        op.payload[offset + 11],
229                    ]);
230                    offset += 12;
231
232                    if self.inner.ack(HiveLiteNodeId::new(acker_node), ack_ts) {
233                        changed = true;
234                    }
235                }
236
237                changed
238            }
239            op_code::FULL_STATE => {
240                // Full state replacement
241                if let Some(other) = Self::decode(&op.payload) {
242                    self.inner.merge(&other.inner)
243                } else {
244                    false
245                }
246            }
247            _ => false,
248        }
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255    use hive_lite::CannedMessage;
256
257    #[test]
258    fn test_document_identity() {
259        let event = CannedMessageAckEvent::new(
260            CannedMessage::CheckIn,
261            HiveLiteNodeId::new(0x12345678),
262            None,
263            1706234567000,
264        );
265        let doc = CannedMessageDocument::new(event);
266
267        assert_eq!(doc.identity(), (0x12345678, 1706234567000));
268    }
269
270    #[test]
271    fn test_document_encode_decode() {
272        let event = CannedMessageAckEvent::new(
273            CannedMessage::Emergency,
274            HiveLiteNodeId::new(0xAABBCCDD),
275            Some(HiveLiteNodeId::new(0x11223344)),
276            1000,
277        );
278        let doc = CannedMessageDocument::new(event);
279
280        let encoded = doc.encode();
281        let decoded = CannedMessageDocument::decode(&encoded).unwrap();
282
283        assert_eq!(decoded.identity(), doc.identity());
284        assert_eq!(decoded.message_code(), doc.message_code());
285    }
286
287    #[test]
288    fn test_document_merge() {
289        let source = HiveLiteNodeId::new(0x111);
290        let acker = HiveLiteNodeId::new(0x222);
291
292        let mut doc1 = CannedMessageDocument::new(CannedMessageAckEvent::new(
293            CannedMessage::Alert,
294            source,
295            None,
296            1000,
297        ));
298
299        let mut event2 = CannedMessageAckEvent::new(CannedMessage::Alert, source, None, 1000);
300        event2.ack(acker, 1500);
301        let doc2 = CannedMessageDocument::new(event2);
302
303        // Merge should add the ACK
304        assert!(doc1.merge(&doc2));
305        assert!(doc1.has_acked(acker.as_u32()));
306        assert_eq!(doc1.ack_count(), 2);
307
308        // Merging again should not change
309        assert!(!doc1.merge(&doc2));
310    }
311
312    #[test]
313    fn test_delta_op_encode_decode() {
314        let source = HiveLiteNodeId::new(0x12345678);
315        let acker1 = HiveLiteNodeId::new(0xAAAA);
316        let acker2 = HiveLiteNodeId::new(0xBBBB);
317
318        let mut event = CannedMessageAckEvent::new(CannedMessage::NeedSupport, source, None, 2000);
319        event.ack(acker1, 2100);
320        event.ack(acker2, 2200);
321
322        let doc = CannedMessageDocument::new(event);
323        let op = doc.to_delta_op().unwrap();
324
325        assert_eq!(op.type_id, CANNED_MESSAGE_TYPE_ID);
326        assert_eq!(op.op_code, op_code::FULL_STATE);
327        assert_eq!(op.source_node, 0x12345678);
328
329        // Timestamp encodes version (ack_count=3) in upper bits, doc timestamp in lower
330        // ack_count = 3 (source auto-acks + acker1 + acker2)
331        let expected_timestamp = 2000u64 | (3u64 << 48);
332        assert_eq!(op.timestamp, expected_timestamp);
333
334        // Extract original doc timestamp for document identity
335        let doc_timestamp = op.timestamp & 0x0000_FFFF_FFFF_FFFF;
336        assert_eq!(doc_timestamp, 2000);
337
338        // Verify we can apply the delta to a fresh event
339        let mut fresh = CannedMessageDocument::new(CannedMessageAckEvent::new(
340            CannedMessage::NeedSupport,
341            source,
342            None,
343            2000,
344        ));
345
346        // FULL_STATE merges the complete document state
347        assert!(fresh.apply_delta_op(&op));
348        assert!(fresh.has_acked(acker1.as_u32()));
349        assert!(fresh.has_acked(acker2.as_u32()));
350        assert_eq!(fresh.ack_count(), 3); // source + acker1 + acker2
351    }
352
353    #[test]
354    fn test_type_constants() {
355        assert_eq!(CannedMessageDocument::TYPE_ID, 0xC0);
356        assert_eq!(CannedMessageDocument::TYPE_NAME, "CannedMessage");
357    }
358
359    #[test]
360    fn test_from_conversions() {
361        let event = CannedMessageAckEvent::new(
362            CannedMessage::Moving,
363            HiveLiteNodeId::new(0x999),
364            None,
365            5000,
366        );
367
368        let doc: CannedMessageDocument = event.clone().into();
369        assert_eq!(doc.source_node(), 0x999);
370
371        let recovered: CannedMessageAckEvent = doc.into();
372        assert_eq!(recovered.source_node, HiveLiteNodeId::new(0x999));
373    }
374}