Skip to main content

hive_btle/
document_sync.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Document synchronization for HIVE BLE mesh
17//!
18//! This module provides centralized document state management for HIVE-Lite nodes.
19//! It manages the local CRDT state (GCounter) and handles merging with received documents.
20//!
21//! ## Design Notes
22//!
23//! This implementation uses a simple GCounter for resource-constrained devices (ESP32,
24//! smartwatches). For full HIVE nodes using AutomergeIroh, this component can be replaced
25//! or extended - the observer pattern and BLE transport layer are independent of the
26//! document format.
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use hive_btle::document_sync::DocumentSync;
32//! use hive_btle::NodeId;
33//!
34//! let sync = DocumentSync::new(NodeId::new(0x12345678), "SOLDIER-1");
35//!
36//! // Trigger an emergency
37//! let doc_bytes = sync.send_emergency();
38//! // ... broadcast doc_bytes over BLE
39//!
40//! // Handle received document
41//! if let Some(result) = sync.merge_document(&received_data) {
42//!     if result.is_emergency() {
43//!         println!("EMERGENCY from {:08X}", result.source_node.as_u32());
44//!     }
45//! }
46//! ```
47
48#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59use crate::sync::crdt::{
60    ChatCRDT, ChatMessage, EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType,
61};
62use crate::NodeId;
63
64/// Document synchronization manager for HIVE-Lite nodes
65///
66/// Manages the local CRDT state and handles document serialization/merging.
67/// Thread-safe for use from multiple BLE callbacks.
68///
69/// ## Integration with Full HIVE
70///
71/// This implementation uses a simple GCounter suitable for embedded devices.
72/// For integration with the larger HIVE project using AutomergeIroh:
73/// - The `build_document()` output can be wrapped in an Automerge-compatible format
74/// - The observer events (Emergency, Ack, DocumentSynced) work with any backend
75/// - The BLE transport layer is document-format agnostic
76pub struct DocumentSync {
77    /// Our node ID
78    node_id: NodeId,
79
80    /// CRDT G-Counter for mesh activity tracking
81    counter: RwLock<GCounter>,
82
83    /// Peripheral data (callsign, type, location)
84    peripheral: RwLock<Peripheral>,
85
86    /// Active emergency event with ACK tracking (CRDT)
87    emergency: RwLock<Option<EmergencyEvent>>,
88
89    /// Chat CRDT for mesh-wide messaging
90    chat: RwLock<Option<ChatCRDT>>,
91
92    /// Document version (monotonically increasing)
93    version: AtomicU32,
94}
95
96impl DocumentSync {
97    /// Create a new document sync manager
98    pub fn new(node_id: NodeId, callsign: &str) -> Self {
99        let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
100            .with_callsign(callsign);
101
102        Self {
103            node_id,
104            counter: RwLock::new(GCounter::new()),
105            peripheral: RwLock::new(peripheral),
106            emergency: RwLock::new(None),
107            chat: RwLock::new(None),
108            version: AtomicU32::new(1),
109        }
110    }
111
112    /// Create with a specific peripheral type
113    pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
114        let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
115
116        Self {
117            node_id,
118            counter: RwLock::new(GCounter::new()),
119            peripheral: RwLock::new(peripheral),
120            emergency: RwLock::new(None),
121            chat: RwLock::new(None),
122            version: AtomicU32::new(1),
123        }
124    }
125
126    /// Get our node ID
127    pub fn node_id(&self) -> NodeId {
128        self.node_id
129    }
130
131    /// Get the current document version
132    pub fn version(&self) -> u32 {
133        self.version.load(Ordering::Relaxed)
134    }
135
136    /// Get the total counter value
137    pub fn total_count(&self) -> u64 {
138        self.counter.read().unwrap().value()
139    }
140
141    /// Get our counter contribution
142    pub fn local_count(&self) -> u64 {
143        self.counter.read().unwrap().node_count(&self.node_id)
144    }
145
146    /// Get current event type (if any)
147    pub fn current_event(&self) -> Option<EventType> {
148        self.peripheral
149            .read()
150            .unwrap()
151            .last_event
152            .as_ref()
153            .map(|e| e.event_type)
154    }
155
156    /// Check if we're in emergency state
157    pub fn is_emergency_active(&self) -> bool {
158        self.current_event() == Some(EventType::Emergency)
159    }
160
161    /// Check if we've sent an ACK
162    pub fn is_ack_active(&self) -> bool {
163        self.current_event() == Some(EventType::Ack)
164    }
165
166    /// Get the callsign
167    pub fn callsign(&self) -> String {
168        self.peripheral.read().unwrap().callsign_str().to_string()
169    }
170
171    // ==================== State Mutations ====================
172
173    /// Send an emergency - returns the document bytes to broadcast
174    pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
175        // Set emergency event
176        {
177            let mut peripheral = self.peripheral.write().unwrap();
178            peripheral.set_event(EventType::Emergency, timestamp);
179        }
180
181        // Increment counter
182        self.increment_counter_internal();
183
184        // Build and return document
185        self.build_document()
186    }
187
188    /// Send an ACK - returns the document bytes to broadcast
189    pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
190        // Set ACK event
191        {
192            let mut peripheral = self.peripheral.write().unwrap();
193            peripheral.set_event(EventType::Ack, timestamp);
194        }
195
196        // Increment counter
197        self.increment_counter_internal();
198
199        // Build and return document
200        self.build_document()
201    }
202
203    /// Clear the current event
204    pub fn clear_event(&self) {
205        let mut peripheral = self.peripheral.write().unwrap();
206        peripheral.clear_event();
207        self.bump_version();
208    }
209
210    /// Increment the counter (for periodic sync)
211    pub fn increment_counter(&self) {
212        self.increment_counter_internal();
213    }
214
215    /// Update health status (battery percentage)
216    pub fn update_health(&self, battery_percent: u8) {
217        let mut peripheral = self.peripheral.write().unwrap();
218        peripheral.health.battery_percent = battery_percent;
219        self.bump_version();
220    }
221
222    /// Update activity level (0=still, 1=walking, 2=running, 3=fall)
223    pub fn update_activity(&self, activity: u8) {
224        let mut peripheral = self.peripheral.write().unwrap();
225        peripheral.health.activity = activity;
226        self.bump_version();
227    }
228
229    /// Update full health status (battery and activity)
230    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
231        let mut peripheral = self.peripheral.write().unwrap();
232        peripheral.health.battery_percent = battery_percent;
233        peripheral.health.activity = activity;
234        self.bump_version();
235    }
236
237    /// Update heart rate
238    pub fn update_heart_rate(&self, heart_rate: u8) {
239        let mut peripheral = self.peripheral.write().unwrap();
240        peripheral.health.heart_rate = Some(heart_rate);
241        self.bump_version();
242    }
243
244    /// Update location
245    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
246        let mut peripheral = self.peripheral.write().unwrap();
247        peripheral.set_location(latitude, longitude, altitude);
248        self.bump_version();
249    }
250
251    /// Clear location
252    pub fn clear_location(&self) {
253        let mut peripheral = self.peripheral.write().unwrap();
254        peripheral.clear_location();
255        self.bump_version();
256    }
257
258    /// Update callsign
259    pub fn update_callsign(&self, callsign: &str) {
260        let mut peripheral = self.peripheral.write().unwrap();
261        peripheral.set_callsign(callsign);
262        self.bump_version();
263    }
264
265    /// Set peripheral event type
266    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
267        let mut peripheral = self.peripheral.write().unwrap();
268        peripheral.set_event(event_type, timestamp);
269        self.bump_version();
270    }
271
272    /// Clear peripheral event
273    pub fn clear_peripheral_event(&self) {
274        let mut peripheral = self.peripheral.write().unwrap();
275        peripheral.clear_event();
276        self.bump_version();
277    }
278
279    /// Update full peripheral state in one call
280    ///
281    /// Takes many parameters for efficiency - allows updating all state in one call
282    /// rather than multiple JNI calls from Android.
283    #[allow(clippy::too_many_arguments)]
284    pub fn update_peripheral_state(
285        &self,
286        callsign: &str,
287        battery_percent: u8,
288        heart_rate: Option<u8>,
289        latitude: Option<f32>,
290        longitude: Option<f32>,
291        altitude: Option<f32>,
292        event_type: Option<EventType>,
293        timestamp: u64,
294    ) {
295        let mut peripheral = self.peripheral.write().unwrap();
296        peripheral.set_callsign(callsign);
297        peripheral.health.battery_percent = battery_percent;
298        if let Some(hr) = heart_rate {
299            peripheral.health.heart_rate = Some(hr);
300        }
301        if let (Some(lat), Some(lon)) = (latitude, longitude) {
302            peripheral.set_location(lat, lon, altitude);
303        } else {
304            peripheral.clear_location();
305        }
306        if let Some(evt) = event_type {
307            peripheral.set_event(evt, timestamp);
308        }
309        peripheral.timestamp = timestamp;
310        drop(peripheral);
311        self.bump_version();
312    }
313
314    // ==================== Emergency Management ====================
315
316    /// Start a new emergency event
317    ///
318    /// Creates an emergency event that tracks ACKs from all known peers.
319    /// Returns the document bytes to broadcast.
320    pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
321        // Create emergency event with our node as source
322        {
323            let mut emergency = self.emergency.write().unwrap();
324            *emergency = Some(EmergencyEvent::new(
325                self.node_id.as_u32(),
326                timestamp,
327                known_peers,
328            ));
329        }
330
331        // Also set peripheral event for backward compatibility
332        {
333            let mut peripheral = self.peripheral.write().unwrap();
334            peripheral.set_event(EventType::Emergency, timestamp);
335        }
336
337        self.increment_counter_internal();
338        self.build_document()
339    }
340
341    /// Record our ACK for the current emergency
342    ///
343    /// Returns the document bytes to broadcast, or None if no emergency is active.
344    pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
345        let changed = {
346            let mut emergency = self.emergency.write().unwrap();
347            if let Some(ref mut e) = *emergency {
348                e.ack(self.node_id.as_u32())
349            } else {
350                return None;
351            }
352        };
353
354        if changed {
355            // Also set peripheral event for backward compatibility
356            {
357                let mut peripheral = self.peripheral.write().unwrap();
358                peripheral.set_event(EventType::Ack, timestamp);
359            }
360
361            self.increment_counter_internal();
362        }
363
364        Some(self.build_document())
365    }
366
367    /// Clear the current emergency event
368    pub fn clear_emergency(&self) {
369        let mut emergency = self.emergency.write().unwrap();
370        if emergency.is_some() {
371            *emergency = None;
372            drop(emergency);
373
374            // Also clear peripheral event
375            let mut peripheral = self.peripheral.write().unwrap();
376            peripheral.clear_event();
377
378            self.bump_version();
379        }
380    }
381
382    /// Check if there's an active emergency
383    pub fn has_active_emergency(&self) -> bool {
384        self.emergency.read().unwrap().is_some()
385    }
386
387    /// Get emergency status info
388    ///
389    /// Returns (source_node, timestamp, acked_count, pending_count) if emergency is active.
390    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
391        let emergency = self.emergency.read().unwrap();
392        emergency.as_ref().map(|e| {
393            (
394                e.source_node(),
395                e.timestamp(),
396                e.ack_count(),
397                e.pending_nodes().len(),
398            )
399        })
400    }
401
402    /// Check if a specific peer has ACKed the current emergency
403    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
404        let emergency = self.emergency.read().unwrap();
405        emergency
406            .as_ref()
407            .map(|e| e.has_acked(peer_id))
408            .unwrap_or(false)
409    }
410
411    /// Check if all peers have ACKed the current emergency
412    pub fn all_peers_acked(&self) -> bool {
413        let emergency = self.emergency.read().unwrap();
414        emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
415    }
416
417    // ==================== Chat Methods ====================
418
419    /// Add a chat message to the local CRDT
420    ///
421    /// Returns true if the message was new (not a duplicate).
422    pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
423        let mut chat = self.chat.write().unwrap();
424
425        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
426        let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
427
428        if our_chat.add_message(msg) {
429            self.bump_version();
430            true
431        } else {
432            false
433        }
434    }
435
436    /// Add a chat reply to the local CRDT
437    ///
438    /// Returns true if the message was new.
439    pub fn add_chat_reply(
440        &self,
441        sender: &str,
442        text: &str,
443        reply_to_node: u32,
444        reply_to_timestamp: u64,
445        timestamp: u64,
446    ) -> bool {
447        let mut chat = self.chat.write().unwrap();
448
449        let our_chat = chat.get_or_insert_with(ChatCRDT::new);
450        let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
451        msg.set_reply_to(reply_to_node, reply_to_timestamp);
452
453        if our_chat.add_message(msg) {
454            self.bump_version();
455            true
456        } else {
457            false
458        }
459    }
460
461    /// Get the number of chat messages
462    pub fn chat_count(&self) -> usize {
463        self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
464    }
465
466    /// Get chat messages newer than a timestamp
467    ///
468    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
469    pub fn chat_messages_since(
470        &self,
471        since_timestamp: u64,
472    ) -> Vec<(u32, u64, String, String, u32, u64)> {
473        let chat = self.chat.read().unwrap();
474        chat.as_ref()
475            .map(|c| {
476                c.messages_since(since_timestamp)
477                    .map(|m| {
478                        (
479                            m.origin_node,
480                            m.timestamp,
481                            m.sender().to_string(),
482                            m.text().to_string(),
483                            m.reply_to_node,
484                            m.reply_to_timestamp,
485                        )
486                    })
487                    .collect()
488            })
489            .unwrap_or_default()
490    }
491
492    /// Get all chat messages
493    ///
494    /// Returns a vector of (origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp) tuples.
495    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
496        self.chat_messages_since(0)
497    }
498
499    /// Get a snapshot of the chat CRDT
500    pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
501        self.chat.read().unwrap().clone()
502    }
503
504    // ==================== Delta Document Support ====================
505
506    /// Get all counter entries for delta document building
507    ///
508    /// Returns a vector of (node_id, count) pairs for all nodes
509    /// that have contributed to the counter.
510    pub fn counter_entries(&self) -> Vec<(u32, u64)> {
511        self.counter.read().unwrap().entries().collect()
512    }
513
514    /// Get a clone of the peripheral state
515    ///
516    /// Used for building delta documents with peripheral updates.
517    pub fn peripheral_snapshot(&self) -> Peripheral {
518        self.peripheral.read().unwrap().clone()
519    }
520
521    /// Get a clone of the emergency state
522    ///
523    /// Used for building delta documents with emergency data.
524    pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
525        self.emergency.read().unwrap().clone()
526    }
527
528    // ==================== Document I/O ====================
529
530    /// Build the document for transmission
531    ///
532    /// Returns the encoded bytes ready for BLE GATT write.
533    pub fn build_document(&self) -> Vec<u8> {
534        let counter = self.counter.read().unwrap().clone();
535        let peripheral = self.peripheral.read().unwrap().clone();
536        let emergency = self.emergency.read().unwrap().clone();
537
538        // Use for_sync() to limit chat messages in the document
539        // This prevents exceeding BLE MTU limits while keeping full history locally
540        let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
541
542        let doc = HiveDocument {
543            version: self.version.load(Ordering::Relaxed),
544            node_id: self.node_id,
545            counter,
546            peripheral: Some(peripheral),
547            emergency,
548            chat,
549        };
550
551        doc.encode()
552    }
553
554    /// Merge a received document
555    ///
556    /// Returns `Some(MergeResult)` if the document was valid, `None` otherwise.
557    /// The result contains information about what changed and any events.
558    pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
559        let received = HiveDocument::decode(data)?;
560
561        // Don't process our own documents
562        if received.node_id == self.node_id {
563            return None;
564        }
565
566        // Merge the counter
567        let counter_changed = {
568            let mut counter = self.counter.write().unwrap();
569            let old_value = counter.value();
570            counter.merge(&received.counter);
571            counter.value() != old_value
572        };
573
574        // Merge emergency event (CRDT merge)
575        let emergency_changed = if let Some(ref received_emergency) = received.emergency {
576            let mut emergency = self.emergency.write().unwrap();
577            match &mut *emergency {
578                Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
579                None => {
580                    *emergency = Some(received_emergency.clone());
581                    true
582                }
583            }
584        } else {
585            false
586        };
587
588        // Merge chat CRDT
589        let chat_changed = if let Some(ref received_chat) = received.chat {
590            if !received_chat.is_empty() {
591                let mut chat = self.chat.write().unwrap();
592                match &mut *chat {
593                    Some(ref mut our_chat) => our_chat.merge(received_chat),
594                    None => {
595                        *chat = Some(received_chat.clone());
596                        true
597                    }
598                }
599            } else {
600                false
601            }
602        } else {
603            false
604        };
605
606        if counter_changed || emergency_changed || chat_changed {
607            self.bump_version();
608        }
609
610        // Extract event from received document
611        let event = received
612            .peripheral
613            .as_ref()
614            .and_then(|p| p.last_event.clone());
615
616        Some(MergeResult {
617            source_node: received.node_id,
618            event,
619            peer_peripheral: received.peripheral,
620            counter_changed,
621            emergency_changed,
622            chat_changed,
623            total_count: self.total_count(),
624        })
625    }
626
627    /// Create a document from raw bytes (for inspection without merging)
628    pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
629        HiveDocument::decode(data)
630    }
631
632    // ==================== Internal Helpers ====================
633
634    fn increment_counter_internal(&self) {
635        let mut counter = self.counter.write().unwrap();
636        counter.increment(&self.node_id, 1);
637        drop(counter);
638        self.bump_version();
639    }
640
641    fn bump_version(&self) {
642        self.version.fetch_add(1, Ordering::Relaxed);
643    }
644}
645
646/// Result from checking if a document contains an emergency
647#[derive(Debug, Clone)]
648pub struct DocumentCheck {
649    /// Node ID from the document
650    pub node_id: NodeId,
651    /// Whether this document contains an emergency
652    pub is_emergency: bool,
653    /// Whether this document contains an ACK
654    pub is_ack: bool,
655}
656
657impl DocumentCheck {
658    /// Quick check of a document without full parsing
659    pub fn from_document(data: &[u8]) -> Option<Self> {
660        let doc = HiveDocument::decode(data)?;
661
662        let (is_emergency, is_ack) = doc
663            .peripheral
664            .as_ref()
665            .and_then(|p| p.last_event.as_ref())
666            .map(|e| {
667                (
668                    e.event_type == EventType::Emergency,
669                    e.event_type == EventType::Ack,
670                )
671            })
672            .unwrap_or((false, false));
673
674        Some(Self {
675            node_id: doc.node_id,
676            is_emergency,
677            is_ack,
678        })
679    }
680}
681
682#[cfg(test)]
683mod tests {
684    use super::*;
685
686    // Valid timestamp for testing (2024-01-15 00:00:00 UTC)
687    const TEST_TIMESTAMP: u64 = 1705276800000;
688
689    #[test]
690    fn test_document_sync_new() {
691        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
692
693        assert_eq!(sync.node_id().as_u32(), 0x12345678);
694        assert_eq!(sync.version(), 1);
695        assert_eq!(sync.total_count(), 0);
696        assert_eq!(sync.callsign(), "ALPHA-1");
697        assert!(sync.current_event().is_none());
698    }
699
700    #[test]
701    fn test_send_emergency() {
702        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
703
704        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
705
706        assert!(!doc_bytes.is_empty());
707        assert_eq!(sync.total_count(), 1);
708        assert!(sync.is_emergency_active());
709        assert!(!sync.is_ack_active());
710
711        // Verify we can decode what we sent
712        let doc = HiveDocument::decode(&doc_bytes).unwrap();
713        assert_eq!(doc.node_id.as_u32(), 0x12345678);
714        assert!(doc.peripheral.is_some());
715        let event = doc.peripheral.unwrap().last_event.unwrap();
716        assert_eq!(event.event_type, EventType::Emergency);
717    }
718
719    #[test]
720    fn test_send_ack() {
721        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
722
723        let doc_bytes = sync.send_ack(TEST_TIMESTAMP);
724
725        assert!(!doc_bytes.is_empty());
726        assert_eq!(sync.total_count(), 1);
727        assert!(sync.is_ack_active());
728        assert!(!sync.is_emergency_active());
729    }
730
731    #[test]
732    fn test_clear_event() {
733        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
734
735        sync.send_emergency(TEST_TIMESTAMP);
736        assert!(sync.is_emergency_active());
737
738        sync.clear_event();
739        assert!(sync.current_event().is_none());
740    }
741
742    #[test]
743    fn test_merge_document() {
744        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
745        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
746
747        // sync2 sends emergency
748        let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
749
750        // sync1 receives and merges
751        let result = sync1.merge_document(&doc_bytes);
752        assert!(result.is_some());
753
754        let result = result.unwrap();
755        assert_eq!(result.source_node.as_u32(), 0x22222222);
756        assert!(result.is_emergency());
757        assert!(result.counter_changed);
758        assert_eq!(result.total_count, 1);
759
760        // sync1's local count is still 0, but total includes sync2's contribution
761        assert_eq!(sync1.local_count(), 0);
762        assert_eq!(sync1.total_count(), 1);
763    }
764
765    #[test]
766    fn test_merge_own_document_ignored() {
767        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
768
769        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
770
771        // Merging our own document should be ignored
772        let result = sync.merge_document(&doc_bytes);
773        assert!(result.is_none());
774    }
775
776    #[test]
777    fn test_version_increments() {
778        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
779
780        assert_eq!(sync.version(), 1);
781
782        sync.increment_counter();
783        assert_eq!(sync.version(), 2);
784
785        sync.send_emergency(TEST_TIMESTAMP);
786        assert_eq!(sync.version(), 3);
787
788        sync.clear_event();
789        assert_eq!(sync.version(), 4);
790    }
791
792    #[test]
793    fn test_document_check() {
794        let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
795
796        let emergency_doc = sync.send_emergency(TEST_TIMESTAMP);
797        let check = DocumentCheck::from_document(&emergency_doc).unwrap();
798        assert_eq!(check.node_id.as_u32(), 0x12345678);
799        assert!(check.is_emergency);
800        assert!(!check.is_ack);
801
802        sync.clear_event();
803        let ack_doc = sync.send_ack(TEST_TIMESTAMP + 1000);
804        let check = DocumentCheck::from_document(&ack_doc).unwrap();
805        assert!(!check.is_emergency);
806        assert!(check.is_ack);
807    }
808
809    #[test]
810    fn test_counter_merge_idempotent() {
811        let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
812        let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
813
814        // sync2 sends something
815        let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
816
817        // sync1 merges twice - second should not change counter
818        let result1 = sync1.merge_document(&doc_bytes).unwrap();
819        assert!(result1.counter_changed);
820        assert_eq!(sync1.total_count(), 1);
821
822        let result2 = sync1.merge_document(&doc_bytes).unwrap();
823        assert!(!result2.counter_changed); // No change on re-merge
824        assert_eq!(sync1.total_count(), 1);
825    }
826}