hive_btle/
document_sync.rs

1//! Document synchronization for HIVE BLE mesh
2//!
3//! This module provides centralized document state management for HIVE-Lite nodes.
4//! It manages the local CRDT state (GCounter) and handles merging with received documents.
5//!
6//! ## Design Notes
7//!
8//! This implementation uses a simple GCounter for resource-constrained devices (ESP32,
9//! smartwatches). For full HIVE nodes using AutomergeIroh, this component can be replaced
10//! or extended - the observer pattern and BLE transport layer are independent of the
11//! document format.
12//!
13//! ## Usage
14//!
15//! ```ignore
16//! use hive_btle::document_sync::DocumentSync;
17//! use hive_btle::NodeId;
18//!
19//! let sync = DocumentSync::new(NodeId::new(0x12345678), "SOLDIER-1");
20//!
21//! // Trigger an emergency
22//! let doc_bytes = sync.send_emergency();
23//! // ... broadcast doc_bytes over BLE
24//!
25//! // Handle received document
26//! if let Some(result) = sync.merge_document(&received_data) {
27//!     if result.is_emergency() {
28//!         println!("EMERGENCY from {:08X}", result.source_node.as_u32());
29//!     }
30//! }
31//! ```
32
33#[cfg(not(feature = "std"))]
34use alloc::{string::String, vec::Vec};
35#[cfg(feature = "std")]
36use std::sync::RwLock;
37
38#[cfg(not(feature = "std"))]
39use spin::RwLock;
40
41use core::sync::atomic::{AtomicU32, Ordering};
42
43use crate::document::{HiveDocument, MergeResult};
44use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
45use crate::NodeId;
46
47/// Document synchronization manager for HIVE-Lite nodes
48///
49/// Manages the local CRDT state and handles document serialization/merging.
50/// Thread-safe for use from multiple BLE callbacks.
51///
52/// ## Integration with Full HIVE
53///
54/// This implementation uses a simple GCounter suitable for embedded devices.
55/// For integration with the larger HIVE project using AutomergeIroh:
56/// - The `build_document()` output can be wrapped in an Automerge-compatible format
57/// - The observer events (Emergency, Ack, DocumentSynced) work with any backend
58/// - The BLE transport layer is document-format agnostic
59pub struct DocumentSync {
60    /// Our node ID
61    node_id: NodeId,
62
63    /// CRDT G-Counter for mesh activity tracking
64    counter: RwLock<GCounter>,
65
66    /// Peripheral data (callsign, type, location)
67    peripheral: RwLock<Peripheral>,
68
69    /// Active emergency event with ACK tracking (CRDT)
70    emergency: RwLock<Option<EmergencyEvent>>,
71
72    /// Document version (monotonically increasing)
73    version: AtomicU32,
74}
75
76impl DocumentSync {
77    /// Create a new document sync manager
78    pub fn new(node_id: NodeId, callsign: &str) -> Self {
79        let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
80            .with_callsign(callsign);
81
82        Self {
83            node_id,
84            counter: RwLock::new(GCounter::new()),
85            peripheral: RwLock::new(peripheral),
86            emergency: RwLock::new(None),
87            version: AtomicU32::new(1),
88        }
89    }
90
91    /// Create with a specific peripheral type
92    pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
93        let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
94
95        Self {
96            node_id,
97            counter: RwLock::new(GCounter::new()),
98            peripheral: RwLock::new(peripheral),
99            emergency: RwLock::new(None),
100            version: AtomicU32::new(1),
101        }
102    }
103
104    /// Get our node ID
105    pub fn node_id(&self) -> NodeId {
106        self.node_id
107    }
108
109    /// Get the current document version
110    pub fn version(&self) -> u32 {
111        self.version.load(Ordering::Relaxed)
112    }
113
114    /// Get the total counter value
115    pub fn total_count(&self) -> u64 {
116        self.counter.read().unwrap().value()
117    }
118
119    /// Get our counter contribution
120    pub fn local_count(&self) -> u64 {
121        self.counter.read().unwrap().node_count(&self.node_id)
122    }
123
124    /// Get current event type (if any)
125    pub fn current_event(&self) -> Option<EventType> {
126        self.peripheral
127            .read()
128            .unwrap()
129            .last_event
130            .as_ref()
131            .map(|e| e.event_type)
132    }
133
134    /// Check if we're in emergency state
135    pub fn is_emergency_active(&self) -> bool {
136        self.current_event() == Some(EventType::Emergency)
137    }
138
139    /// Check if we've sent an ACK
140    pub fn is_ack_active(&self) -> bool {
141        self.current_event() == Some(EventType::Ack)
142    }
143
144    /// Get the callsign
145    pub fn callsign(&self) -> String {
146        self.peripheral.read().unwrap().callsign_str().to_string()
147    }
148
149    // ==================== State Mutations ====================
150
151    /// Send an emergency - returns the document bytes to broadcast
152    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
153        // Set emergency event
154        {
155            let mut peripheral = self.peripheral.write().unwrap();
156            peripheral.set_event(EventType::Emergency, timestamp);
157        }
158
159        // Increment counter
160        self.increment_counter_internal();
161
162        // Build and return document
163        self.build_document()
164    }
165
166    /// Send an ACK - returns the document bytes to broadcast
167    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
168        // Set ACK event
169        {
170            let mut peripheral = self.peripheral.write().unwrap();
171            peripheral.set_event(EventType::Ack, timestamp);
172        }
173
174        // Increment counter
175        self.increment_counter_internal();
176
177        // Build and return document
178        self.build_document()
179    }
180
181    /// Clear the current event
182    pub fn clear_event(&self) {
183        let mut peripheral = self.peripheral.write().unwrap();
184        peripheral.clear_event();
185        self.bump_version();
186    }
187
188    /// Increment the counter (for periodic sync)
189    pub fn increment_counter(&self) {
190        self.increment_counter_internal();
191    }
192
193    /// Update health status (battery percentage)
194    pub fn update_health(&self, battery_percent: u8) {
195        let mut peripheral = self.peripheral.write().unwrap();
196        peripheral.health.battery_percent = battery_percent;
197        self.bump_version();
198    }
199
200    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
201    pub fn update_activity(&self, activity: u8) {
202        let mut peripheral = self.peripheral.write().unwrap();
203        peripheral.health.activity = activity;
204        self.bump_version();
205    }
206
207    /// Update full health status (battery and activity)
208    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
209        let mut peripheral = self.peripheral.write().unwrap();
210        peripheral.health.battery_percent = battery_percent;
211        peripheral.health.activity = activity;
212        self.bump_version();
213    }
214
215    // ==================== Emergency Management ====================
216
217    /// Start a new emergency event
218    ///
219    /// Creates an emergency event that tracks ACKs from all known peers.
220    /// Returns the document bytes to broadcast.
221    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
222        // Create emergency event with our node as source
223        {
224            let mut emergency = self.emergency.write().unwrap();
225            *emergency = Some(EmergencyEvent::new(
226                self.node_id.as_u32(),
227                timestamp,
228                known_peers,
229            ));
230        }
231
232        // Also set peripheral event for backward compatibility
233        {
234            let mut peripheral = self.peripheral.write().unwrap();
235            peripheral.set_event(EventType::Emergency, timestamp);
236        }
237
238        self.increment_counter_internal();
239        self.build_document()
240    }
241
242    /// Record our ACK for the current emergency
243    ///
244    /// Returns the document bytes to broadcast, or None if no emergency is active.
245    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
246        let changed = {
247            let mut emergency = self.emergency.write().unwrap();
248            if let Some(ref mut e) = *emergency {
249                e.ack(self.node_id.as_u32())
250            } else {
251                return None;
252            }
253        };
254
255        if changed {
256            // Also set peripheral event for backward compatibility
257            {
258                let mut peripheral = self.peripheral.write().unwrap();
259                peripheral.set_event(EventType::Ack, timestamp);
260            }
261
262            self.increment_counter_internal();
263        }
264
265        Some(self.build_document())
266    }
267
268    /// Clear the current emergency event
269    pub fn clear_emergency(&self) {
270        let mut emergency = self.emergency.write().unwrap();
271        if emergency.is_some() {
272            *emergency = None;
273            drop(emergency);
274
275            // Also clear peripheral event
276            let mut peripheral = self.peripheral.write().unwrap();
277            peripheral.clear_event();
278
279            self.bump_version();
280        }
281    }
282
283    /// Check if there's an active emergency
284    pub fn has_active_emergency(&self) -> bool {
285        self.emergency.read().unwrap().is_some()
286    }
287
288    /// Get emergency status info
289    ///
290    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
291    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
292        let emergency = self.emergency.read().unwrap();
293        emergency.as_ref().map(|e| {
294            (
295                e.source_node(),
296                e.timestamp(),
297                e.ack_count(),
298                e.pending_nodes().len(),
299            )
300        })
301    }
302
303    /// Check if a specific peer has ACKed the current emergency
304    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
305        let emergency = self.emergency.read().unwrap();
306        emergency
307            .as_ref()
308            .map(|e| e.has_acked(peer_id))
309            .unwrap_or(false)
310    }
311
312    /// Check if all peers have ACKed the current emergency
313    pub fn all_peers_acked(&self) -> bool {
314        let emergency = self.emergency.read().unwrap();
315        emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
316    }
317
318    // ==================== Document I/O ====================
319
320    /// Build the document for transmission
321    ///
322    /// Returns the encoded bytes ready for BLE GATT write.
323    pub fn build_document(&self) -> Vec<u8> {
324        let counter = self.counter.read().unwrap().clone();
325        let peripheral = self.peripheral.read().unwrap().clone();
326        let emergency = self.emergency.read().unwrap().clone();
327
328        let doc = HiveDocument {
329            version: self.version.load(Ordering::Relaxed),
330            node_id: self.node_id,
331            counter,
332            peripheral: Some(peripheral),
333            emergency,
334        };
335
336        doc.encode()
337    }
338
339    /// Merge a received document
340    ///
341    /// Returns `Some(MergeResult)` if the document was valid, `None` otherwise.
342    /// The result contains information about what changed and any events.
343    pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
344        let received = HiveDocument::decode(data)?;
345
346        // Don't process our own documents
347        if received.node_id == self.node_id {
348            return None;
349        }
350
351        // Merge the counter
352        let counter_changed = {
353            let mut counter = self.counter.write().unwrap();
354            let old_value = counter.value();
355            counter.merge(&received.counter);
356            counter.value() != old_value
357        };
358
359        // Merge emergency event (CRDT merge)
360        let emergency_changed = if let Some(ref received_emergency) = received.emergency {
361            let mut emergency = self.emergency.write().unwrap();
362            match &mut *emergency {
363                Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
364                None => {
365                    *emergency = Some(received_emergency.clone());
366                    true
367                }
368            }
369        } else {
370            false
371        };
372
373        if counter_changed || emergency_changed {
374            self.bump_version();
375        }
376
377        // Extract event from received document
378        let event = received
379            .peripheral
380            .as_ref()
381            .and_then(|p| p.last_event.clone());
382
383        Some(MergeResult {
384            source_node: received.node_id,
385            event,
386            counter_changed,
387            emergency_changed,
388            total_count: self.total_count(),
389        })
390    }
391
392    /// Create a document from raw bytes (for inspection without merging)
393    pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
394        HiveDocument::decode(data)
395    }
396
397    // ==================== Internal Helpers ====================
398
399    fn increment_counter_internal(&self) {
400        let mut counter = self.counter.write().unwrap();
401        counter.increment(&self.node_id, 1);
402        drop(counter);
403        self.bump_version();
404    }
405
406    fn bump_version(&self) {
407        self.version.fetch_add(1, Ordering::Relaxed);
408    }
409}
410
411/// Result from checking if a document contains an emergency
412#[derive(Debug, Clone)]
413pub struct DocumentCheck {
414    /// Node ID from the document
415    pub node_id: NodeId,
416    /// Whether this document contains an emergency
417    pub is_emergency: bool,
418    /// Whether this document contains an ACK
419    pub is_ack: bool,
420}
421
422impl DocumentCheck {
423    /// Quick check of a document without full parsing
424    pub fn from_document(data: &[u8]) -> Option<Self> {
425        let doc = HiveDocument::decode(data)?;
426
427        let (is_emergency, is_ack) = doc
428            .peripheral
429            .as_ref()
430            .and_then(|p| p.last_event.as_ref())
431            .map(|e| {
432                (
433                    e.event_type == EventType::Emergency,
434                    e.event_type == EventType::Ack,
435                )
436            })
437            .unwrap_or((false, false));
438
439        Some(Self {
440            node_id: doc.node_id,
441            is_emergency,
442            is_ack,
443        })
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450
451    #[test]
452    fn test_document_sync_new() {
453        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
454
455        assert_eq!(sync.node_id().as_u32(), 0x12345678);
456        assert_eq!(sync.version(), 1);
457        assert_eq!(sync.total_count(), 0);
458        assert_eq!(sync.callsign(), "ALPHA-1");
459        assert!(sync.current_event().is_none());
460    }
461
462    #[test]
463    fn test_send_emergency() {
464        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
465
466        let doc_bytes = sync.send_emergency(1234567890);
467
468        assert!(!doc_bytes.is_empty());
469        assert_eq!(sync.total_count(), 1);
470        assert!(sync.is_emergency_active());
471        assert!(!sync.is_ack_active());
472
473        // Verify we can decode what we sent
474        let doc = HiveDocument::decode(&doc_bytes).unwrap();
475        assert_eq!(doc.node_id.as_u32(), 0x12345678);
476        assert!(doc.peripheral.is_some());
477        let event = doc.peripheral.unwrap().last_event.unwrap();
478        assert_eq!(event.event_type, EventType::Emergency);
479    }
480
481    #[test]
482    fn test_send_ack() {
483        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
484
485        let doc_bytes = sync.send_ack(1234567890);
486
487        assert!(!doc_bytes.is_empty());
488        assert_eq!(sync.total_count(), 1);
489        assert!(sync.is_ack_active());
490        assert!(!sync.is_emergency_active());
491    }
492
493    #[test]
494    fn test_clear_event() {
495        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
496
497        sync.send_emergency(1000);
498        assert!(sync.is_emergency_active());
499
500        sync.clear_event();
501        assert!(sync.current_event().is_none());
502    }
503
504    #[test]
505    fn test_merge_document() {
506        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
507        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
508
509        // sync2 sends emergency
510        let doc_bytes = sync2.send_emergency(1000);
511
512        // sync1 receives and merges
513        let result = sync1.merge_document(&doc_bytes);
514        assert!(result.is_some());
515
516        let result = result.unwrap();
517        assert_eq!(result.source_node.as_u32(), 0x22222222);
518        assert!(result.is_emergency());
519        assert!(result.counter_changed);
520        assert_eq!(result.total_count, 1);
521
522        // sync1's local count is still 0, but total includes sync2's contribution
523        assert_eq!(sync1.local_count(), 0);
524        assert_eq!(sync1.total_count(), 1);
525    }
526
527    #[test]
528    fn test_merge_own_document_ignored() {
529        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
530
531        let doc_bytes = sync.send_emergency(1000);
532
533        // Merging our own document should be ignored
534        let result = sync.merge_document(&doc_bytes);
535        assert!(result.is_none());
536    }
537
538    #[test]
539    fn test_version_increments() {
540        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
541
542        assert_eq!(sync.version(), 1);
543
544        sync.increment_counter();
545        assert_eq!(sync.version(), 2);
546
547        sync.send_emergency(1000);
548        assert_eq!(sync.version(), 3);
549
550        sync.clear_event();
551        assert_eq!(sync.version(), 4);
552    }
553
554    #[test]
555    fn test_document_check() {
556        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
557
558        let emergency_doc = sync.send_emergency(1000);
559        let check = DocumentCheck::from_document(&emergency_doc).unwrap();
560        assert_eq!(check.node_id.as_u32(), 0x12345678);
561        assert!(check.is_emergency);
562        assert!(!check.is_ack);
563
564        sync.clear_event();
565        let ack_doc = sync.send_ack(2000);
566        let check = DocumentCheck::from_document(&ack_doc).unwrap();
567        assert!(!check.is_emergency);
568        assert!(check.is_ack);
569    }
570
571    #[test]
572    fn test_counter_merge_idempotent() {
573        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
574        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
575
576        // sync2 sends something
577        let doc_bytes = sync2.send_emergency(1000);
578
579        // sync1 merges twice - second should not change counter
580        let result1 = sync1.merge_document(&doc_bytes).unwrap();
581        assert!(result1.counter_changed);
582        assert_eq!(sync1.total_count(), 1);
583
584        let result2 = sync1.merge_document(&doc_bytes).unwrap();
585        assert!(!result2.counter_changed); // No change on re-merge
586        assert_eq!(sync1.total_count(), 1);
587    }
588}