Skip to main content

hive_btle/
document_sync.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Document synchronization for HIVE BLE mesh
17//!
18//! This module provides centralized document state management for HIVE-Lite nodes.
19//! It manages the local CRDT state (GCounter) and handles merging with received documents.
20//!
21//! ## Design Notes
22//!
23//! This implementation uses a simple GCounter for resource-constrained devices (ESP32,
24//! smartwatches). For full HIVE nodes using AutomergeIroh, this component can be replaced
25//! or extended - the observer pattern and BLE transport layer are independent of the
26//! document format.
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use hive_btle::document_sync::DocumentSync;
32//! use hive_btle::NodeId;
33//!
34//! let sync = DocumentSync::new(NodeId::new(0x12345678), "SOLDIER-1");
35//!
36//! // Trigger an emergency
//! let doc_bytes = sync.send_emergency(timestamp);
38//! // ... broadcast doc_bytes over BLE
39//!
40//! // Handle received document
41//! if let Some(result) = sync.merge_document(&received_data) {
42//!     if result.is_emergency() {
43//!         println!("EMERGENCY from {:08X}", result.source_node.as_u32());
44//!     }
45//! }
46//! ```
47
48#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59#[cfg(feature = "legacy-chat")]
60use crate::sync::crdt::{ChatCRDT, ChatMessage};
61use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
62use crate::NodeId;
63
64/// Document synchronization manager for HIVE-Lite nodes
65///
66/// Manages the local CRDT state and handles document serialization/merging.
67/// Thread-safe for use from multiple BLE callbacks.
68///
69/// ## Integration with Full HIVE
70///
71/// This implementation uses a simple GCounter suitable for embedded devices.
72/// For integration with the larger HIVE project using AutomergeIroh:
73/// - The `build_document()` output can be wrapped in an Automerge-compatible format
74/// - The observer events (Emergency, Ack, DocumentSynced) work with any backend
75/// - The BLE transport layer is document-format agnostic
76pub struct DocumentSync {
77    /// Our node ID
78    node_id: NodeId,
79
80    /// CRDT G-Counter for mesh activity tracking
81    counter: RwLock<GCounter>,
82
83    /// Peripheral data (callsign, type, location)
84    peripheral: RwLock<Peripheral>,
85
86    /// Active emergency event with ACK tracking (CRDT)
87    emergency: RwLock<Option<EmergencyEvent>>,
88
89    /// Chat CRDT for mesh-wide messaging (requires `legacy-chat` feature)
90    #[cfg(feature = "legacy-chat")]
91    chat: RwLock<Option<ChatCRDT>>,
92
93    /// Document version (monotonically increasing)
94    version: AtomicU32,
95}
96
97impl DocumentSync {
98    /// Create a new document sync manager
99    pub fn new(node_id: NodeId, callsign: &str) -> Self {
100        let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
101            .with_callsign(callsign);
102
103        Self {
104            node_id,
105            counter: RwLock::new(GCounter::new()),
106            peripheral: RwLock::new(peripheral),
107            emergency: RwLock::new(None),
108            #[cfg(feature = "legacy-chat")]
109            chat: RwLock::new(None),
110            version: AtomicU32::new(1),
111        }
112    }
113
114    /// Create with a specific peripheral type
115    pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
116        let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
117
118        Self {
119            node_id,
120            counter: RwLock::new(GCounter::new()),
121            peripheral: RwLock::new(peripheral),
122            emergency: RwLock::new(None),
123            #[cfg(feature = "legacy-chat")]
124            chat: RwLock::new(None),
125            version: AtomicU32::new(1),
126        }
127    }
128
129    /// Get our node ID
130    pub fn node_id(&self) -> NodeId {
131        self.node_id
132    }
133
134    /// Get the current document version
135    pub fn version(&self) -> u32 {
136        self.version.load(Ordering::Relaxed)
137    }
138
139    /// Get the total counter value
140    pub fn total_count(&self) -> u64 {
141        self.counter.read().unwrap().value()
142    }
143
144    /// Get our counter contribution
145    pub fn local_count(&self) -> u64 {
146        self.counter.read().unwrap().node_count(&self.node_id)
147    }
148
149    /// Get current event type (if any)
150    pub fn current_event(&self) -> Option<EventType> {
151        self.peripheral
152            .read()
153            .unwrap()
154            .last_event
155            .as_ref()
156            .map(|e| e.event_type)
157    }
158
159    /// Check if we're in emergency state
160    pub fn is_emergency_active(&self) -> bool {
161        self.current_event() == Some(EventType::Emergency)
162    }
163
164    /// Check if we've sent an ACK
165    pub fn is_ack_active(&self) -> bool {
166        self.current_event() == Some(EventType::Ack)
167    }
168
169    /// Get the callsign
170    pub fn callsign(&self) -> String {
171        self.peripheral.read().unwrap().callsign_str().to_string()
172    }
173
174    // ==================== State Mutations ====================
175
176    /// Send an emergency - returns the document bytes to broadcast
177    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
178        // Set emergency event
179        {
180            let mut peripheral = self.peripheral.write().unwrap();
181            peripheral.set_event(EventType::Emergency, timestamp);
182        }
183
184        // Increment counter
185        self.increment_counter_internal();
186
187        // Build and return document
188        self.build_document()
189    }
190
191    /// Send an ACK - returns the document bytes to broadcast
192    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
193        // Set ACK event
194        {
195            let mut peripheral = self.peripheral.write().unwrap();
196            peripheral.set_event(EventType::Ack, timestamp);
197        }
198
199        // Increment counter
200        self.increment_counter_internal();
201
202        // Build and return document
203        self.build_document()
204    }
205
206    /// Clear the current event
207    pub fn clear_event(&self) {
208        let mut peripheral = self.peripheral.write().unwrap();
209        peripheral.clear_event();
210        self.bump_version();
211    }
212
213    /// Increment the counter (for periodic sync)
214    pub fn increment_counter(&self) {
215        self.increment_counter_internal();
216    }
217
218    /// Update health status (battery percentage)
219    pub fn update_health(&self, battery_percent: u8) {
220        let mut peripheral = self.peripheral.write().unwrap();
221        peripheral.health.battery_percent = battery_percent;
222        self.bump_version();
223    }
224
225    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
226    pub fn update_activity(&self, activity: u8) {
227        let mut peripheral = self.peripheral.write().unwrap();
228        peripheral.health.activity = activity;
229        self.bump_version();
230    }
231
232    /// Update full health status (battery and activity)
233    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
234        let mut peripheral = self.peripheral.write().unwrap();
235        peripheral.health.battery_percent = battery_percent;
236        peripheral.health.activity = activity;
237        self.bump_version();
238    }
239
240    /// Update heart rate
241    pub fn update_heart_rate(&self, heart_rate: u8) {
242        let mut peripheral = self.peripheral.write().unwrap();
243        peripheral.health.heart_rate = Some(heart_rate);
244        self.bump_version();
245    }
246
247    /// Update location
248    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
249        let mut peripheral = self.peripheral.write().unwrap();
250        peripheral.set_location(latitude, longitude, altitude);
251        self.bump_version();
252    }
253
254    /// Clear location
255    pub fn clear_location(&self) {
256        let mut peripheral = self.peripheral.write().unwrap();
257        peripheral.clear_location();
258        self.bump_version();
259    }
260
261    /// Update callsign
262    pub fn update_callsign(&self, callsign: &str) {
263        let mut peripheral = self.peripheral.write().unwrap();
264        peripheral.set_callsign(callsign);
265        self.bump_version();
266    }
267
268    /// Set peripheral event type
269    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
270        let mut peripheral = self.peripheral.write().unwrap();
271        peripheral.set_event(event_type, timestamp);
272        self.bump_version();
273    }
274
275    /// Clear peripheral event
276    pub fn clear_peripheral_event(&self) {
277        let mut peripheral = self.peripheral.write().unwrap();
278        peripheral.clear_event();
279        self.bump_version();
280    }
281
282    /// Update full peripheral state in one call
283    ///
284    /// Takes many parameters for efficiency - allows updating all state in one call
285    /// rather than multiple JNI calls from Android.
286    #[allow(clippy::too_many_arguments)]
287    pub fn update_peripheral_state(
288        &self,
289        callsign: &str,
290        battery_percent: u8,
291        heart_rate: Option<u8>,
292        latitude: Option<f32>,
293        longitude: Option<f32>,
294        altitude: Option<f32>,
295        event_type: Option<EventType>,
296        timestamp: u64,
297    ) {
298        let mut peripheral = self.peripheral.write().unwrap();
299        peripheral.set_callsign(callsign);
300        peripheral.health.battery_percent = battery_percent;
301        if let Some(hr) = heart_rate {
302            peripheral.health.heart_rate = Some(hr);
303        }
304        if let (Some(lat), Some(lon)) = (latitude, longitude) {
305            peripheral.set_location(lat, lon, altitude);
306        } else {
307            peripheral.clear_location();
308        }
309        if let Some(evt) = event_type {
310            peripheral.set_event(evt, timestamp);
311        }
312        peripheral.timestamp = timestamp;
313        drop(peripheral);
314        self.bump_version();
315    }
316
317    // ==================== Emergency Management ====================
318
319    /// Start a new emergency event
320    ///
321    /// Creates an emergency event that tracks ACKs from all known peers.
322    /// Returns the document bytes to broadcast.
323    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
324        // Create emergency event with our node as source
325        {
326            let mut emergency = self.emergency.write().unwrap();
327            *emergency = Some(EmergencyEvent::new(
328                self.node_id.as_u32(),
329                timestamp,
330                known_peers,
331            ));
332        }
333
334        // Also set peripheral event for backward compatibility
335        {
336            let mut peripheral = self.peripheral.write().unwrap();
337            peripheral.set_event(EventType::Emergency, timestamp);
338        }
339
340        self.increment_counter_internal();
341        self.build_document()
342    }
343
344    /// Record our ACK for the current emergency
345    ///
346    /// Returns the document bytes to broadcast, or None if no emergency is active.
347    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
348        let changed = {
349            let mut emergency = self.emergency.write().unwrap();
350            if let Some(ref mut e) = *emergency {
351                e.ack(self.node_id.as_u32())
352            } else {
353                return None;
354            }
355        };
356
357        if changed {
358            // Also set peripheral event for backward compatibility
359            {
360                let mut peripheral = self.peripheral.write().unwrap();
361                peripheral.set_event(EventType::Ack, timestamp);
362            }
363
364            self.increment_counter_internal();
365        }
366
367        Some(self.build_document())
368    }
369
370    /// Clear the current emergency event
371    pub fn clear_emergency(&self) {
372        let mut emergency = self.emergency.write().unwrap();
373        if emergency.is_some() {
374            *emergency = None;
375            drop(emergency);
376
377            // Also clear peripheral event
378            let mut peripheral = self.peripheral.write().unwrap();
379            peripheral.clear_event();
380
381            self.bump_version();
382        }
383    }
384
385    /// Check if there's an active emergency
386    pub fn has_active_emergency(&self) -> bool {
387        self.emergency.read().unwrap().is_some()
388    }
389
390    /// Get emergency status info
391    ///
392    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
393    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
394        let emergency = self.emergency.read().unwrap();
395        emergency.as_ref().map(|e| {
396            (
397                e.source_node(),
398                e.timestamp(),
399                e.ack_count(),
400                e.pending_nodes().len(),
401            )
402        })
403    }
404
405    /// Check if a specific peer has ACKed the current emergency
406    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
407        let emergency = self.emergency.read().unwrap();
408        emergency
409            .as_ref()
410            .map(|e| e.has_acked(peer_id))
411            .unwrap_or(false)
412    }
413
414    /// Check if all peers have ACKed the current emergency
415    pub fn all_peers_acked(&self) -> bool {
416        let emergency = self.emergency.read().unwrap();
417        emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
418    }
419
420    // ==================== Chat Methods (requires `legacy-chat` feature) ====================
421
422    /// Add a chat message to the local CRDT
423    ///
424    /// Returns true if the message was new (not a duplicate).
425    #[cfg(feature = "legacy-chat")]
426    pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
427        let mut chat = self.chat.write().unwrap();
428
429        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
430        let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
431
432        if our_chat.add_message(msg) {
433            self.bump_version();
434            true
435        } else {
436            false
437        }
438    }
439
440    /// Add a chat reply to the local CRDT
441    ///
442    /// Returns true if the message was new.
443    #[cfg(feature = "legacy-chat")]
444    pub fn add_chat_reply(
445        &self,
446        sender: &str,
447        text: &str,
448        reply_to_node: u32,
449        reply_to_timestamp: u64,
450        timestamp: u64,
451    ) -> bool {
452        let mut chat = self.chat.write().unwrap();
453
454        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
455        let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
456        msg.set_reply_to(reply_to_node, reply_to_timestamp);
457
458        if our_chat.add_message(msg) {
459            self.bump_version();
460            true
461        } else {
462            false
463        }
464    }
465
466    /// Get the number of chat messages
467    #[cfg(feature = "legacy-chat")]
468    pub fn chat_count(&self) -> usize {
469        self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
470    }
471
472    /// Get chat messages newer than a timestamp
473    ///
474    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
475    #[cfg(feature = "legacy-chat")]
476    pub fn chat_messages_since(
477        &self,
478        since_timestamp: u64,
479    ) -> Vec<(u32, u64, String, String, u32, u64)> {
480        let chat = self.chat.read().unwrap();
481        chat.as_ref()
482            .map(|c| {
483                c.messages_since(since_timestamp)
484                    .map(|m| {
485                        (
486                            m.origin_node,
487                            m.timestamp,
488                            m.sender().to_string(),
489                            m.text().to_string(),
490                            m.reply_to_node,
491                            m.reply_to_timestamp,
492                        )
493                    })
494                    .collect()
495            })
496            .unwrap_or_default()
497    }
498
499    /// Get all chat messages
500    ///
501    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
502    #[cfg(feature = "legacy-chat")]
503    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
504        self.chat_messages_since(0)
505    }
506
507    /// Get a snapshot of the chat CRDT
508    #[cfg(feature = "legacy-chat")]
509    pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
510        self.chat.read().unwrap().clone()
511    }
512
513    // ==================== Delta Document Support ====================
514
515    /// Get all counter entries for delta document building
516    ///
517    /// Returns a vector of (node_id, count) pairs for all nodes
518    /// that have contributed to the counter.
519    pub fn counter_entries(&self) -> Vec<(u32, u64)> {
520        self.counter.read().unwrap().entries().collect()
521    }
522
523    /// Get a clone of the peripheral state
524    ///
525    /// Used for building delta documents with peripheral updates.
526    pub fn peripheral_snapshot(&self) -> Peripheral {
527        self.peripheral.read().unwrap().clone()
528    }
529
530    /// Get a clone of the emergency state
531    ///
532    /// Used for building delta documents with emergency data.
533    pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
534        self.emergency.read().unwrap().clone()
535    }
536
537    // ==================== Document I/O ====================
538
539    /// Build the document for transmission
540    ///
541    /// Returns the encoded bytes ready for BLE GATT write.
542    pub fn build_document(&self) -> Vec<u8> {
543        let counter = self.counter.read().unwrap().clone();
544        let peripheral = self.peripheral.read().unwrap().clone();
545        let emergency = self.emergency.read().unwrap().clone();
546
547        // Use for_sync() to limit chat messages in the document
548        // This prevents exceeding BLE MTU limits while keeping full history locally
549        #[cfg(feature = "legacy-chat")]
550        let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
551
552        let doc = HiveDocument {
553            version: self.version.load(Ordering::Relaxed),
554            node_id: self.node_id,
555            counter,
556            peripheral: Some(peripheral),
557            emergency,
558            #[cfg(feature = "legacy-chat")]
559            chat,
560        };
561
562        doc.encode()
563    }
564
565    /// Merge a received document
566    ///
567    /// Returns `Some(MergeResult)` if the document was valid, `None` otherwise.
568    /// The result contains information about what changed and any events.
569    pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
570        let received = HiveDocument::decode(data)?;
571
572        // Don't process our own documents
573        if received.node_id == self.node_id {
574            return None;
575        }
576
577        // Merge the counter
578        let counter_changed = {
579            let mut counter = self.counter.write().unwrap();
580            let old_value = counter.value();
581            counter.merge(&received.counter);
582            counter.value() != old_value
583        };
584
585        // Merge emergency event (CRDT merge)
586        let emergency_changed = if let Some(ref received_emergency) = received.emergency {
587            let mut emergency = self.emergency.write().unwrap();
588            match &mut *emergency {
589                Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
590                None => {
591                    *emergency = Some(received_emergency.clone());
592                    true
593                }
594            }
595        } else {
596            false
597        };
598
599        // Merge chat CRDT
600        #[cfg(feature = "legacy-chat")]
601        let chat_changed = if let Some(ref received_chat) = received.chat {
602            if !received_chat.is_empty() {
603                let mut chat = self.chat.write().unwrap();
604                match &mut *chat {
605                    Some(ref mut our_chat) => our_chat.merge(received_chat),
606                    None => {
607                        *chat = Some(received_chat.clone());
608                        true
609                    }
610                }
611            } else {
612                false
613            }
614        } else {
615            false
616        };
617        #[cfg(not(feature = "legacy-chat"))]
618        let chat_changed = false;
619
620        if counter_changed || emergency_changed || chat_changed {
621            self.bump_version();
622        }
623
624        // Extract event from received document
625        let event = received
626            .peripheral
627            .as_ref()
628            .and_then(|p| p.last_event.clone());
629
630        Some(MergeResult {
631            source_node: received.node_id,
632            event,
633            peer_peripheral: received.peripheral,
634            counter_changed,
635            emergency_changed,
636            chat_changed,
637            total_count: self.total_count(),
638        })
639    }
640
641    /// Create a document from raw bytes (for inspection without merging)
642    pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
643        HiveDocument::decode(data)
644    }
645
646    // ==================== Internal Helpers ====================
647
648    fn increment_counter_internal(&self) {
649        let mut counter = self.counter.write().unwrap();
650        counter.increment(&self.node_id, 1);
651        drop(counter);
652        self.bump_version();
653    }
654
655    fn bump_version(&self) {
656        self.version.fetch_add(1, Ordering::Relaxed);
657    }
658}
659
660/// Result from checking if a document contains an emergency
661#[derive(Debug, Clone)]
662pub struct DocumentCheck {
663    /// Node ID from the document
664    pub node_id: NodeId,
665    /// Whether this document contains an emergency
666    pub is_emergency: bool,
667    /// Whether this document contains an ACK
668    pub is_ack: bool,
669}
670
671impl DocumentCheck {
672    /// Quick check of a document without full parsing
673    pub fn from_document(data: &[u8]) -> Option<Self> {
674        let doc = HiveDocument::decode(data)?;
675
676        let (is_emergency, is_ack) = doc
677            .peripheral
678            .as_ref()
679            .and_then(|p| p.last_event.as_ref())
680            .map(|e| {
681                (
682                    e.event_type == EventType::Emergency,
683                    e.event_type == EventType::Ack,
684                )
685            })
686            .unwrap_or((false, false));
687
688        Some(Self {
689            node_id: doc.node_id,
690            is_emergency,
691            is_ack,
692        })
693    }
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    // Timestamp fed to every event (2024-01-15 00:00:00 UTC, in milliseconds)
    const TEST_TIMESTAMP: u64 = 1_705_276_800_000;

    // Node id used by the single-node tests
    const ALPHA_ID: u32 = 0x12345678;

    /// Single node used by most tests.
    fn alpha() -> DocumentSync {
        DocumentSync::new(NodeId::new(ALPHA_ID), "ALPHA-1")
    }

    /// Two distinct nodes: (receiver, sender).
    fn receiver_and_sender() -> (DocumentSync, DocumentSync) {
        let receiver = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
        let sender = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
        (receiver, sender)
    }

    #[test]
    fn test_document_sync_new() {
        let node = alpha();

        assert_eq!(node.node_id().as_u32(), 0x12345678);
        assert_eq!(node.version(), 1);
        assert_eq!(node.total_count(), 0);
        assert_eq!(node.callsign(), "ALPHA-1");
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_send_emergency() {
        let node = alpha();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_emergency_active());
        assert!(!node.is_ack_active());

        // What we sent must round-trip through the decoder
        let decoded = HiveDocument::decode(&encoded).unwrap();
        assert_eq!(decoded.node_id.as_u32(), 0x12345678);
        assert!(decoded.peripheral.is_some());
        let last_event = decoded.peripheral.unwrap().last_event.unwrap();
        assert_eq!(last_event.event_type, EventType::Emergency);
    }

    #[test]
    fn test_send_ack() {
        let node = alpha();

        let encoded = node.send_ack(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_ack_active());
        assert!(!node.is_emergency_active());
    }

    #[test]
    fn test_clear_event() {
        let node = alpha();

        node.send_emergency(TEST_TIMESTAMP);
        assert!(node.is_emergency_active());

        node.clear_event();
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_merge_document() {
        let (receiver, sender) = receiver_and_sender();

        // The sender raises an emergency...
        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        // ...and the receiver merges it
        let merged = receiver.merge_document(&encoded);
        assert!(merged.is_some());

        let merged = merged.unwrap();
        assert_eq!(merged.source_node.as_u32(), 0x22222222);
        assert!(merged.is_emergency());
        assert!(merged.counter_changed);
        assert_eq!(merged.total_count, 1);

        // The receiver contributed nothing itself; the total reflects the sender
        assert_eq!(receiver.local_count(), 0);
        assert_eq!(receiver.total_count(), 1);
    }

    #[test]
    fn test_merge_own_document_ignored() {
        let node = alpha();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        // A node must ignore its own documents
        let merged = node.merge_document(&encoded);
        assert!(merged.is_none());
    }

    #[test]
    fn test_version_increments() {
        let node = alpha();

        assert_eq!(node.version(), 1);

        node.increment_counter();
        assert_eq!(node.version(), 2);

        node.send_emergency(TEST_TIMESTAMP);
        assert_eq!(node.version(), 3);

        node.clear_event();
        assert_eq!(node.version(), 4);
    }

    #[test]
    fn test_document_check() {
        let node = alpha();

        let emergency_bytes = node.send_emergency(TEST_TIMESTAMP);
        let emergency_check = DocumentCheck::from_document(&emergency_bytes).unwrap();
        assert_eq!(emergency_check.node_id.as_u32(), 0x12345678);
        assert!(emergency_check.is_emergency);
        assert!(!emergency_check.is_ack);

        node.clear_event();
        let ack_bytes = node.send_ack(TEST_TIMESTAMP + 1000);
        let ack_check = DocumentCheck::from_document(&ack_bytes).unwrap();
        assert!(!ack_check.is_emergency);
        assert!(ack_check.is_ack);
    }

    #[test]
    fn test_counter_merge_idempotent() {
        let (receiver, sender) = receiver_and_sender();

        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        // First merge changes the counter
        let first = receiver.merge_document(&encoded).unwrap();
        assert!(first.counter_changed);
        assert_eq!(receiver.total_count(), 1);

        // Merging the same bytes again must be a no-op for the counter
        let second = receiver.merge_document(&encoded).unwrap();
        assert!(!second.counter_changed);
        assert_eq!(receiver.total_count(), 1);
    }
}
840}