hive_btle/
document_sync.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Document synchronization for HIVE BLE mesh
17//!
18//! This module provides centralized document state management for HIVE-Lite nodes.
19//! It manages the local CRDT state (GCounter) and handles merging with received documents.
20//!
21//! ## Design Notes
22//!
23//! This implementation uses a simple GCounter for resource-constrained devices (ESP32,
24//! smartwatches). For full HIVE nodes using AutomergeIroh, this component can be replaced
25//! or extended - the observer pattern and BLE transport layer are independent of the
26//! document format.
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use hive_btle::document_sync::DocumentSync;
32//! use hive_btle::NodeId;
33//!
34//! let sync = DocumentSync::new(NodeId::new(0x12345678), "SOLDIER-1");
35//!
36//! // Trigger an emergency
37//! let doc_bytes = sync.send_emergency();
38//! // ... broadcast doc_bytes over BLE
39//!
40//! // Handle received document
41//! if let Some(result) = sync.merge_document(&received_data) {
42//!     if result.is_emergency() {
43//!         println!("EMERGENCY from {:08X}", result.source_node.as_u32());
44//!     }
45//! }
46//! ```
47
48#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
60use crate::NodeId;
61
62/// Document synchronization manager for HIVE-Lite nodes
63///
64/// Manages the local CRDT state and handles document serialization/merging.
65/// Thread-safe for use from multiple BLE callbacks.
66///
67/// ## Integration with Full HIVE
68///
69/// This implementation uses a simple GCounter suitable for embedded devices.
70/// For integration with the larger HIVE project using AutomergeIroh:
71/// - The `build_document()` output can be wrapped in an Automerge-compatible format
72/// - The observer events (Emergency, Ack, DocumentSynced) work with any backend
73/// - The BLE transport layer is document-format agnostic
74pub struct DocumentSync {
75    /// Our node ID
76    node_id: NodeId,
77
78    /// CRDT G-Counter for mesh activity tracking
79    counter: RwLock<GCounter>,
80
81    /// Peripheral data (callsign, type, location)
82    peripheral: RwLock<Peripheral>,
83
84    /// Active emergency event with ACK tracking (CRDT)
85    emergency: RwLock<Option<EmergencyEvent>>,
86
87    /// Document version (monotonically increasing)
88    version: AtomicU32,
89}
90
91impl DocumentSync {
92    /// Create a new document sync manager
93    pub fn new(node_id: NodeId, callsign: &str) -> Self {
94        let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
95            .with_callsign(callsign);
96
97        Self {
98            node_id,
99            counter: RwLock::new(GCounter::new()),
100            peripheral: RwLock::new(peripheral),
101            emergency: RwLock::new(None),
102            version: AtomicU32::new(1),
103        }
104    }
105
106    /// Create with a specific peripheral type
107    pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
108        let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
109
110        Self {
111            node_id,
112            counter: RwLock::new(GCounter::new()),
113            peripheral: RwLock::new(peripheral),
114            emergency: RwLock::new(None),
115            version: AtomicU32::new(1),
116        }
117    }
118
119    /// Get our node ID
120    pub fn node_id(&self) -> NodeId {
121        self.node_id
122    }
123
124    /// Get the current document version
125    pub fn version(&self) -> u32 {
126        self.version.load(Ordering::Relaxed)
127    }
128
129    /// Get the total counter value
130    pub fn total_count(&self) -> u64 {
131        self.counter.read().unwrap().value()
132    }
133
134    /// Get our counter contribution
135    pub fn local_count(&self) -> u64 {
136        self.counter.read().unwrap().node_count(&self.node_id)
137    }
138
139    /// Get current event type (if any)
140    pub fn current_event(&self) -> Option<EventType> {
141        self.peripheral
142            .read()
143            .unwrap()
144            .last_event
145            .as_ref()
146            .map(|e| e.event_type)
147    }
148
149    /// Check if we're in emergency state
150    pub fn is_emergency_active(&self) -> bool {
151        self.current_event() == Some(EventType::Emergency)
152    }
153
154    /// Check if we've sent an ACK
155    pub fn is_ack_active(&self) -> bool {
156        self.current_event() == Some(EventType::Ack)
157    }
158
159    /// Get the callsign
160    pub fn callsign(&self) -> String {
161        self.peripheral.read().unwrap().callsign_str().to_string()
162    }
163
164    // ==================== State Mutations ====================
165
166    /// Send an emergency - returns the document bytes to broadcast
167    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
168        // Set emergency event
169        {
170            let mut peripheral = self.peripheral.write().unwrap();
171            peripheral.set_event(EventType::Emergency, timestamp);
172        }
173
174        // Increment counter
175        self.increment_counter_internal();
176
177        // Build and return document
178        self.build_document()
179    }
180
181    /// Send an ACK - returns the document bytes to broadcast
182    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
183        // Set ACK event
184        {
185            let mut peripheral = self.peripheral.write().unwrap();
186            peripheral.set_event(EventType::Ack, timestamp);
187        }
188
189        // Increment counter
190        self.increment_counter_internal();
191
192        // Build and return document
193        self.build_document()
194    }
195
196    /// Clear the current event
197    pub fn clear_event(&self) {
198        let mut peripheral = self.peripheral.write().unwrap();
199        peripheral.clear_event();
200        self.bump_version();
201    }
202
203    /// Increment the counter (for periodic sync)
204    pub fn increment_counter(&self) {
205        self.increment_counter_internal();
206    }
207
208    /// Update health status (battery percentage)
209    pub fn update_health(&self, battery_percent: u8) {
210        let mut peripheral = self.peripheral.write().unwrap();
211        peripheral.health.battery_percent = battery_percent;
212        self.bump_version();
213    }
214
215    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
216    pub fn update_activity(&self, activity: u8) {
217        let mut peripheral = self.peripheral.write().unwrap();
218        peripheral.health.activity = activity;
219        self.bump_version();
220    }
221
222    /// Update full health status (battery and activity)
223    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
224        let mut peripheral = self.peripheral.write().unwrap();
225        peripheral.health.battery_percent = battery_percent;
226        peripheral.health.activity = activity;
227        self.bump_version();
228    }
229
230    // ==================== Emergency Management ====================
231
232    /// Start a new emergency event
233    ///
234    /// Creates an emergency event that tracks ACKs from all known peers.
235    /// Returns the document bytes to broadcast.
236    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
237        // Create emergency event with our node as source
238        {
239            let mut emergency = self.emergency.write().unwrap();
240            *emergency = Some(EmergencyEvent::new(
241                self.node_id.as_u32(),
242                timestamp,
243                known_peers,
244            ));
245        }
246
247        // Also set peripheral event for backward compatibility
248        {
249            let mut peripheral = self.peripheral.write().unwrap();
250            peripheral.set_event(EventType::Emergency, timestamp);
251        }
252
253        self.increment_counter_internal();
254        self.build_document()
255    }
256
257    /// Record our ACK for the current emergency
258    ///
259    /// Returns the document bytes to broadcast, or None if no emergency is active.
260    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
261        let changed = {
262            let mut emergency = self.emergency.write().unwrap();
263            if let Some(ref mut e) = *emergency {
264                e.ack(self.node_id.as_u32())
265            } else {
266                return None;
267            }
268        };
269
270        if changed {
271            // Also set peripheral event for backward compatibility
272            {
273                let mut peripheral = self.peripheral.write().unwrap();
274                peripheral.set_event(EventType::Ack, timestamp);
275            }
276
277            self.increment_counter_internal();
278        }
279
280        Some(self.build_document())
281    }
282
283    /// Clear the current emergency event
284    pub fn clear_emergency(&self) {
285        let mut emergency = self.emergency.write().unwrap();
286        if emergency.is_some() {
287            *emergency = None;
288            drop(emergency);
289
290            // Also clear peripheral event
291            let mut peripheral = self.peripheral.write().unwrap();
292            peripheral.clear_event();
293
294            self.bump_version();
295        }
296    }
297
298    /// Check if there's an active emergency
299    pub fn has_active_emergency(&self) -> bool {
300        self.emergency.read().unwrap().is_some()
301    }
302
303    /// Get emergency status info
304    ///
305    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
306    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
307        let emergency = self.emergency.read().unwrap();
308        emergency.as_ref().map(|e| {
309            (
310                e.source_node(),
311                e.timestamp(),
312                e.ack_count(),
313                e.pending_nodes().len(),
314            )
315        })
316    }
317
318    /// Check if a specific peer has ACKed the current emergency
319    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
320        let emergency = self.emergency.read().unwrap();
321        emergency
322            .as_ref()
323            .map(|e| e.has_acked(peer_id))
324            .unwrap_or(false)
325    }
326
327    /// Check if all peers have ACKed the current emergency
328    pub fn all_peers_acked(&self) -> bool {
329        let emergency = self.emergency.read().unwrap();
330        emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
331    }
332
333    // ==================== Document I/O ====================
334
335    /// Build the document for transmission
336    ///
337    /// Returns the encoded bytes ready for BLE GATT write.
338    pub fn build_document(&self) -> Vec<u8> {
339        let counter = self.counter.read().unwrap().clone();
340        let peripheral = self.peripheral.read().unwrap().clone();
341        let emergency = self.emergency.read().unwrap().clone();
342
343        let doc = HiveDocument {
344            version: self.version.load(Ordering::Relaxed),
345            node_id: self.node_id,
346            counter,
347            peripheral: Some(peripheral),
348            emergency,
349        };
350
351        doc.encode()
352    }
353
354    /// Merge a received document
355    ///
356    /// Returns `Some(MergeResult)` if the document was valid, `None` otherwise.
357    /// The result contains information about what changed and any events.
358    pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
359        let received = HiveDocument::decode(data)?;
360
361        // Don't process our own documents
362        if received.node_id == self.node_id {
363            return None;
364        }
365
366        // Merge the counter
367        let counter_changed = {
368            let mut counter = self.counter.write().unwrap();
369            let old_value = counter.value();
370            counter.merge(&received.counter);
371            counter.value() != old_value
372        };
373
374        // Merge emergency event (CRDT merge)
375        let emergency_changed = if let Some(ref received_emergency) = received.emergency {
376            let mut emergency = self.emergency.write().unwrap();
377            match &mut *emergency {
378                Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
379                None => {
380                    *emergency = Some(received_emergency.clone());
381                    true
382                }
383            }
384        } else {
385            false
386        };
387
388        if counter_changed || emergency_changed {
389            self.bump_version();
390        }
391
392        // Extract event from received document
393        let event = received
394            .peripheral
395            .as_ref()
396            .and_then(|p| p.last_event.clone());
397
398        Some(MergeResult {
399            source_node: received.node_id,
400            event,
401            counter_changed,
402            emergency_changed,
403            total_count: self.total_count(),
404        })
405    }
406
407    /// Create a document from raw bytes (for inspection without merging)
408    pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
409        HiveDocument::decode(data)
410    }
411
412    // ==================== Internal Helpers ====================
413
414    fn increment_counter_internal(&self) {
415        let mut counter = self.counter.write().unwrap();
416        counter.increment(&self.node_id, 1);
417        drop(counter);
418        self.bump_version();
419    }
420
421    fn bump_version(&self) {
422        self.version.fetch_add(1, Ordering::Relaxed);
423    }
424}
425
426/// Result from checking if a document contains an emergency
427#[derive(Debug, Clone)]
428pub struct DocumentCheck {
429    /// Node ID from the document
430    pub node_id: NodeId,
431    /// Whether this document contains an emergency
432    pub is_emergency: bool,
433    /// Whether this document contains an ACK
434    pub is_ack: bool,
435}
436
437impl DocumentCheck {
438    /// Quick check of a document without full parsing
439    pub fn from_document(data: &[u8]) -> Option<Self> {
440        let doc = HiveDocument::decode(data)?;
441
442        let (is_emergency, is_ack) = doc
443            .peripheral
444            .as_ref()
445            .and_then(|p| p.last_event.as_ref())
446            .map(|e| {
447                (
448                    e.event_type == EventType::Emergency,
449                    e.event_type == EventType::Ack,
450                )
451            })
452            .unwrap_or((false, false));
453
454        Some(Self {
455            node_id: doc.node_id,
456            is_emergency,
457            is_ack,
458        })
459    }
460}
461
462#[cfg(test)]
463mod tests {
464    use super::*;
465
466    #[test]
467    fn test_document_sync_new() {
468        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
469
470        assert_eq!(sync.node_id().as_u32(), 0x12345678);
471        assert_eq!(sync.version(), 1);
472        assert_eq!(sync.total_count(), 0);
473        assert_eq!(sync.callsign(), "ALPHA-1");
474        assert!(sync.current_event().is_none());
475    }
476
477    #[test]
478    fn test_send_emergency() {
479        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
480
481        let doc_bytes = sync.send_emergency(1234567890);
482
483        assert!(!doc_bytes.is_empty());
484        assert_eq!(sync.total_count(), 1);
485        assert!(sync.is_emergency_active());
486        assert!(!sync.is_ack_active());
487
488        // Verify we can decode what we sent
489        let doc = HiveDocument::decode(&doc_bytes).unwrap();
490        assert_eq!(doc.node_id.as_u32(), 0x12345678);
491        assert!(doc.peripheral.is_some());
492        let event = doc.peripheral.unwrap().last_event.unwrap();
493        assert_eq!(event.event_type, EventType::Emergency);
494    }
495
496    #[test]
497    fn test_send_ack() {
498        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
499
500        let doc_bytes = sync.send_ack(1234567890);
501
502        assert!(!doc_bytes.is_empty());
503        assert_eq!(sync.total_count(), 1);
504        assert!(sync.is_ack_active());
505        assert!(!sync.is_emergency_active());
506    }
507
508    #[test]
509    fn test_clear_event() {
510        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
511
512        sync.send_emergency(1000);
513        assert!(sync.is_emergency_active());
514
515        sync.clear_event();
516        assert!(sync.current_event().is_none());
517    }
518
519    #[test]
520    fn test_merge_document() {
521        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
522        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
523
524        // sync2 sends emergency
525        let doc_bytes = sync2.send_emergency(1000);
526
527        // sync1 receives and merges
528        let result = sync1.merge_document(&doc_bytes);
529        assert!(result.is_some());
530
531        let result = result.unwrap();
532        assert_eq!(result.source_node.as_u32(), 0x22222222);
533        assert!(result.is_emergency());
534        assert!(result.counter_changed);
535        assert_eq!(result.total_count, 1);
536
537        // sync1's local count is still 0, but total includes sync2's contribution
538        assert_eq!(sync1.local_count(), 0);
539        assert_eq!(sync1.total_count(), 1);
540    }
541
542    #[test]
543    fn test_merge_own_document_ignored() {
544        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
545
546        let doc_bytes = sync.send_emergency(1000);
547
548        // Merging our own document should be ignored
549        let result = sync.merge_document(&doc_bytes);
550        assert!(result.is_none());
551    }
552
553    #[test]
554    fn test_version_increments() {
555        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
556
557        assert_eq!(sync.version(), 1);
558
559        sync.increment_counter();
560        assert_eq!(sync.version(), 2);
561
562        sync.send_emergency(1000);
563        assert_eq!(sync.version(), 3);
564
565        sync.clear_event();
566        assert_eq!(sync.version(), 4);
567    }
568
569    #[test]
570    fn test_document_check() {
571        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
572
573        let emergency_doc = sync.send_emergency(1000);
574        let check = DocumentCheck::from_document(&emergency_doc).unwrap();
575        assert_eq!(check.node_id.as_u32(), 0x12345678);
576        assert!(check.is_emergency);
577        assert!(!check.is_ack);
578
579        sync.clear_event();
580        let ack_doc = sync.send_ack(2000);
581        let check = DocumentCheck::from_document(&ack_doc).unwrap();
582        assert!(!check.is_emergency);
583        assert!(check.is_ack);
584    }
585
586    #[test]
587    fn test_counter_merge_idempotent() {
588        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
589        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
590
591        // sync2 sends something
592        let doc_bytes = sync2.send_emergency(1000);
593
594        // sync1 merges twice - second should not change counter
595        let result1 = sync1.merge_document(&doc_bytes).unwrap();
596        assert!(result1.counter_changed);
597        assert_eq!(sync1.total_count(), 1);
598
599        let result2 = sync1.merge_document(&doc_bytes).unwrap();
600        assert!(!result2.counter_changed); // No change on re-merge
601        assert_eq!(sync1.total_count(), 1);
602    }
603}