hive_btle/
document_sync.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Document synchronization for HIVE BLE mesh
17//!
18//! This module provides centralized document state management for HIVE-Lite nodes.
19//! It manages the local CRDT state (GCounter) and handles merging with received documents.
20//!
21//! ## Design Notes
22//!
23//! This implementation uses a simple GCounter for resource-constrained devices (ESP32,
24//! smartwatches). For full HIVE nodes using AutomergeIroh, this component can be replaced
25//! or extended - the observer pattern and BLE transport layer are independent of the
26//! document format.
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use hive_btle::document_sync::DocumentSync;
32//! use hive_btle::NodeId;
33//!
34//! let sync = DocumentSync::new(NodeId::new(0x12345678), "SOLDIER-1");
35//!
36//! // Trigger an emergency
37//! let doc_bytes = sync.send_emergency();
38//! // ... broadcast doc_bytes over BLE
39//!
40//! // Handle received document
41//! if let Some(result) = sync.merge_document(&received_data) {
42//!     if result.is_emergency() {
43//!         println!("EMERGENCY from {:08X}", result.source_node.as_u32());
44//!     }
45//! }
46//! ```
47
48#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59use crate::sync::crdt::{
60    ChatCRDT, ChatMessage, EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType,
61};
62use crate::NodeId;
63
64/// Document synchronization manager for HIVE-Lite nodes
65///
66/// Manages the local CRDT state and handles document serialization/merging.
67/// Thread-safe for use from multiple BLE callbacks.
68///
69/// ## Integration with Full HIVE
70///
71/// This implementation uses a simple GCounter suitable for embedded devices.
72/// For integration with the larger HIVE project using AutomergeIroh:
73/// - The `build_document()` output can be wrapped in an Automerge-compatible format
74/// - The observer events (Emergency, Ack, DocumentSynced) work with any backend
75/// - The BLE transport layer is document-format agnostic
76pub struct DocumentSync {
77    /// Our node ID
78    node_id: NodeId,
79
80    /// CRDT G-Counter for mesh activity tracking
81    counter: RwLock<GCounter>,
82
83    /// Peripheral data (callsign, type, location)
84    peripheral: RwLock<Peripheral>,
85
86    /// Active emergency event with ACK tracking (CRDT)
87    emergency: RwLock<Option<EmergencyEvent>>,
88
89    /// Chat CRDT for mesh-wide messaging
90    chat: RwLock<Option<ChatCRDT>>,
91
92    /// Document version (monotonically increasing)
93    version: AtomicU32,
94}
95
96impl DocumentSync {
97    /// Create a new document sync manager
98    pub fn new(node_id: NodeId, callsign: &str) -> Self {
99        let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
100            .with_callsign(callsign);
101
102        Self {
103            node_id,
104            counter: RwLock::new(GCounter::new()),
105            peripheral: RwLock::new(peripheral),
106            emergency: RwLock::new(None),
107            chat: RwLock::new(None),
108            version: AtomicU32::new(1),
109        }
110    }
111
112    /// Create with a specific peripheral type
113    pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
114        let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
115
116        Self {
117            node_id,
118            counter: RwLock::new(GCounter::new()),
119            peripheral: RwLock::new(peripheral),
120            emergency: RwLock::new(None),
121            chat: RwLock::new(None),
122            version: AtomicU32::new(1),
123        }
124    }
125
126    /// Get our node ID
127    pub fn node_id(&self) -> NodeId {
128        self.node_id
129    }
130
131    /// Get the current document version
132    pub fn version(&self) -> u32 {
133        self.version.load(Ordering::Relaxed)
134    }
135
136    /// Get the total counter value
137    pub fn total_count(&self) -> u64 {
138        self.counter.read().unwrap().value()
139    }
140
141    /// Get our counter contribution
142    pub fn local_count(&self) -> u64 {
143        self.counter.read().unwrap().node_count(&self.node_id)
144    }
145
146    /// Get current event type (if any)
147    pub fn current_event(&self) -> Option<EventType> {
148        self.peripheral
149            .read()
150            .unwrap()
151            .last_event
152            .as_ref()
153            .map(|e| e.event_type)
154    }
155
156    /// Check if we're in emergency state
157    pub fn is_emergency_active(&self) -> bool {
158        self.current_event() == Some(EventType::Emergency)
159    }
160
161    /// Check if we've sent an ACK
162    pub fn is_ack_active(&self) -> bool {
163        self.current_event() == Some(EventType::Ack)
164    }
165
166    /// Get the callsign
167    pub fn callsign(&self) -> String {
168        self.peripheral.read().unwrap().callsign_str().to_string()
169    }
170
171    // ==================== State Mutations ====================
172
173    /// Send an emergency - returns the document bytes to broadcast
174    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
175        // Set emergency event
176        {
177            let mut peripheral = self.peripheral.write().unwrap();
178            peripheral.set_event(EventType::Emergency, timestamp);
179        }
180
181        // Increment counter
182        self.increment_counter_internal();
183
184        // Build and return document
185        self.build_document()
186    }
187
188    /// Send an ACK - returns the document bytes to broadcast
189    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
190        // Set ACK event
191        {
192            let mut peripheral = self.peripheral.write().unwrap();
193            peripheral.set_event(EventType::Ack, timestamp);
194        }
195
196        // Increment counter
197        self.increment_counter_internal();
198
199        // Build and return document
200        self.build_document()
201    }
202
203    /// Clear the current event
204    pub fn clear_event(&self) {
205        let mut peripheral = self.peripheral.write().unwrap();
206        peripheral.clear_event();
207        self.bump_version();
208    }
209
210    /// Increment the counter (for periodic sync)
211    pub fn increment_counter(&self) {
212        self.increment_counter_internal();
213    }
214
215    /// Update health status (battery percentage)
216    pub fn update_health(&self, battery_percent: u8) {
217        let mut peripheral = self.peripheral.write().unwrap();
218        peripheral.health.battery_percent = battery_percent;
219        self.bump_version();
220    }
221
222    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
223    pub fn update_activity(&self, activity: u8) {
224        let mut peripheral = self.peripheral.write().unwrap();
225        peripheral.health.activity = activity;
226        self.bump_version();
227    }
228
229    /// Update full health status (battery and activity)
230    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
231        let mut peripheral = self.peripheral.write().unwrap();
232        peripheral.health.battery_percent = battery_percent;
233        peripheral.health.activity = activity;
234        self.bump_version();
235    }
236
237    // ==================== Emergency Management ====================
238
239    /// Start a new emergency event
240    ///
241    /// Creates an emergency event that tracks ACKs from all known peers.
242    /// Returns the document bytes to broadcast.
243    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
244        // Create emergency event with our node as source
245        {
246            let mut emergency = self.emergency.write().unwrap();
247            *emergency = Some(EmergencyEvent::new(
248                self.node_id.as_u32(),
249                timestamp,
250                known_peers,
251            ));
252        }
253
254        // Also set peripheral event for backward compatibility
255        {
256            let mut peripheral = self.peripheral.write().unwrap();
257            peripheral.set_event(EventType::Emergency, timestamp);
258        }
259
260        self.increment_counter_internal();
261        self.build_document()
262    }
263
264    /// Record our ACK for the current emergency
265    ///
266    /// Returns the document bytes to broadcast, or None if no emergency is active.
267    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
268        let changed = {
269            let mut emergency = self.emergency.write().unwrap();
270            if let Some(ref mut e) = *emergency {
271                e.ack(self.node_id.as_u32())
272            } else {
273                return None;
274            }
275        };
276
277        if changed {
278            // Also set peripheral event for backward compatibility
279            {
280                let mut peripheral = self.peripheral.write().unwrap();
281                peripheral.set_event(EventType::Ack, timestamp);
282            }
283
284            self.increment_counter_internal();
285        }
286
287        Some(self.build_document())
288    }
289
290    /// Clear the current emergency event
291    pub fn clear_emergency(&self) {
292        let mut emergency = self.emergency.write().unwrap();
293        if emergency.is_some() {
294            *emergency = None;
295            drop(emergency);
296
297            // Also clear peripheral event
298            let mut peripheral = self.peripheral.write().unwrap();
299            peripheral.clear_event();
300
301            self.bump_version();
302        }
303    }
304
305    /// Check if there's an active emergency
306    pub fn has_active_emergency(&self) -> bool {
307        self.emergency.read().unwrap().is_some()
308    }
309
310    /// Get emergency status info
311    ///
312    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
313    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
314        let emergency = self.emergency.read().unwrap();
315        emergency.as_ref().map(|e| {
316            (
317                e.source_node(),
318                e.timestamp(),
319                e.ack_count(),
320                e.pending_nodes().len(),
321            )
322        })
323    }
324
325    /// Check if a specific peer has ACKed the current emergency
326    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
327        let emergency = self.emergency.read().unwrap();
328        emergency
329            .as_ref()
330            .map(|e| e.has_acked(peer_id))
331            .unwrap_or(false)
332    }
333
334    /// Check if all peers have ACKed the current emergency
335    pub fn all_peers_acked(&self) -> bool {
336        let emergency = self.emergency.read().unwrap();
337        emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
338    }
339
340    // ==================== Chat Methods ====================
341
342    /// Add a chat message to the local CRDT
343    ///
344    /// Returns true if the message was new (not a duplicate).
345    pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
346        let mut chat = self.chat.write().unwrap();
347
348        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
349        let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
350
351        if our_chat.add_message(msg) {
352            self.bump_version();
353            true
354        } else {
355            false
356        }
357    }
358
359    /// Add a chat reply to the local CRDT
360    ///
361    /// Returns true if the message was new.
362    pub fn add_chat_reply(
363        &self,
364        sender: &str,
365        text: &str,
366        reply_to_node: u32,
367        reply_to_timestamp: u64,
368        timestamp: u64,
369    ) -> bool {
370        let mut chat = self.chat.write().unwrap();
371
372        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
373        let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
374        msg.set_reply_to(reply_to_node, reply_to_timestamp);
375
376        if our_chat.add_message(msg) {
377            self.bump_version();
378            true
379        } else {
380            false
381        }
382    }
383
384    /// Get the number of chat messages
385    pub fn chat_count(&self) -> usize {
386        self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
387    }
388
389    /// Get chat messages newer than a timestamp
390    ///
391    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
392    pub fn chat_messages_since(
393        &self,
394        since_timestamp: u64,
395    ) -> Vec<(u32, u64, String, String, u32, u64)> {
396        let chat = self.chat.read().unwrap();
397        chat.as_ref()
398            .map(|c| {
399                c.messages_since(since_timestamp)
400                    .map(|m| {
401                        (
402                            m.origin_node,
403                            m.timestamp,
404                            m.sender().to_string(),
405                            m.text().to_string(),
406                            m.reply_to_node,
407                            m.reply_to_timestamp,
408                        )
409                    })
410                    .collect()
411            })
412            .unwrap_or_default()
413    }
414
415    /// Get all chat messages
416    ///
417    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
418    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
419        self.chat_messages_since(0)
420    }
421
422    /// Get a snapshot of the chat CRDT
423    pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
424        self.chat.read().unwrap().clone()
425    }
426
427    // ==================== Delta Document Support ====================
428
429    /// Get all counter entries for delta document building
430    ///
431    /// Returns a vector of (node_id, count) pairs for all nodes
432    /// that have contributed to the counter.
433    pub fn counter_entries(&self) -> Vec<(u32, u64)> {
434        self.counter.read().unwrap().entries().collect()
435    }
436
437    /// Get a clone of the peripheral state
438    ///
439    /// Used for building delta documents with peripheral updates.
440    pub fn peripheral_snapshot(&self) -> Peripheral {
441        self.peripheral.read().unwrap().clone()
442    }
443
444    /// Get a clone of the emergency state
445    ///
446    /// Used for building delta documents with emergency data.
447    pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
448        self.emergency.read().unwrap().clone()
449    }
450
451    // ==================== Document I/O ====================
452
453    /// Build the document for transmission
454    ///
455    /// Returns the encoded bytes ready for BLE GATT write.
456    pub fn build_document(&self) -> Vec<u8> {
457        let counter = self.counter.read().unwrap().clone();
458        let peripheral = self.peripheral.read().unwrap().clone();
459        let emergency = self.emergency.read().unwrap().clone();
460
461        // Use for_sync() to limit chat messages in the document
462        // This prevents exceeding BLE MTU limits while keeping full history locally
463        let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
464
465        let doc = HiveDocument {
466            version: self.version.load(Ordering::Relaxed),
467            node_id: self.node_id,
468            counter,
469            peripheral: Some(peripheral),
470            emergency,
471            chat,
472        };
473
474        doc.encode()
475    }
476
477    /// Merge a received document
478    ///
479    /// Returns `Some(MergeResult)` if the document was valid, `None` otherwise.
480    /// The result contains information about what changed and any events.
481    pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
482        let received = HiveDocument::decode(data)?;
483
484        // Don't process our own documents
485        if received.node_id == self.node_id {
486            return None;
487        }
488
489        // Merge the counter
490        let counter_changed = {
491            let mut counter = self.counter.write().unwrap();
492            let old_value = counter.value();
493            counter.merge(&received.counter);
494            counter.value() != old_value
495        };
496
497        // Merge emergency event (CRDT merge)
498        let emergency_changed = if let Some(ref received_emergency) = received.emergency {
499            let mut emergency = self.emergency.write().unwrap();
500            match &mut *emergency {
501                Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
502                None => {
503                    *emergency = Some(received_emergency.clone());
504                    true
505                }
506            }
507        } else {
508            false
509        };
510
511        // Merge chat CRDT
512        let chat_changed = if let Some(ref received_chat) = received.chat {
513            if !received_chat.is_empty() {
514                let mut chat = self.chat.write().unwrap();
515                match &mut *chat {
516                    Some(ref mut our_chat) => our_chat.merge(received_chat),
517                    None => {
518                        *chat = Some(received_chat.clone());
519                        true
520                    }
521                }
522            } else {
523                false
524            }
525        } else {
526            false
527        };
528
529        if counter_changed || emergency_changed || chat_changed {
530            self.bump_version();
531        }
532
533        // Extract event from received document
534        let event = received
535            .peripheral
536            .as_ref()
537            .and_then(|p| p.last_event.clone());
538
539        Some(MergeResult {
540            source_node: received.node_id,
541            event,
542            counter_changed,
543            emergency_changed,
544            chat_changed,
545            total_count: self.total_count(),
546        })
547    }
548
549    /// Create a document from raw bytes (for inspection without merging)
550    pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
551        HiveDocument::decode(data)
552    }
553
554    // ==================== Internal Helpers ====================
555
556    fn increment_counter_internal(&self) {
557        let mut counter = self.counter.write().unwrap();
558        counter.increment(&self.node_id, 1);
559        drop(counter);
560        self.bump_version();
561    }
562
563    fn bump_version(&self) {
564        self.version.fetch_add(1, Ordering::Relaxed);
565    }
566}
567
568/// Result from checking if a document contains an emergency
569#[derive(Debug, Clone)]
570pub struct DocumentCheck {
571    /// Node ID from the document
572    pub node_id: NodeId,
573    /// Whether this document contains an emergency
574    pub is_emergency: bool,
575    /// Whether this document contains an ACK
576    pub is_ack: bool,
577}
578
579impl DocumentCheck {
580    /// Quick check of a document without full parsing
581    pub fn from_document(data: &[u8]) -> Option<Self> {
582        let doc = HiveDocument::decode(data)?;
583
584        let (is_emergency, is_ack) = doc
585            .peripheral
586            .as_ref()
587            .and_then(|p| p.last_event.as_ref())
588            .map(|e| {
589                (
590                    e.event_type == EventType::Emergency,
591                    e.event_type == EventType::Ack,
592                )
593            })
594            .unwrap_or((false, false));
595
596        Some(Self {
597            node_id: doc.node_id,
598            is_emergency,
599            is_ack,
600        })
601    }
602}
603
604#[cfg(test)]
605mod tests {
606    use super::*;
607
608    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
609    const TEST_TIMESTAMP: u64 = 1705276800000;
610
611    #[test]
612    fn test_document_sync_new() {
613        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
614
615        assert_eq!(sync.node_id().as_u32(), 0x12345678);
616        assert_eq!(sync.version(), 1);
617        assert_eq!(sync.total_count(), 0);
618        assert_eq!(sync.callsign(), "ALPHA-1");
619        assert!(sync.current_event().is_none());
620    }
621
622    #[test]
623    fn test_send_emergency() {
624        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
625
626        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
627
628        assert!(!doc_bytes.is_empty());
629        assert_eq!(sync.total_count(), 1);
630        assert!(sync.is_emergency_active());
631        assert!(!sync.is_ack_active());
632
633        // Verify we can decode what we sent
634        let doc = HiveDocument::decode(&doc_bytes).unwrap();
635        assert_eq!(doc.node_id.as_u32(), 0x12345678);
636        assert!(doc.peripheral.is_some());
637        let event = doc.peripheral.unwrap().last_event.unwrap();
638        assert_eq!(event.event_type, EventType::Emergency);
639    }
640
641    #[test]
642    fn test_send_ack() {
643        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
644
645        let doc_bytes = sync.send_ack(TEST_TIMESTAMP);
646
647        assert!(!doc_bytes.is_empty());
648        assert_eq!(sync.total_count(), 1);
649        assert!(sync.is_ack_active());
650        assert!(!sync.is_emergency_active());
651    }
652
653    #[test]
654    fn test_clear_event() {
655        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
656
657        sync.send_emergency(TEST_TIMESTAMP);
658        assert!(sync.is_emergency_active());
659
660        sync.clear_event();
661        assert!(sync.current_event().is_none());
662    }
663
664    #[test]
665    fn test_merge_document() {
666        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
667        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
668
669        // sync2 sends emergency
670        let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
671
672        // sync1 receives and merges
673        let result = sync1.merge_document(&doc_bytes);
674        assert!(result.is_some());
675
676        let result = result.unwrap();
677        assert_eq!(result.source_node.as_u32(), 0x22222222);
678        assert!(result.is_emergency());
679        assert!(result.counter_changed);
680        assert_eq!(result.total_count, 1);
681
682        // sync1's local count is still 0, but total includes sync2's contribution
683        assert_eq!(sync1.local_count(), 0);
684        assert_eq!(sync1.total_count(), 1);
685    }
686
687    #[test]
688    fn test_merge_own_document_ignored() {
689        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
690
691        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
692
693        // Merging our own document should be ignored
694        let result = sync.merge_document(&doc_bytes);
695        assert!(result.is_none());
696    }
697
698    #[test]
699    fn test_version_increments() {
700        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
701
702        assert_eq!(sync.version(), 1);
703
704        sync.increment_counter();
705        assert_eq!(sync.version(), 2);
706
707        sync.send_emergency(TEST_TIMESTAMP);
708        assert_eq!(sync.version(), 3);
709
710        sync.clear_event();
711        assert_eq!(sync.version(), 4);
712    }
713
714    #[test]
715    fn test_document_check() {
716        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
717
718        let emergency_doc = sync.send_emergency(TEST_TIMESTAMP);
719        let check = DocumentCheck::from_document(&emergency_doc).unwrap();
720        assert_eq!(check.node_id.as_u32(), 0x12345678);
721        assert!(check.is_emergency);
722        assert!(!check.is_ack);
723
724        sync.clear_event();
725        let ack_doc = sync.send_ack(TEST_TIMESTAMP + 1000);
726        let check = DocumentCheck::from_document(&ack_doc).unwrap();
727        assert!(!check.is_emergency);
728        assert!(check.is_ack);
729    }
730
731    #[test]
732    fn test_counter_merge_idempotent() {
733        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
734        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
735
736        // sync2 sends something
737        let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
738
739        // sync1 merges twice - second should not change counter
740        let result1 = sync1.merge_document(&doc_bytes).unwrap();
741        assert!(result1.counter_changed);
742        assert_eq!(sync1.total_count(), 1);
743
744        let result2 = sync1.merge_document(&doc_bytes).unwrap();
745        assert!(!result2.counter_changed); // No change on re-merge
746        assert_eq!(sync1.total_count(), 1);
747    }
748}