Skip to main content

hive_btle/sync/
crdt.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! CRDT (Conflict-free Replicated Data Types) for HIVE-Lite
17//!
18//! Provides lightweight CRDT implementations optimized for BLE sync:
19//! - LWW-Register: Last-Writer-Wins for single values
20//! - G-Counter: Grow-only counter for metrics
21//!
22//! These are designed for minimal memory footprint and efficient
23//! serialization over constrained BLE connections.
24
25#[cfg(not(feature = "std"))]
26use alloc::{collections::BTreeMap, string::String, string::ToString, vec, vec::Vec};
27#[cfg(feature = "std")]
28use std::collections::BTreeMap;
29
30use crate::NodeId;
31
32/// Timestamp for CRDT operations (milliseconds since epoch or monotonic)
33pub type Timestamp = u64;
34
35/// A Last-Writer-Wins Register
36///
37/// Stores a single value where concurrent writes are resolved by timestamp.
38/// Higher timestamp wins. In case of tie, higher node ID wins.
39#[derive(Debug, Clone, PartialEq)]
40pub struct LwwRegister<T: Clone> {
41    /// Current value
42    value: T,
43    /// Timestamp when value was set
44    timestamp: Timestamp,
45    /// Node that set the value
46    node_id: NodeId,
47}
48
49impl<T: Clone + Default> Default for LwwRegister<T> {
50    fn default() -> Self {
51        Self {
52            value: T::default(),
53            timestamp: 0,
54            node_id: NodeId::default(),
55        }
56    }
57}
58
59impl<T: Clone> LwwRegister<T> {
60    /// Create a new register with an initial value
61    pub fn new(value: T, timestamp: Timestamp, node_id: NodeId) -> Self {
62        Self {
63            value,
64            timestamp,
65            node_id,
66        }
67    }
68
69    /// Get the current value
70    pub fn get(&self) -> &T {
71        &self.value
72    }
73
74    /// Get the timestamp
75    pub fn timestamp(&self) -> Timestamp {
76        self.timestamp
77    }
78
79    /// Get the node that set the value
80    pub fn node_id(&self) -> &NodeId {
81        &self.node_id
82    }
83
84    /// Set a new value if it has a higher timestamp
85    ///
86    /// Returns true if the value was updated
87    pub fn set(&mut self, value: T, timestamp: Timestamp, node_id: NodeId) -> bool {
88        if self.should_update(timestamp, &node_id) {
89            self.value = value;
90            self.timestamp = timestamp;
91            self.node_id = node_id;
92            true
93        } else {
94            false
95        }
96    }
97
98    /// Merge with another register (LWW semantics)
99    ///
100    /// Returns true if our value was updated
101    pub fn merge(&mut self, other: &LwwRegister<T>) -> bool {
102        if self.should_update(other.timestamp, &other.node_id) {
103            self.value = other.value.clone();
104            self.timestamp = other.timestamp;
105            self.node_id = other.node_id;
106            true
107        } else {
108            false
109        }
110    }
111
112    /// Check if we should update based on timestamp/node_id
113    fn should_update(&self, timestamp: Timestamp, node_id: &NodeId) -> bool {
114        timestamp > self.timestamp
115            || (timestamp == self.timestamp && node_id.as_u32() > self.node_id.as_u32())
116    }
117}
118
119/// A Grow-only Counter (G-Counter)
120///
121/// Each node maintains its own count, total is the sum of all counts.
122/// Only supports increment operations.
123#[derive(Debug, Clone, Default)]
124pub struct GCounter {
125    /// Per-node counts
126    counts: BTreeMap<u32, u64>,
127}
128
129impl GCounter {
130    /// Create a new empty counter
131    pub fn new() -> Self {
132        Self {
133            counts: BTreeMap::new(),
134        }
135    }
136
137    /// Get the total count
138    pub fn value(&self) -> u64 {
139        self.counts.values().sum()
140    }
141
142    /// Increment the counter for a node
143    pub fn increment(&mut self, node_id: &NodeId, amount: u64) {
144        let count = self.counts.entry(node_id.as_u32()).or_insert(0);
145        *count = count.saturating_add(amount);
146    }
147
148    /// Get the count for a specific node
149    pub fn node_count(&self, node_id: &NodeId) -> u64 {
150        self.counts.get(&node_id.as_u32()).copied().unwrap_or(0)
151    }
152
153    /// Merge with another counter
154    ///
155    /// Takes the max of each node's count
156    pub fn merge(&mut self, other: &GCounter) {
157        for (&node_id, &count) in &other.counts {
158            let our_count = self.counts.entry(node_id).or_insert(0);
159            *our_count = (*our_count).max(count);
160        }
161    }
162
163    /// Get the number of nodes that have contributed
164    pub fn node_count_total(&self) -> usize {
165        self.counts.len()
166    }
167
168    /// Encode to bytes for transmission
169    pub fn encode(&self) -> Vec<u8> {
170        let mut buf = Vec::with_capacity(4 + self.counts.len() * 12);
171        // Number of entries
172        buf.extend_from_slice(&(self.counts.len() as u32).to_le_bytes());
173        // Each entry: node_id (4 bytes) + count (8 bytes)
174        for (&node_id, &count) in &self.counts {
175            buf.extend_from_slice(&node_id.to_le_bytes());
176            buf.extend_from_slice(&count.to_le_bytes());
177        }
178        buf
179    }
180
181    /// Decode from bytes
182    pub fn decode(data: &[u8]) -> Option<Self> {
183        if data.len() < 4 {
184            return None;
185        }
186        let num_entries = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
187        if data.len() < 4 + num_entries * 12 {
188            return None;
189        }
190
191        let mut counts = BTreeMap::new();
192        let mut offset = 4;
193        for _ in 0..num_entries {
194            let node_id = u32::from_le_bytes([
195                data[offset],
196                data[offset + 1],
197                data[offset + 2],
198                data[offset + 3],
199            ]);
200            let count = u64::from_le_bytes([
201                data[offset + 4],
202                data[offset + 5],
203                data[offset + 6],
204                data[offset + 7],
205                data[offset + 8],
206                data[offset + 9],
207                data[offset + 10],
208                data[offset + 11],
209            ]);
210            counts.insert(node_id, count);
211            offset += 12;
212        }
213
214        Some(Self { counts })
215    }
216}
217
218/// Position data with LWW semantics
219#[derive(Debug, Clone, Default, PartialEq)]
220pub struct Position {
221    /// Latitude in degrees
222    pub latitude: f32,
223    /// Longitude in degrees
224    pub longitude: f32,
225    /// Altitude in meters (optional)
226    pub altitude: Option<f32>,
227    /// Accuracy in meters (optional)
228    pub accuracy: Option<f32>,
229}
230
231impl Position {
232    /// Create a new position
233    pub fn new(latitude: f32, longitude: f32) -> Self {
234        Self {
235            latitude,
236            longitude,
237            altitude: None,
238            accuracy: None,
239        }
240    }
241
242    /// Create with altitude
243    pub fn with_altitude(mut self, altitude: f32) -> Self {
244        self.altitude = Some(altitude);
245        self
246    }
247
248    /// Create with accuracy
249    pub fn with_accuracy(mut self, accuracy: f32) -> Self {
250        self.accuracy = Some(accuracy);
251        self
252    }
253
254    /// Encode to bytes (12-20 bytes)
255    pub fn encode(&self) -> Vec<u8> {
256        let mut buf = Vec::with_capacity(20);
257        buf.extend_from_slice(&self.latitude.to_le_bytes());
258        buf.extend_from_slice(&self.longitude.to_le_bytes());
259
260        // Flags byte: bit 0 = has altitude, bit 1 = has accuracy
261        let mut flags = 0u8;
262        if self.altitude.is_some() {
263            flags |= 0x01;
264        }
265        if self.accuracy.is_some() {
266            flags |= 0x02;
267        }
268        buf.push(flags);
269
270        if let Some(alt) = self.altitude {
271            buf.extend_from_slice(&alt.to_le_bytes());
272        }
273        if let Some(acc) = self.accuracy {
274            buf.extend_from_slice(&acc.to_le_bytes());
275        }
276        buf
277    }
278
279    /// Decode from bytes
280    pub fn decode(data: &[u8]) -> Option<Self> {
281        if data.len() < 9 {
282            return None;
283        }
284
285        let latitude = f32::from_le_bytes([data[0], data[1], data[2], data[3]]);
286        let longitude = f32::from_le_bytes([data[4], data[5], data[6], data[7]]);
287        let flags = data[8];
288
289        let mut pos = Self::new(latitude, longitude);
290        let mut offset = 9;
291
292        if flags & 0x01 != 0 {
293            if data.len() < offset + 4 {
294                return None;
295            }
296            pos.altitude = Some(f32::from_le_bytes([
297                data[offset],
298                data[offset + 1],
299                data[offset + 2],
300                data[offset + 3],
301            ]));
302            offset += 4;
303        }
304
305        if flags & 0x02 != 0 {
306            if data.len() < offset + 4 {
307                return None;
308            }
309            pos.accuracy = Some(f32::from_le_bytes([
310                data[offset],
311                data[offset + 1],
312                data[offset + 2],
313                data[offset + 3],
314            ]));
315        }
316
317        Some(pos)
318    }
319}
320
321/// Health status data with LWW semantics
322#[derive(Debug, Clone, Default, PartialEq)]
323pub struct HealthStatus {
324    /// Battery percentage (0-100)
325    pub battery_percent: u8,
326    /// Heart rate BPM (optional)
327    pub heart_rate: Option<u8>,
328    /// Activity level (0=still, 1=walking, 2=running, 3=vehicle)
329    pub activity: u8,
330    /// Alert status flags
331    pub alerts: u8,
332}
333
334impl HealthStatus {
335    /// Alert flag: Man down
336    pub const ALERT_MAN_DOWN: u8 = 0x01;
337    /// Alert flag: Low battery
338    pub const ALERT_LOW_BATTERY: u8 = 0x02;
339    /// Alert flag: Out of range
340    pub const ALERT_OUT_OF_RANGE: u8 = 0x04;
341    /// Alert flag: Custom alert 1
342    pub const ALERT_CUSTOM_1: u8 = 0x08;
343
344    /// Create a new health status
345    pub fn new(battery_percent: u8) -> Self {
346        Self {
347            battery_percent,
348            heart_rate: None,
349            activity: 0,
350            alerts: 0,
351        }
352    }
353
354    /// Set heart rate
355    pub fn with_heart_rate(mut self, hr: u8) -> Self {
356        self.heart_rate = Some(hr);
357        self
358    }
359
360    /// Set activity level
361    pub fn with_activity(mut self, activity: u8) -> Self {
362        self.activity = activity;
363        self
364    }
365
366    /// Set alert flag
367    pub fn set_alert(&mut self, flag: u8) {
368        self.alerts |= flag;
369    }
370
371    /// Clear alert flag
372    pub fn clear_alert(&mut self, flag: u8) {
373        self.alerts &= !flag;
374    }
375
376    /// Check if alert is set
377    pub fn has_alert(&self, flag: u8) -> bool {
378        self.alerts & flag != 0
379    }
380
381    /// Encode to bytes (3-4 bytes)
382    pub fn encode(&self) -> Vec<u8> {
383        vec![
384            self.battery_percent,
385            self.activity,
386            self.alerts,
387            // Heart rate: 0 means not present, otherwise value
388            self.heart_rate.unwrap_or(0),
389        ]
390    }
391
392    /// Decode from bytes
393    pub fn decode(data: &[u8]) -> Option<Self> {
394        if data.len() < 4 {
395            return None;
396        }
397        let mut status = Self::new(data[0]);
398        status.activity = data[1];
399        status.alerts = data[2];
400        if data[3] > 0 {
401            status.heart_rate = Some(data[3]);
402        }
403        Some(status)
404    }
405}
406
407// ============================================================================
408// Peripheral (Sub-node) Types - for soldier-attached devices like M5Stack Core2
409// ============================================================================
410
411/// Type of peripheral device
412#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
413#[repr(u8)]
414pub enum PeripheralType {
415    /// Unknown/unspecified
416    #[default]
417    Unknown = 0,
418    /// Soldier-worn sensor (e.g., M5Stack Core2)
419    SoldierSensor = 1,
420    /// Fixed/stationary sensor
421    FixedSensor = 2,
422    /// Mesh relay only (no sensors)
423    Relay = 3,
424}
425
426impl PeripheralType {
427    /// Convert from u8 value
428    pub fn from_u8(v: u8) -> Self {
429        match v {
430            1 => Self::SoldierSensor,
431            2 => Self::FixedSensor,
432            3 => Self::Relay,
433            _ => Self::Unknown,
434        }
435    }
436}
437
438/// Event types that a peripheral can emit (e.g., from tap input)
439#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
440#[repr(u8)]
441pub enum EventType {
442    /// No event / cleared
443    #[default]
444    None = 0,
445    /// "I'm OK" ping
446    Ping = 1,
447    /// Request assistance
448    NeedAssist = 2,
449    /// Emergency / SOS
450    Emergency = 3,
451    /// Moving / in transit
452    Moving = 4,
453    /// In position / stationary
454    InPosition = 5,
455    /// Acknowledged / copy
456    Ack = 6,
457}
458
459impl EventType {
460    /// Convert from u8 value
461    pub fn from_u8(v: u8) -> Self {
462        match v {
463            1 => Self::Ping,
464            2 => Self::NeedAssist,
465            3 => Self::Emergency,
466            4 => Self::Moving,
467            5 => Self::InPosition,
468            6 => Self::Ack,
469            _ => Self::None,
470        }
471    }
472
473    /// Human-readable label for display
474    pub fn label(&self) -> &'static str {
475        match self {
476            Self::None => "",
477            Self::Ping => "PING",
478            Self::NeedAssist => "NEED ASSIST",
479            Self::Emergency => "EMERGENCY",
480            Self::Moving => "MOVING",
481            Self::InPosition => "IN POSITION",
482            Self::Ack => "ACK",
483        }
484    }
485}
486
487/// An event emitted by a peripheral (e.g., tap on Core2)
488#[derive(Debug, Clone, Default, PartialEq)]
489pub struct PeripheralEvent {
490    /// Type of event
491    pub event_type: EventType,
492    /// Timestamp when event occurred (ms since epoch or boot)
493    pub timestamp: u64,
494}
495
496impl PeripheralEvent {
497    /// Create a new peripheral event
498    pub fn new(event_type: EventType, timestamp: u64) -> Self {
499        Self {
500            event_type,
501            timestamp,
502        }
503    }
504
505    /// Encode to bytes (9 bytes)
506    pub fn encode(&self) -> Vec<u8> {
507        let mut buf = Vec::with_capacity(9);
508        buf.push(self.event_type as u8);
509        buf.extend_from_slice(&self.timestamp.to_le_bytes());
510        buf
511    }
512
513    /// Decode from bytes
514    pub fn decode(data: &[u8]) -> Option<Self> {
515        if data.len() < 9 {
516            return None;
517        }
518        Some(Self {
519            event_type: EventType::from_u8(data[0]),
520            timestamp: u64::from_le_bytes([
521                data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
522            ]),
523        })
524    }
525}
526
527/// An emergency event with acknowledgment tracking (CRDT)
528///
529/// Represents a single emergency incident with distributed ACK tracking.
530/// Each node in the mesh can acknowledge the emergency, and this state
531/// is replicated across all nodes using CRDT semantics.
532///
533/// ## CRDT Semantics
534///
535/// - **Identity**: Events are uniquely identified by (source_node, timestamp)
536/// - **Merge for same event**: ACK maps merge with OR (once acked, stays acked)
537/// - **Merge for different events**: Higher timestamp wins (newer emergency replaces older)
538/// - **Monotonic**: ACK state only moves from false → true, never back
539///
540/// ## Wire Format
541///
542/// ```text
543/// source_node: 4 bytes (LE)
544/// timestamp:   8 bytes (LE)
545/// num_acks:    4 bytes (LE)
546/// acks[N]:
547///   node_id:   4 bytes (LE)
548///   acked:     1 byte (0 or 1)
549/// ```
550#[derive(Debug, Clone, PartialEq, Default)]
551pub struct EmergencyEvent {
552    /// Node that triggered the emergency
553    source_node: u32,
554    /// Timestamp when emergency was triggered (for uniqueness)
555    timestamp: u64,
556    /// ACK status for each known peer: node_id -> has_acked
557    acks: BTreeMap<u32, bool>,
558}
559
560impl EmergencyEvent {
561    /// Create a new emergency event
562    ///
563    /// # Arguments
564    /// * `source_node` - Node ID that triggered the emergency
565    /// * `timestamp` - When the emergency was triggered
566    /// * `known_peers` - List of peer node IDs to track for ACKs
567    ///
568    /// The source node is automatically marked as acknowledged.
569    pub fn new(source_node: u32, timestamp: u64, known_peers: &[u32]) -> Self {
570        let mut acks = BTreeMap::new();
571
572        // Source node implicitly ACKs their own emergency
573        acks.insert(source_node, true);
574
575        // All other known peers start as not-acked
576        for &peer_id in known_peers {
577            if peer_id != source_node {
578                acks.entry(peer_id).or_insert(false);
579            }
580        }
581
582        Self {
583            source_node,
584            timestamp,
585            acks,
586        }
587    }
588
589    /// Get the source node that triggered the emergency
590    pub fn source_node(&self) -> u32 {
591        self.source_node
592    }
593
594    /// Get the timestamp when the emergency was triggered
595    pub fn timestamp(&self) -> u64 {
596        self.timestamp
597    }
598
599    /// Check if a specific node has acknowledged
600    pub fn has_acked(&self, node_id: u32) -> bool {
601        self.acks.get(&node_id).copied().unwrap_or(false)
602    }
603
604    /// Record an acknowledgment from a node
605    ///
606    /// Returns true if this was a new ACK (state changed)
607    pub fn ack(&mut self, node_id: u32) -> bool {
608        let entry = self.acks.entry(node_id).or_insert(false);
609        if !*entry {
610            *entry = true;
611            true
612        } else {
613            false
614        }
615    }
616
617    /// Add a peer to track (if not already present)
618    ///
619    /// New peers start as not-acked. This is useful when discovering
620    /// new peers after the emergency was created.
621    pub fn add_peer(&mut self, node_id: u32) {
622        self.acks.entry(node_id).or_insert(false);
623    }
624
625    /// Get list of nodes that have acknowledged
626    pub fn acked_nodes(&self) -> Vec<u32> {
627        self.acks
628            .iter()
629            .filter(|(_, &acked)| acked)
630            .map(|(&node_id, _)| node_id)
631            .collect()
632    }
633
634    /// Get list of nodes that have NOT acknowledged
635    pub fn pending_nodes(&self) -> Vec<u32> {
636        self.acks
637            .iter()
638            .filter(|(_, &acked)| !acked)
639            .map(|(&node_id, _)| node_id)
640            .collect()
641    }
642
643    /// Check if all tracked nodes have acknowledged
644    pub fn all_acked(&self) -> bool {
645        !self.acks.is_empty() && self.acks.values().all(|&acked| acked)
646    }
647
648    /// Get the total number of tracked nodes
649    pub fn peer_count(&self) -> usize {
650        self.acks.len()
651    }
652
653    /// Get the number of nodes that have acknowledged
654    pub fn ack_count(&self) -> usize {
655        self.acks.values().filter(|&&acked| acked).count()
656    }
657
658    /// Merge with another emergency event (CRDT semantics)
659    ///
660    /// # Returns
661    /// `true` if our state changed
662    ///
663    /// # Semantics
664    /// - Same event (source_node, timestamp): merge ACK maps with OR
665    /// - Different event: take the one with higher timestamp
666    pub fn merge(&mut self, other: &EmergencyEvent) -> bool {
667        // Different emergency - take newer one
668        if self.source_node != other.source_node || self.timestamp != other.timestamp {
669            if other.timestamp > self.timestamp {
670                *self = other.clone();
671                return true;
672            }
673            return false;
674        }
675
676        // Same emergency - merge ACK maps with OR
677        let mut changed = false;
678        for (&node_id, &other_acked) in &other.acks {
679            let entry = self.acks.entry(node_id).or_insert(false);
680            if other_acked && !*entry {
681                *entry = true;
682                changed = true;
683            }
684        }
685        changed
686    }
687
688    /// Encode to bytes for transmission
689    ///
690    /// Format: source_node(4) + timestamp(8) + num_acks(4) + acks[N](5 each)
691    pub fn encode(&self) -> Vec<u8> {
692        let mut buf = Vec::with_capacity(16 + self.acks.len() * 5);
693
694        buf.extend_from_slice(&self.source_node.to_le_bytes());
695        buf.extend_from_slice(&self.timestamp.to_le_bytes());
696        buf.extend_from_slice(&(self.acks.len() as u32).to_le_bytes());
697
698        for (&node_id, &acked) in &self.acks {
699            buf.extend_from_slice(&node_id.to_le_bytes());
700            buf.push(if acked { 1 } else { 0 });
701        }
702
703        buf
704    }
705
706    /// Decode from bytes
707    pub fn decode(data: &[u8]) -> Option<Self> {
708        if data.len() < 16 {
709            return None;
710        }
711
712        let source_node = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
713        let timestamp = u64::from_le_bytes([
714            data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11],
715        ]);
716        let num_acks = u32::from_le_bytes([data[12], data[13], data[14], data[15]]) as usize;
717
718        if data.len() < 16 + num_acks * 5 {
719            return None;
720        }
721
722        let mut acks = BTreeMap::new();
723        let mut offset = 16;
724        for _ in 0..num_acks {
725            let node_id = u32::from_le_bytes([
726                data[offset],
727                data[offset + 1],
728                data[offset + 2],
729                data[offset + 3],
730            ]);
731            let acked = data[offset + 4] != 0;
732            acks.insert(node_id, acked);
733            offset += 5;
734        }
735
736        Some(Self {
737            source_node,
738            timestamp,
739            acks,
740        })
741    }
742}
743
744/// A peripheral device attached to a Node (soldier)
745///
746/// Peripherals are sub-tier devices that augment a soldier's capabilities
747/// with sensors and input (e.g., M5Stack Core2 wearable).
748#[derive(Debug, Clone, Default)]
749pub struct Peripheral {
750    /// Unique peripheral ID (derived from device MAC or similar)
751    pub id: u32,
752    /// Parent Node ID this peripheral is attached to (0 if not paired)
753    pub parent_node: u32,
754    /// Type of peripheral
755    pub peripheral_type: PeripheralType,
756    /// Callsign/name (inherited from parent or configured)
757    pub callsign: [u8; 12],
758    /// Current health status
759    pub health: HealthStatus,
760    /// Most recent event (if any)
761    pub last_event: Option<PeripheralEvent>,
762    /// Last update timestamp
763    pub timestamp: u64,
764}
765
766impl Peripheral {
767    /// Create a new peripheral
768    pub fn new(id: u32, peripheral_type: PeripheralType) -> Self {
769        Self {
770            id,
771            parent_node: 0,
772            peripheral_type,
773            callsign: [0u8; 12],
774            health: HealthStatus::default(),
775            last_event: None,
776            timestamp: 0,
777        }
778    }
779
780    /// Set the callsign (truncated to 12 bytes)
781    pub fn with_callsign(mut self, callsign: &str) -> Self {
782        let bytes = callsign.as_bytes();
783        let len = bytes.len().min(12);
784        self.callsign[..len].copy_from_slice(&bytes[..len]);
785        self
786    }
787
788    /// Get callsign as string
789    pub fn callsign_str(&self) -> &str {
790        let len = self.callsign.iter().position(|&b| b == 0).unwrap_or(12);
791        core::str::from_utf8(&self.callsign[..len]).unwrap_or("")
792    }
793
794    /// Set parent node
795    pub fn with_parent(mut self, parent_node: u32) -> Self {
796        self.parent_node = parent_node;
797        self
798    }
799
800    /// Record an event
801    pub fn set_event(&mut self, event_type: EventType, timestamp: u64) {
802        self.last_event = Some(PeripheralEvent::new(event_type, timestamp));
803        self.timestamp = timestamp;
804    }
805
806    /// Clear the last event
807    pub fn clear_event(&mut self) {
808        self.last_event = None;
809    }
810
811    /// Encode to bytes for BLE transmission
812    /// Format: [id:4][parent:4][type:1][callsign:12][health:4][has_event:1][event:9?][timestamp:8]
813    /// Size: 34 bytes without event, 43 bytes with event
814    pub fn encode(&self) -> Vec<u8> {
815        let mut buf = Vec::with_capacity(43);
816        buf.extend_from_slice(&self.id.to_le_bytes());
817        buf.extend_from_slice(&self.parent_node.to_le_bytes());
818        buf.push(self.peripheral_type as u8);
819        buf.extend_from_slice(&self.callsign);
820        buf.extend_from_slice(&self.health.encode());
821
822        if let Some(ref event) = self.last_event {
823            buf.push(1); // has event
824            buf.extend_from_slice(&event.encode());
825        } else {
826            buf.push(0); // no event
827        }
828
829        buf.extend_from_slice(&self.timestamp.to_le_bytes());
830        buf
831    }
832
833    /// Decode from bytes
834    pub fn decode(data: &[u8]) -> Option<Self> {
835        if data.len() < 34 {
836            return None;
837        }
838
839        let id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
840        let parent_node = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
841        let peripheral_type = PeripheralType::from_u8(data[8]);
842
843        let mut callsign = [0u8; 12];
844        callsign.copy_from_slice(&data[9..21]);
845
846        let health = HealthStatus::decode(&data[21..25])?;
847
848        let has_event = data[25] != 0;
849        let (last_event, timestamp_offset) = if has_event {
850            if data.len() < 43 {
851                return None;
852            }
853            (PeripheralEvent::decode(&data[26..35]), 35)
854        } else {
855            (None, 26)
856        };
857
858        if data.len() < timestamp_offset + 8 {
859            return None;
860        }
861
862        let timestamp = u64::from_le_bytes([
863            data[timestamp_offset],
864            data[timestamp_offset + 1],
865            data[timestamp_offset + 2],
866            data[timestamp_offset + 3],
867            data[timestamp_offset + 4],
868            data[timestamp_offset + 5],
869            data[timestamp_offset + 6],
870            data[timestamp_offset + 7],
871        ]);
872
873        Some(Self {
874            id,
875            parent_node,
876            peripheral_type,
877            callsign,
878            health,
879            last_event,
880            timestamp,
881        })
882    }
883}
884
885/// CRDT operation types for sync
886#[derive(Debug, Clone)]
887pub enum CrdtOperation {
888    /// Update a position register
889    UpdatePosition {
890        /// Node ID that owns this position
891        node_id: NodeId,
892        /// Position data
893        position: Position,
894        /// Timestamp of the update
895        timestamp: Timestamp,
896    },
897    /// Update health status register
898    UpdateHealth {
899        /// Node ID that owns this status
900        node_id: NodeId,
901        /// Health status data
902        status: HealthStatus,
903        /// Timestamp of the update
904        timestamp: Timestamp,
905    },
906    /// Increment a counter
907    IncrementCounter {
908        /// Counter identifier
909        counter_id: u8,
910        /// Node performing the increment
911        node_id: NodeId,
912        /// Amount to increment
913        amount: u64,
914    },
915    /// Generic LWW update (key-value)
916    UpdateRegister {
917        /// Key for the register
918        key: String,
919        /// Value data
920        value: Vec<u8>,
921        /// Timestamp of the update
922        timestamp: Timestamp,
923        /// Node that set the value
924        node_id: NodeId,
925    },
926}
927
928impl CrdtOperation {
929    /// Get the approximate size in bytes
930    pub fn size(&self) -> usize {
931        match self {
932            CrdtOperation::UpdatePosition { position, .. } => 4 + 8 + position.encode().len(),
933            CrdtOperation::UpdateHealth { status, .. } => 4 + 8 + status.encode().len(),
934            CrdtOperation::IncrementCounter { .. } => 1 + 4 + 8,
935            CrdtOperation::UpdateRegister { key, value, .. } => 4 + 8 + key.len() + value.len(),
936        }
937    }
938
939    /// Encode to bytes
940    pub fn encode(&self) -> Vec<u8> {
941        let mut buf = Vec::new();
942        match self {
943            CrdtOperation::UpdatePosition {
944                node_id,
945                position,
946                timestamp,
947            } => {
948                buf.push(0x01); // Type tag
949                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
950                buf.extend_from_slice(&timestamp.to_le_bytes());
951                buf.extend_from_slice(&position.encode());
952            }
953            CrdtOperation::UpdateHealth {
954                node_id,
955                status,
956                timestamp,
957            } => {
958                buf.push(0x02); // Type tag
959                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
960                buf.extend_from_slice(&timestamp.to_le_bytes());
961                buf.extend_from_slice(&status.encode());
962            }
963            CrdtOperation::IncrementCounter {
964                counter_id,
965                node_id,
966                amount,
967            } => {
968                buf.push(0x03); // Type tag
969                buf.push(*counter_id);
970                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
971                buf.extend_from_slice(&amount.to_le_bytes());
972            }
973            CrdtOperation::UpdateRegister {
974                key,
975                value,
976                timestamp,
977                node_id,
978            } => {
979                buf.push(0x04); // Type tag
980                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
981                buf.extend_from_slice(&timestamp.to_le_bytes());
982                buf.push(key.len() as u8);
983                buf.extend_from_slice(key.as_bytes());
984                buf.extend_from_slice(&(value.len() as u16).to_le_bytes());
985                buf.extend_from_slice(value);
986            }
987        }
988        buf
989    }
990
991    /// Decode from bytes
992    pub fn decode(data: &[u8]) -> Option<Self> {
993        if data.is_empty() {
994            return None;
995        }
996
997        match data[0] {
998            0x01 => {
999                // UpdatePosition
1000                if data.len() < 13 {
1001                    return None;
1002                }
1003                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1004                let timestamp = u64::from_le_bytes([
1005                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1006                ]);
1007                let position = Position::decode(&data[13..])?;
1008                Some(CrdtOperation::UpdatePosition {
1009                    node_id,
1010                    position,
1011                    timestamp,
1012                })
1013            }
1014            0x02 => {
1015                // UpdateHealth
1016                if data.len() < 13 {
1017                    return None;
1018                }
1019                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1020                let timestamp = u64::from_le_bytes([
1021                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1022                ]);
1023                let status = HealthStatus::decode(&data[13..])?;
1024                Some(CrdtOperation::UpdateHealth {
1025                    node_id,
1026                    status,
1027                    timestamp,
1028                })
1029            }
1030            0x03 => {
1031                // IncrementCounter
1032                if data.len() < 14 {
1033                    return None;
1034                }
1035                let counter_id = data[1];
1036                let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
1037                let amount = u64::from_le_bytes([
1038                    data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
1039                ]);
1040                Some(CrdtOperation::IncrementCounter {
1041                    counter_id,
1042                    node_id,
1043                    amount,
1044                })
1045            }
1046            0x04 => {
1047                // UpdateRegister
1048                if data.len() < 14 {
1049                    return None;
1050                }
1051                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1052                let timestamp = u64::from_le_bytes([
1053                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1054                ]);
1055                let key_len = data[13] as usize;
1056                if data.len() < 14 + key_len + 2 {
1057                    return None;
1058                }
1059                let key = core::str::from_utf8(&data[14..14 + key_len])
1060                    .ok()?
1061                    .to_string();
1062                let value_len =
1063                    u16::from_le_bytes([data[14 + key_len], data[15 + key_len]]) as usize;
1064                if data.len() < 16 + key_len + value_len {
1065                    return None;
1066                }
1067                let value = data[16 + key_len..16 + key_len + value_len].to_vec();
1068                Some(CrdtOperation::UpdateRegister {
1069                    key,
1070                    value,
1071                    timestamp,
1072                    node_id,
1073                })
1074            }
1075            _ => None,
1076        }
1077    }
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082    use super::*;
1083
1084    #[test]
1085    fn test_lww_register_basic() {
1086        let mut reg = LwwRegister::new(42u32, 100, NodeId::new(1));
1087        assert_eq!(*reg.get(), 42);
1088        assert_eq!(reg.timestamp(), 100);
1089
1090        // Higher timestamp wins
1091        assert!(reg.set(99, 200, NodeId::new(2)));
1092        assert_eq!(*reg.get(), 99);
1093
1094        // Lower timestamp loses
1095        assert!(!reg.set(50, 150, NodeId::new(3)));
1096        assert_eq!(*reg.get(), 99);
1097    }
1098
1099    #[test]
1100    fn test_lww_register_tiebreak() {
1101        let mut reg = LwwRegister::new(1u32, 100, NodeId::new(1));
1102
1103        // Same timestamp, higher node_id wins
1104        assert!(reg.set(2, 100, NodeId::new(2)));
1105        assert_eq!(*reg.get(), 2);
1106
1107        // Same timestamp, lower node_id loses
1108        assert!(!reg.set(3, 100, NodeId::new(1)));
1109        assert_eq!(*reg.get(), 2);
1110    }
1111
1112    #[test]
1113    fn test_lww_register_merge() {
1114        let mut reg1 = LwwRegister::new(1u32, 100, NodeId::new(1));
1115        let reg2 = LwwRegister::new(2u32, 200, NodeId::new(2));
1116
1117        assert!(reg1.merge(&reg2));
1118        assert_eq!(*reg1.get(), 2);
1119    }
1120
1121    #[test]
1122    fn test_gcounter_basic() {
1123        let mut counter = GCounter::new();
1124        let node1 = NodeId::new(1);
1125        let node2 = NodeId::new(2);
1126
1127        counter.increment(&node1, 5);
1128        counter.increment(&node2, 3);
1129        counter.increment(&node1, 2);
1130
1131        assert_eq!(counter.value(), 10);
1132        assert_eq!(counter.node_count(&node1), 7);
1133        assert_eq!(counter.node_count(&node2), 3);
1134    }
1135
1136    #[test]
1137    fn test_gcounter_merge() {
1138        let mut counter1 = GCounter::new();
1139        let mut counter2 = GCounter::new();
1140        let node1 = NodeId::new(1);
1141        let node2 = NodeId::new(2);
1142
1143        counter1.increment(&node1, 5);
1144        counter2.increment(&node1, 3);
1145        counter2.increment(&node2, 4);
1146
1147        counter1.merge(&counter2);
1148
1149        assert_eq!(counter1.value(), 9); // max(5,3) + 4
1150        assert_eq!(counter1.node_count(&node1), 5);
1151        assert_eq!(counter1.node_count(&node2), 4);
1152    }
1153
1154    #[test]
1155    fn test_gcounter_encode_decode() {
1156        let mut counter = GCounter::new();
1157        counter.increment(&NodeId::new(1), 5);
1158        counter.increment(&NodeId::new(2), 10);
1159
1160        let encoded = counter.encode();
1161        let decoded = GCounter::decode(&encoded).unwrap();
1162
1163        assert_eq!(decoded.value(), counter.value());
1164        assert_eq!(decoded.node_count(&NodeId::new(1)), 5);
1165        assert_eq!(decoded.node_count(&NodeId::new(2)), 10);
1166    }
1167
1168    #[test]
1169    fn test_position_encode_decode() {
1170        let pos = Position::new(37.7749, -122.4194)
1171            .with_altitude(100.0)
1172            .with_accuracy(5.0);
1173
1174        let encoded = pos.encode();
1175        let decoded = Position::decode(&encoded).unwrap();
1176
1177        assert_eq!(decoded.latitude, pos.latitude);
1178        assert_eq!(decoded.longitude, pos.longitude);
1179        assert_eq!(decoded.altitude, pos.altitude);
1180        assert_eq!(decoded.accuracy, pos.accuracy);
1181    }
1182
1183    #[test]
1184    fn test_position_minimal_encode() {
1185        let pos = Position::new(0.0, 0.0);
1186        let encoded = pos.encode();
1187        assert_eq!(encoded.len(), 9); // lat + lon + flags
1188
1189        let pos_with_alt = Position::new(0.0, 0.0).with_altitude(0.0);
1190        let encoded_alt = pos_with_alt.encode();
1191        assert_eq!(encoded_alt.len(), 13);
1192    }
1193
1194    #[test]
1195    fn test_health_status() {
1196        let mut status = HealthStatus::new(85).with_heart_rate(72).with_activity(1);
1197
1198        assert_eq!(status.battery_percent, 85);
1199        assert_eq!(status.heart_rate, Some(72));
1200        assert!(!status.has_alert(HealthStatus::ALERT_MAN_DOWN));
1201
1202        status.set_alert(HealthStatus::ALERT_MAN_DOWN);
1203        assert!(status.has_alert(HealthStatus::ALERT_MAN_DOWN));
1204
1205        let encoded = status.encode();
1206        let decoded = HealthStatus::decode(&encoded).unwrap();
1207        assert_eq!(decoded.battery_percent, 85);
1208        assert_eq!(decoded.heart_rate, Some(72));
1209        assert!(decoded.has_alert(HealthStatus::ALERT_MAN_DOWN));
1210    }
1211
1212    #[test]
1213    fn test_crdt_operation_position() {
1214        let op = CrdtOperation::UpdatePosition {
1215            node_id: NodeId::new(0x1234),
1216            position: Position::new(37.0, -122.0),
1217            timestamp: 1000,
1218        };
1219
1220        let encoded = op.encode();
1221        let decoded = CrdtOperation::decode(&encoded).unwrap();
1222
1223        if let CrdtOperation::UpdatePosition {
1224            node_id,
1225            position,
1226            timestamp,
1227        } = decoded
1228        {
1229            assert_eq!(node_id.as_u32(), 0x1234);
1230            assert_eq!(timestamp, 1000);
1231            assert_eq!(position.latitude, 37.0);
1232        } else {
1233            panic!("Wrong operation type");
1234        }
1235    }
1236
1237    #[test]
1238    fn test_crdt_operation_counter() {
1239        let op = CrdtOperation::IncrementCounter {
1240            counter_id: 1,
1241            node_id: NodeId::new(0x5678),
1242            amount: 42,
1243        };
1244
1245        let encoded = op.encode();
1246        let decoded = CrdtOperation::decode(&encoded).unwrap();
1247
1248        if let CrdtOperation::IncrementCounter {
1249            counter_id,
1250            node_id,
1251            amount,
1252        } = decoded
1253        {
1254            assert_eq!(counter_id, 1);
1255            assert_eq!(node_id.as_u32(), 0x5678);
1256            assert_eq!(amount, 42);
1257        } else {
1258            panic!("Wrong operation type");
1259        }
1260    }
1261
1262    #[test]
1263    fn test_crdt_operation_size() {
1264        let pos_op = CrdtOperation::UpdatePosition {
1265            node_id: NodeId::new(1),
1266            position: Position::new(0.0, 0.0),
1267            timestamp: 0,
1268        };
1269        assert!(pos_op.size() > 0);
1270
1271        let counter_op = CrdtOperation::IncrementCounter {
1272            counter_id: 0,
1273            node_id: NodeId::new(1),
1274            amount: 1,
1275        };
1276        assert_eq!(counter_op.size(), 13);
1277    }
1278
1279    // ============================================================================
1280    // Peripheral Tests
1281    // ============================================================================
1282
1283    #[test]
1284    fn test_peripheral_type_from_u8() {
1285        assert_eq!(PeripheralType::from_u8(0), PeripheralType::Unknown);
1286        assert_eq!(PeripheralType::from_u8(1), PeripheralType::SoldierSensor);
1287        assert_eq!(PeripheralType::from_u8(2), PeripheralType::FixedSensor);
1288        assert_eq!(PeripheralType::from_u8(3), PeripheralType::Relay);
1289        assert_eq!(PeripheralType::from_u8(99), PeripheralType::Unknown);
1290    }
1291
1292    #[test]
1293    fn test_event_type_from_u8() {
1294        assert_eq!(EventType::from_u8(0), EventType::None);
1295        assert_eq!(EventType::from_u8(1), EventType::Ping);
1296        assert_eq!(EventType::from_u8(2), EventType::NeedAssist);
1297        assert_eq!(EventType::from_u8(3), EventType::Emergency);
1298        assert_eq!(EventType::from_u8(4), EventType::Moving);
1299        assert_eq!(EventType::from_u8(5), EventType::InPosition);
1300        assert_eq!(EventType::from_u8(6), EventType::Ack);
1301        assert_eq!(EventType::from_u8(99), EventType::None);
1302    }
1303
1304    #[test]
1305    fn test_event_type_labels() {
1306        assert_eq!(EventType::None.label(), "");
1307        assert_eq!(EventType::Emergency.label(), "EMERGENCY");
1308        assert_eq!(EventType::Ping.label(), "PING");
1309    }
1310
1311    #[test]
1312    fn test_peripheral_event_encode_decode() {
1313        let event = PeripheralEvent::new(EventType::Emergency, 1234567890);
1314        let encoded = event.encode();
1315        assert_eq!(encoded.len(), 9);
1316
1317        let decoded = PeripheralEvent::decode(&encoded).unwrap();
1318        assert_eq!(decoded.event_type, EventType::Emergency);
1319        assert_eq!(decoded.timestamp, 1234567890);
1320    }
1321
1322    #[test]
1323    fn test_peripheral_new() {
1324        let peripheral = Peripheral::new(0x12345678, PeripheralType::SoldierSensor);
1325        assert_eq!(peripheral.id, 0x12345678);
1326        assert_eq!(peripheral.peripheral_type, PeripheralType::SoldierSensor);
1327        assert_eq!(peripheral.parent_node, 0);
1328        assert!(peripheral.last_event.is_none());
1329    }
1330
1331    #[test]
1332    fn test_peripheral_with_callsign() {
1333        let peripheral = Peripheral::new(1, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");
1334        assert_eq!(peripheral.callsign_str(), "ALPHA-1");
1335
1336        // Test truncation
1337        let peripheral2 = Peripheral::new(2, PeripheralType::SoldierSensor)
1338            .with_callsign("THIS_IS_A_VERY_LONG_CALLSIGN");
1339        assert_eq!(peripheral2.callsign_str(), "THIS_IS_A_VE");
1340    }
1341
1342    #[test]
1343    fn test_peripheral_set_event() {
1344        let mut peripheral = Peripheral::new(1, PeripheralType::SoldierSensor);
1345        peripheral.set_event(EventType::Emergency, 1000);
1346
1347        assert!(peripheral.last_event.is_some());
1348        let event = peripheral.last_event.as_ref().unwrap();
1349        assert_eq!(event.event_type, EventType::Emergency);
1350        assert_eq!(event.timestamp, 1000);
1351        assert_eq!(peripheral.timestamp, 1000);
1352
1353        peripheral.clear_event();
1354        assert!(peripheral.last_event.is_none());
1355    }
1356
1357    #[test]
1358    fn test_peripheral_encode_decode_without_event() {
1359        let peripheral = Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor)
1360            .with_callsign("BRAVO-2")
1361            .with_parent(0x11223344);
1362
1363        let encoded = peripheral.encode();
1364        assert_eq!(encoded.len(), 34); // No event
1365
1366        let decoded = Peripheral::decode(&encoded).unwrap();
1367        assert_eq!(decoded.id, 0xAABBCCDD);
1368        assert_eq!(decoded.parent_node, 0x11223344);
1369        assert_eq!(decoded.peripheral_type, PeripheralType::SoldierSensor);
1370        assert_eq!(decoded.callsign_str(), "BRAVO-2");
1371        assert!(decoded.last_event.is_none());
1372    }
1373
1374    #[test]
1375    fn test_peripheral_encode_decode_with_event() {
1376        let mut peripheral = Peripheral::new(0x12345678, PeripheralType::SoldierSensor)
1377            .with_callsign("CHARLIE")
1378            .with_parent(0x87654321);
1379        peripheral.health = HealthStatus::new(85);
1380        peripheral.set_event(EventType::NeedAssist, 9999);
1381
1382        let encoded = peripheral.encode();
1383        assert_eq!(encoded.len(), 43); // With event
1384
1385        let decoded = Peripheral::decode(&encoded).unwrap();
1386        assert_eq!(decoded.id, 0x12345678);
1387        assert_eq!(decoded.parent_node, 0x87654321);
1388        assert_eq!(decoded.callsign_str(), "CHARLIE");
1389        assert_eq!(decoded.health.battery_percent, 85);
1390        assert!(decoded.last_event.is_some());
1391        let event = decoded.last_event.as_ref().unwrap();
1392        assert_eq!(event.event_type, EventType::NeedAssist);
1393        assert_eq!(event.timestamp, 9999);
1394    }
1395
1396    #[test]
1397    fn test_peripheral_decode_invalid_data() {
1398        // Too short
1399        assert!(Peripheral::decode(&[0u8; 10]).is_none());
1400
1401        // Valid length but no event
1402        let mut data = vec![0u8; 34];
1403        data[25] = 0; // no event flag
1404        assert!(Peripheral::decode(&data).is_some());
1405
1406        // Claims to have event but too short
1407        data[25] = 1; // has event flag
1408        assert!(Peripheral::decode(&data).is_none());
1409    }
1410
1411    // ============================================================================
1412    // EmergencyEvent Tests
1413    // ============================================================================
1414
1415    #[test]
1416    fn test_emergency_event_new() {
1417        let peers = vec![0x22222222, 0x33333333];
1418        let event = EmergencyEvent::new(0x11111111, 1000, &peers);
1419
1420        assert_eq!(event.source_node(), 0x11111111);
1421        assert_eq!(event.timestamp(), 1000);
1422        assert_eq!(event.peer_count(), 3); // source + 2 peers
1423
1424        // Source is auto-acked
1425        assert!(event.has_acked(0x11111111));
1426        // Others are not
1427        assert!(!event.has_acked(0x22222222));
1428        assert!(!event.has_acked(0x33333333));
1429    }
1430
1431    #[test]
1432    fn test_emergency_event_ack() {
1433        let peers = vec![0x22222222, 0x33333333];
1434        let mut event = EmergencyEvent::new(0x11111111, 1000, &peers);
1435
1436        assert_eq!(event.ack_count(), 1); // just source
1437        assert!(!event.all_acked());
1438
1439        // ACK from first peer
1440        assert!(event.ack(0x22222222)); // returns true - new ack
1441        assert_eq!(event.ack_count(), 2);
1442        assert!(!event.all_acked());
1443
1444        // Duplicate ACK
1445        assert!(!event.ack(0x22222222)); // returns false - already acked
1446
1447        // ACK from second peer
1448        assert!(event.ack(0x33333333));
1449        assert_eq!(event.ack_count(), 3);
1450        assert!(event.all_acked());
1451    }
1452
1453    #[test]
1454    fn test_emergency_event_pending_nodes() {
1455        let peers = vec![0x22222222, 0x33333333];
1456        let mut event = EmergencyEvent::new(0x11111111, 1000, &peers);
1457
1458        let pending = event.pending_nodes();
1459        assert_eq!(pending.len(), 2);
1460        assert!(pending.contains(&0x22222222));
1461        assert!(pending.contains(&0x33333333));
1462
1463        event.ack(0x22222222);
1464        let pending = event.pending_nodes();
1465        assert_eq!(pending.len(), 1);
1466        assert!(pending.contains(&0x33333333));
1467    }
1468
1469    #[test]
1470    fn test_emergency_event_encode_decode() {
1471        let peers = vec![0x22222222, 0x33333333];
1472        let mut event = EmergencyEvent::new(0x11111111, 1234567890, &peers);
1473        event.ack(0x22222222);
1474
1475        let encoded = event.encode();
1476        let decoded = EmergencyEvent::decode(&encoded).unwrap();
1477
1478        assert_eq!(decoded.source_node(), 0x11111111);
1479        assert_eq!(decoded.timestamp(), 1234567890);
1480        assert!(decoded.has_acked(0x11111111));
1481        assert!(decoded.has_acked(0x22222222));
1482        assert!(!decoded.has_acked(0x33333333));
1483    }
1484
1485    #[test]
1486    fn test_emergency_event_merge_same_event() {
1487        // Two nodes have the same emergency, different ack states
1488        let peers = vec![0x22222222, 0x33333333];
1489        let mut event1 = EmergencyEvent::new(0x11111111, 1000, &peers);
1490        let mut event2 = EmergencyEvent::new(0x11111111, 1000, &peers);
1491
1492        event1.ack(0x22222222);
1493        event2.ack(0x33333333);
1494
1495        // Merge event2 into event1
1496        let changed = event1.merge(&event2);
1497        assert!(changed);
1498        assert!(event1.has_acked(0x22222222));
1499        assert!(event1.has_acked(0x33333333));
1500        assert!(event1.all_acked());
1501    }
1502
1503    #[test]
1504    fn test_emergency_event_merge_different_events() {
1505        // Old emergency
1506        let mut old_event = EmergencyEvent::new(0x11111111, 1000, &[0x22222222]);
1507        old_event.ack(0x22222222);
1508
1509        // New emergency from different source
1510        let new_event = EmergencyEvent::new(0x33333333, 2000, &[0x11111111, 0x22222222]);
1511
1512        // Merge new into old - should replace
1513        let changed = old_event.merge(&new_event);
1514        assert!(changed);
1515        assert_eq!(old_event.source_node(), 0x33333333);
1516        assert_eq!(old_event.timestamp(), 2000);
1517        // Old ack state should be gone
1518        assert!(!old_event.has_acked(0x22222222));
1519    }
1520
1521    #[test]
1522    fn test_emergency_event_merge_older_event_ignored() {
1523        // Current emergency
1524        let mut current = EmergencyEvent::new(0x11111111, 2000, &[0x22222222]);
1525
1526        // Older emergency
1527        let older = EmergencyEvent::new(0x33333333, 1000, &[0x11111111]);
1528
1529        // Merge older into current - should NOT replace
1530        let changed = current.merge(&older);
1531        assert!(!changed);
1532        assert_eq!(current.source_node(), 0x11111111);
1533        assert_eq!(current.timestamp(), 2000);
1534    }
1535
1536    #[test]
1537    fn test_emergency_event_add_peer() {
1538        let mut event = EmergencyEvent::new(0x11111111, 1000, &[]);
1539
1540        // Add a peer discovered after emergency started
1541        event.add_peer(0x22222222);
1542        assert!(!event.has_acked(0x22222222));
1543        assert_eq!(event.peer_count(), 2);
1544
1545        // Adding same peer again doesn't change ack status
1546        event.ack(0x22222222);
1547        event.add_peer(0x22222222);
1548        assert!(event.has_acked(0x22222222)); // still acked
1549    }
1550
1551    #[test]
1552    fn test_emergency_event_decode_invalid() {
1553        // Too short
1554        assert!(EmergencyEvent::decode(&[0u8; 10]).is_none());
1555
1556        // Valid header but claims more acks than data
1557        let mut data = vec![0u8; 16];
1558        data[12] = 5; // claims 5 ack entries
1559        assert!(EmergencyEvent::decode(&data).is_none());
1560    }
1561}