hive_btle/sync/
crdt.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! CRDT (Conflict-free Replicated Data Types) for HIVE-Lite
17//!
18//! Provides lightweight CRDT implementations optimized for BLE sync:
19//! - LWW-Register: Last-Writer-Wins for single values
20//! - G-Counter: Grow-only counter for metrics
21//!
22//! These are designed for minimal memory footprint and efficient
23//! serialization over constrained BLE connections.
24
25#[cfg(not(feature = "std"))]
26use alloc::{collections::BTreeMap, string::String, string::ToString, vec, vec::Vec};
27#[cfg(feature = "std")]
28use std::collections::BTreeMap;
29
30use crate::NodeId;
31
32/// Timestamp for CRDT operations (milliseconds since epoch or monotonic)
33pub type Timestamp = u64;
34
35/// A Last-Writer-Wins Register
36///
37/// Stores a single value where concurrent writes are resolved by timestamp.
38/// Higher timestamp wins. In case of tie, higher node ID wins.
39#[derive(Debug, Clone, PartialEq)]
40pub struct LwwRegister<T: Clone> {
41    /// Current value
42    value: T,
43    /// Timestamp when value was set
44    timestamp: Timestamp,
45    /// Node that set the value
46    node_id: NodeId,
47}
48
49impl<T: Clone + Default> Default for LwwRegister<T> {
50    fn default() -> Self {
51        Self {
52            value: T::default(),
53            timestamp: 0,
54            node_id: NodeId::default(),
55        }
56    }
57}
58
59impl<T: Clone> LwwRegister<T> {
60    /// Create a new register with an initial value
61    pub fn new(value: T, timestamp: Timestamp, node_id: NodeId) -> Self {
62        Self {
63            value,
64            timestamp,
65            node_id,
66        }
67    }
68
69    /// Get the current value
70    pub fn get(&self) -> &T {
71        &self.value
72    }
73
74    /// Get the timestamp
75    pub fn timestamp(&self) -> Timestamp {
76        self.timestamp
77    }
78
79    /// Get the node that set the value
80    pub fn node_id(&self) -> &NodeId {
81        &self.node_id
82    }
83
84    /// Set a new value if it has a higher timestamp
85    ///
86    /// Returns true if the value was updated
87    pub fn set(&mut self, value: T, timestamp: Timestamp, node_id: NodeId) -> bool {
88        if self.should_update(timestamp, &node_id) {
89            self.value = value;
90            self.timestamp = timestamp;
91            self.node_id = node_id;
92            true
93        } else {
94            false
95        }
96    }
97
98    /// Merge with another register (LWW semantics)
99    ///
100    /// Returns true if our value was updated
101    pub fn merge(&mut self, other: &LwwRegister<T>) -> bool {
102        if self.should_update(other.timestamp, &other.node_id) {
103            self.value = other.value.clone();
104            self.timestamp = other.timestamp;
105            self.node_id = other.node_id;
106            true
107        } else {
108            false
109        }
110    }
111
112    /// Check if we should update based on timestamp/node_id
113    fn should_update(&self, timestamp: Timestamp, node_id: &NodeId) -> bool {
114        timestamp > self.timestamp
115            || (timestamp == self.timestamp && node_id.as_u32() > self.node_id.as_u32())
116    }
117}
118
119/// A Grow-only Counter (G-Counter)
120///
121/// Each node maintains its own count, total is the sum of all counts.
122/// Only supports increment operations.
123#[derive(Debug, Clone, Default)]
124pub struct GCounter {
125    /// Per-node counts
126    counts: BTreeMap<u32, u64>,
127}
128
129impl GCounter {
130    /// Create a new empty counter
131    pub fn new() -> Self {
132        Self {
133            counts: BTreeMap::new(),
134        }
135    }
136
137    /// Get the total count
138    pub fn value(&self) -> u64 {
139        self.counts.values().sum()
140    }
141
142    /// Increment the counter for a node
143    pub fn increment(&mut self, node_id: &NodeId, amount: u64) {
144        let count = self.counts.entry(node_id.as_u32()).or_insert(0);
145        *count = count.saturating_add(amount);
146    }
147
148    /// Get the count for a specific node
149    pub fn node_count(&self, node_id: &NodeId) -> u64 {
150        self.counts.get(&node_id.as_u32()).copied().unwrap_or(0)
151    }
152
153    /// Merge with another counter
154    ///
155    /// Takes the max of each node's count
156    pub fn merge(&mut self, other: &GCounter) {
157        for (&node_id, &count) in &other.counts {
158            let our_count = self.counts.entry(node_id).or_insert(0);
159            *our_count = (*our_count).max(count);
160        }
161    }
162
163    /// Get the number of nodes that have contributed
164    pub fn node_count_total(&self) -> usize {
165        self.counts.len()
166    }
167
168    /// Get all entries as (node_id, count) pairs
169    ///
170    /// Returns an iterator over all nodes and their counts.
171    /// Used for building delta documents.
172    pub fn entries(&self) -> impl Iterator<Item = (u32, u64)> + '_ {
173        self.counts.iter().map(|(&k, &v)| (k, v))
174    }
175
176    /// Encode to bytes for transmission
177    pub fn encode(&self) -> Vec<u8> {
178        let mut buf = Vec::with_capacity(4 + self.counts.len() * 12);
179        // Number of entries
180        buf.extend_from_slice(&(self.counts.len() as u32).to_le_bytes());
181        // Each entry: node_id (4 bytes) + count (8 bytes)
182        for (&node_id, &count) in &self.counts {
183            buf.extend_from_slice(&node_id.to_le_bytes());
184            buf.extend_from_slice(&count.to_le_bytes());
185        }
186        buf
187    }
188
189    /// Decode from bytes
190    pub fn decode(data: &[u8]) -> Option<Self> {
191        if data.len() < 4 {
192            return None;
193        }
194        let num_entries = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
195        if data.len() < 4 + num_entries * 12 {
196            return None;
197        }
198
199        let mut counts = BTreeMap::new();
200        let mut offset = 4;
201        for _ in 0..num_entries {
202            let node_id = u32::from_le_bytes([
203                data[offset],
204                data[offset + 1],
205                data[offset + 2],
206                data[offset + 3],
207            ]);
208            let count = u64::from_le_bytes([
209                data[offset + 4],
210                data[offset + 5],
211                data[offset + 6],
212                data[offset + 7],
213                data[offset + 8],
214                data[offset + 9],
215                data[offset + 10],
216                data[offset + 11],
217            ]);
218            counts.insert(node_id, count);
219            offset += 12;
220        }
221
222        Some(Self { counts })
223    }
224}
225
226/// Position data with LWW semantics
227#[derive(Debug, Clone, Default, PartialEq)]
228pub struct Position {
229    /// Latitude in degrees
230    pub latitude: f32,
231    /// Longitude in degrees
232    pub longitude: f32,
233    /// Altitude in meters (optional)
234    pub altitude: Option<f32>,
235    /// Accuracy in meters (optional)
236    pub accuracy: Option<f32>,
237}
238
239impl Position {
240    /// Create a new position
241    pub fn new(latitude: f32, longitude: f32) -> Self {
242        Self {
243            latitude,
244            longitude,
245            altitude: None,
246            accuracy: None,
247        }
248    }
249
250    /// Create with altitude
251    pub fn with_altitude(mut self, altitude: f32) -> Self {
252        self.altitude = Some(altitude);
253        self
254    }
255
256    /// Create with accuracy
257    pub fn with_accuracy(mut self, accuracy: f32) -> Self {
258        self.accuracy = Some(accuracy);
259        self
260    }
261
262    /// Encode to bytes (12-20 bytes)
263    pub fn encode(&self) -> Vec<u8> {
264        let mut buf = Vec::with_capacity(20);
265        buf.extend_from_slice(&self.latitude.to_le_bytes());
266        buf.extend_from_slice(&self.longitude.to_le_bytes());
267
268        // Flags byte: bit 0 = has altitude, bit 1 = has accuracy
269        let mut flags = 0u8;
270        if self.altitude.is_some() {
271            flags |= 0x01;
272        }
273        if self.accuracy.is_some() {
274            flags |= 0x02;
275        }
276        buf.push(flags);
277
278        if let Some(alt) = self.altitude {
279            buf.extend_from_slice(&alt.to_le_bytes());
280        }
281        if let Some(acc) = self.accuracy {
282            buf.extend_from_slice(&acc.to_le_bytes());
283        }
284        buf
285    }
286
287    /// Decode from bytes
288    pub fn decode(data: &[u8]) -> Option<Self> {
289        if data.len() < 9 {
290            return None;
291        }
292
293        let latitude = f32::from_le_bytes([data[0], data[1], data[2], data[3]]);
294        let longitude = f32::from_le_bytes([data[4], data[5], data[6], data[7]]);
295        let flags = data[8];
296
297        let mut pos = Self::new(latitude, longitude);
298        let mut offset = 9;
299
300        if flags & 0x01 != 0 {
301            if data.len() < offset + 4 {
302                return None;
303            }
304            pos.altitude = Some(f32::from_le_bytes([
305                data[offset],
306                data[offset + 1],
307                data[offset + 2],
308                data[offset + 3],
309            ]));
310            offset += 4;
311        }
312
313        if flags & 0x02 != 0 {
314            if data.len() < offset + 4 {
315                return None;
316            }
317            pos.accuracy = Some(f32::from_le_bytes([
318                data[offset],
319                data[offset + 1],
320                data[offset + 2],
321                data[offset + 3],
322            ]));
323        }
324
325        Some(pos)
326    }
327}
328
329/// Health status data with LWW semantics
330#[derive(Debug, Clone, Default, PartialEq)]
331pub struct HealthStatus {
332    /// Battery percentage (0-100)
333    pub battery_percent: u8,
334    /// Heart rate BPM (optional)
335    pub heart_rate: Option<u8>,
336    /// Activity level (0=still, 1=walking, 2=running, 3=vehicle)
337    pub activity: u8,
338    /// Alert status flags
339    pub alerts: u8,
340}
341
342impl HealthStatus {
343    /// Alert flag: Man down
344    pub const ALERT_MAN_DOWN: u8 = 0x01;
345    /// Alert flag: Low battery
346    pub const ALERT_LOW_BATTERY: u8 = 0x02;
347    /// Alert flag: Out of range
348    pub const ALERT_OUT_OF_RANGE: u8 = 0x04;
349    /// Alert flag: Custom alert 1
350    pub const ALERT_CUSTOM_1: u8 = 0x08;
351
352    /// Create a new health status
353    pub fn new(battery_percent: u8) -> Self {
354        Self {
355            battery_percent,
356            heart_rate: None,
357            activity: 0,
358            alerts: 0,
359        }
360    }
361
362    /// Set heart rate
363    pub fn with_heart_rate(mut self, hr: u8) -> Self {
364        self.heart_rate = Some(hr);
365        self
366    }
367
368    /// Set activity level
369    pub fn with_activity(mut self, activity: u8) -> Self {
370        self.activity = activity;
371        self
372    }
373
374    /// Set alert flag
375    pub fn set_alert(&mut self, flag: u8) {
376        self.alerts |= flag;
377    }
378
379    /// Clear alert flag
380    pub fn clear_alert(&mut self, flag: u8) {
381        self.alerts &= !flag;
382    }
383
384    /// Check if alert is set
385    pub fn has_alert(&self, flag: u8) -> bool {
386        self.alerts & flag != 0
387    }
388
389    /// Encode to bytes (3-4 bytes)
390    pub fn encode(&self) -> Vec<u8> {
391        vec![
392            self.battery_percent,
393            self.activity,
394            self.alerts,
395            // Heart rate: 0 means not present, otherwise value
396            self.heart_rate.unwrap_or(0),
397        ]
398    }
399
400    /// Decode from bytes
401    pub fn decode(data: &[u8]) -> Option<Self> {
402        if data.len() < 4 {
403            return None;
404        }
405        let mut status = Self::new(data[0]);
406        status.activity = data[1];
407        status.alerts = data[2];
408        if data[3] > 0 {
409            status.heart_rate = Some(data[3]);
410        }
411        Some(status)
412    }
413}
414
415// ============================================================================
416// Peripheral (Sub-node) Types - for soldier-attached devices like M5Stack Core2
417// ============================================================================
418
419/// Type of peripheral device
420#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
421#[repr(u8)]
422pub enum PeripheralType {
423    /// Unknown/unspecified
424    #[default]
425    Unknown = 0,
426    /// Soldier-worn sensor (e.g., M5Stack Core2)
427    SoldierSensor = 1,
428    /// Fixed/stationary sensor
429    FixedSensor = 2,
430    /// Mesh relay only (no sensors)
431    Relay = 3,
432}
433
434impl PeripheralType {
435    /// Convert from u8 value
436    pub fn from_u8(v: u8) -> Self {
437        match v {
438            1 => Self::SoldierSensor,
439            2 => Self::FixedSensor,
440            3 => Self::Relay,
441            _ => Self::Unknown,
442        }
443    }
444}
445
446/// Event types that a peripheral can emit (e.g., from tap input)
447#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
448#[repr(u8)]
449pub enum EventType {
450    /// No event / cleared
451    #[default]
452    None = 0,
453    /// "I'm OK" ping
454    Ping = 1,
455    /// Request assistance
456    NeedAssist = 2,
457    /// Emergency / SOS
458    Emergency = 3,
459    /// Moving / in transit
460    Moving = 4,
461    /// In position / stationary
462    InPosition = 5,
463    /// Acknowledged / copy
464    Ack = 6,
465}
466
467impl EventType {
468    /// Convert from u8 value
469    pub fn from_u8(v: u8) -> Self {
470        match v {
471            1 => Self::Ping,
472            2 => Self::NeedAssist,
473            3 => Self::Emergency,
474            4 => Self::Moving,
475            5 => Self::InPosition,
476            6 => Self::Ack,
477            _ => Self::None,
478        }
479    }
480
481    /// Human-readable label for display
482    pub fn label(&self) -> &'static str {
483        match self {
484            Self::None => "",
485            Self::Ping => "PING",
486            Self::NeedAssist => "NEED ASSIST",
487            Self::Emergency => "EMERGENCY",
488            Self::Moving => "MOVING",
489            Self::InPosition => "IN POSITION",
490            Self::Ack => "ACK",
491        }
492    }
493}
494
495/// An event emitted by a peripheral (e.g., tap on Core2)
496#[derive(Debug, Clone, Default, PartialEq)]
497pub struct PeripheralEvent {
498    /// Type of event
499    pub event_type: EventType,
500    /// Timestamp when event occurred (ms since epoch or boot)
501    pub timestamp: u64,
502}
503
504impl PeripheralEvent {
505    /// Create a new peripheral event
506    pub fn new(event_type: EventType, timestamp: u64) -> Self {
507        Self {
508            event_type,
509            timestamp,
510        }
511    }
512
513    /// Encode to bytes (9 bytes)
514    pub fn encode(&self) -> Vec<u8> {
515        let mut buf = Vec::with_capacity(9);
516        buf.push(self.event_type as u8);
517        buf.extend_from_slice(&self.timestamp.to_le_bytes());
518        buf
519    }
520
521    /// Decode from bytes
522    pub fn decode(data: &[u8]) -> Option<Self> {
523        if data.len() < 9 {
524            return None;
525        }
526        Some(Self {
527            event_type: EventType::from_u8(data[0]),
528            timestamp: u64::from_le_bytes([
529                data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
530            ]),
531        })
532    }
533}
534
535/// An emergency event with acknowledgment tracking (CRDT)
536///
537/// Represents a single emergency incident with distributed ACK tracking.
538/// Each node in the mesh can acknowledge the emergency, and this state
539/// is replicated across all nodes using CRDT semantics.
540///
541/// ## CRDT Semantics
542///
543/// - **Identity**: Events are uniquely identified by (source_node, timestamp)
544/// - **Merge for same event**: ACK maps merge with OR (once acked, stays acked)
545/// - **Merge for different events**: Higher timestamp wins (newer emergency replaces older)
546/// - **Monotonic**: ACK state only moves from false → true, never back
547///
548/// ## Wire Format
549///
550/// ```text
551/// source_node: 4 bytes (LE)
552/// timestamp:   8 bytes (LE)
553/// num_acks:    4 bytes (LE)
554/// acks[N]:
555///   node_id:   4 bytes (LE)
556///   acked:     1 byte (0 or 1)
557/// ```
558#[derive(Debug, Clone, PartialEq, Default)]
559pub struct EmergencyEvent {
560    /// Node that triggered the emergency
561    source_node: u32,
562    /// Timestamp when emergency was triggered (for uniqueness)
563    timestamp: u64,
564    /// ACK status for each known peer: node_id -> has_acked
565    acks: BTreeMap<u32, bool>,
566}
567
568impl EmergencyEvent {
569    /// Create a new emergency event
570    ///
571    /// # Arguments
572    /// * `source_node` - Node ID that triggered the emergency
573    /// * `timestamp` - When the emergency was triggered
574    /// * `known_peers` - List of peer node IDs to track for ACKs
575    ///
576    /// The source node is automatically marked as acknowledged.
577    pub fn new(source_node: u32, timestamp: u64, known_peers: &[u32]) -> Self {
578        let mut acks = BTreeMap::new();
579
580        // Source node implicitly ACKs their own emergency
581        acks.insert(source_node, true);
582
583        // All other known peers start as not-acked
584        for &peer_id in known_peers {
585            if peer_id != source_node {
586                acks.entry(peer_id).or_insert(false);
587            }
588        }
589
590        Self {
591            source_node,
592            timestamp,
593            acks,
594        }
595    }
596
597    /// Get the source node that triggered the emergency
598    pub fn source_node(&self) -> u32 {
599        self.source_node
600    }
601
602    /// Get the timestamp when the emergency was triggered
603    pub fn timestamp(&self) -> u64 {
604        self.timestamp
605    }
606
607    /// Check if a specific node has acknowledged
608    pub fn has_acked(&self, node_id: u32) -> bool {
609        self.acks.get(&node_id).copied().unwrap_or(false)
610    }
611
612    /// Record an acknowledgment from a node
613    ///
614    /// Returns true if this was a new ACK (state changed)
615    pub fn ack(&mut self, node_id: u32) -> bool {
616        let entry = self.acks.entry(node_id).or_insert(false);
617        if !*entry {
618            *entry = true;
619            true
620        } else {
621            false
622        }
623    }
624
625    /// Add a peer to track (if not already present)
626    ///
627    /// New peers start as not-acked. This is useful when discovering
628    /// new peers after the emergency was created.
629    pub fn add_peer(&mut self, node_id: u32) {
630        self.acks.entry(node_id).or_insert(false);
631    }
632
633    /// Get list of nodes that have acknowledged
634    pub fn acked_nodes(&self) -> Vec<u32> {
635        self.acks
636            .iter()
637            .filter(|(_, &acked)| acked)
638            .map(|(&node_id, _)| node_id)
639            .collect()
640    }
641
642    /// Get list of nodes that have NOT acknowledged
643    pub fn pending_nodes(&self) -> Vec<u32> {
644        self.acks
645            .iter()
646            .filter(|(_, &acked)| !acked)
647            .map(|(&node_id, _)| node_id)
648            .collect()
649    }
650
651    /// Get all tracked node IDs (both acked and pending)
652    pub fn all_nodes(&self) -> Vec<u32> {
653        self.acks.keys().copied().collect()
654    }
655
656    /// Check if all tracked nodes have acknowledged
657    pub fn all_acked(&self) -> bool {
658        !self.acks.is_empty() && self.acks.values().all(|&acked| acked)
659    }
660
661    /// Get the total number of tracked nodes
662    pub fn peer_count(&self) -> usize {
663        self.acks.len()
664    }
665
666    /// Get the number of nodes that have acknowledged
667    pub fn ack_count(&self) -> usize {
668        self.acks.values().filter(|&&acked| acked).count()
669    }
670
671    /// Merge with another emergency event (CRDT semantics)
672    ///
673    /// # Returns
674    /// `true` if our state changed
675    ///
676    /// # Semantics
677    /// - Same event (source_node, timestamp): merge ACK maps with OR
678    /// - Different event: take the one with higher timestamp
679    pub fn merge(&mut self, other: &EmergencyEvent) -> bool {
680        // Different emergency - take newer one
681        if self.source_node != other.source_node || self.timestamp != other.timestamp {
682            if other.timestamp > self.timestamp {
683                *self = other.clone();
684                return true;
685            }
686            return false;
687        }
688
689        // Same emergency - merge ACK maps with OR
690        let mut changed = false;
691        for (&node_id, &other_acked) in &other.acks {
692            let entry = self.acks.entry(node_id).or_insert(false);
693            if other_acked && !*entry {
694                *entry = true;
695                changed = true;
696            }
697        }
698        changed
699    }
700
701    /// Encode to bytes for transmission
702    ///
703    /// Format: source_node(4) + timestamp(8) + num_acks(4) + acks[N](5 each)
704    pub fn encode(&self) -> Vec<u8> {
705        let mut buf = Vec::with_capacity(16 + self.acks.len() * 5);
706
707        buf.extend_from_slice(&self.source_node.to_le_bytes());
708        buf.extend_from_slice(&self.timestamp.to_le_bytes());
709        buf.extend_from_slice(&(self.acks.len() as u32).to_le_bytes());
710
711        for (&node_id, &acked) in &self.acks {
712            buf.extend_from_slice(&node_id.to_le_bytes());
713            buf.push(if acked { 1 } else { 0 });
714        }
715
716        buf
717    }
718
719    /// Decode from bytes
720    pub fn decode(data: &[u8]) -> Option<Self> {
721        if data.len() < 16 {
722            return None;
723        }
724
725        let source_node = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
726        let timestamp = u64::from_le_bytes([
727            data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11],
728        ]);
729        let num_acks = u32::from_le_bytes([data[12], data[13], data[14], data[15]]) as usize;
730
731        if data.len() < 16 + num_acks * 5 {
732            return None;
733        }
734
735        let mut acks = BTreeMap::new();
736        let mut offset = 16;
737        for _ in 0..num_acks {
738            let node_id = u32::from_le_bytes([
739                data[offset],
740                data[offset + 1],
741                data[offset + 2],
742                data[offset + 3],
743            ]);
744            let acked = data[offset + 4] != 0;
745            acks.insert(node_id, acked);
746            offset += 5;
747        }
748
749        Some(Self {
750            source_node,
751            timestamp,
752            acks,
753        })
754    }
755}
756
757/// A peripheral device attached to a Node (soldier)
758///
759/// Peripherals are sub-tier devices that augment a soldier's capabilities
760/// with sensors and input (e.g., M5Stack Core2 wearable).
761#[derive(Debug, Clone, Default)]
762pub struct Peripheral {
763    /// Unique peripheral ID (derived from device MAC or similar)
764    pub id: u32,
765    /// Parent Node ID this peripheral is attached to (0 if not paired)
766    pub parent_node: u32,
767    /// Type of peripheral
768    pub peripheral_type: PeripheralType,
769    /// Callsign/name (inherited from parent or configured)
770    pub callsign: [u8; 12],
771    /// Current health status
772    pub health: HealthStatus,
773    /// Most recent event (if any)
774    pub last_event: Option<PeripheralEvent>,
775    /// Last update timestamp
776    pub timestamp: u64,
777}
778
779impl Peripheral {
780    /// Create a new peripheral
781    pub fn new(id: u32, peripheral_type: PeripheralType) -> Self {
782        Self {
783            id,
784            parent_node: 0,
785            peripheral_type,
786            callsign: [0u8; 12],
787            health: HealthStatus::default(),
788            last_event: None,
789            timestamp: 0,
790        }
791    }
792
793    /// Set the callsign (truncated to 12 bytes)
794    pub fn with_callsign(mut self, callsign: &str) -> Self {
795        let bytes = callsign.as_bytes();
796        let len = bytes.len().min(12);
797        self.callsign[..len].copy_from_slice(&bytes[..len]);
798        self
799    }
800
801    /// Get callsign as string
802    pub fn callsign_str(&self) -> &str {
803        let len = self.callsign.iter().position(|&b| b == 0).unwrap_or(12);
804        core::str::from_utf8(&self.callsign[..len]).unwrap_or("")
805    }
806
807    /// Set parent node
808    pub fn with_parent(mut self, parent_node: u32) -> Self {
809        self.parent_node = parent_node;
810        self
811    }
812
813    /// Record an event
814    pub fn set_event(&mut self, event_type: EventType, timestamp: u64) {
815        self.last_event = Some(PeripheralEvent::new(event_type, timestamp));
816        self.timestamp = timestamp;
817    }
818
819    /// Clear the last event
820    pub fn clear_event(&mut self) {
821        self.last_event = None;
822    }
823
824    /// Encode to bytes for BLE transmission
825    /// Format: [id:4][parent:4][type:1][callsign:12][health:4][has_event:1][event:9?][timestamp:8]
826    /// Size: 34 bytes without event, 43 bytes with event
827    pub fn encode(&self) -> Vec<u8> {
828        let mut buf = Vec::with_capacity(43);
829        buf.extend_from_slice(&self.id.to_le_bytes());
830        buf.extend_from_slice(&self.parent_node.to_le_bytes());
831        buf.push(self.peripheral_type as u8);
832        buf.extend_from_slice(&self.callsign);
833        buf.extend_from_slice(&self.health.encode());
834
835        if let Some(ref event) = self.last_event {
836            buf.push(1); // has event
837            buf.extend_from_slice(&event.encode());
838        } else {
839            buf.push(0); // no event
840        }
841
842        buf.extend_from_slice(&self.timestamp.to_le_bytes());
843        buf
844    }
845
846    /// Decode from bytes
847    pub fn decode(data: &[u8]) -> Option<Self> {
848        if data.len() < 34 {
849            return None;
850        }
851
852        let id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
853        let parent_node = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
854        let peripheral_type = PeripheralType::from_u8(data[8]);
855
856        let mut callsign = [0u8; 12];
857        callsign.copy_from_slice(&data[9..21]);
858
859        let health = HealthStatus::decode(&data[21..25])?;
860
861        let has_event = data[25] != 0;
862        let (last_event, timestamp_offset) = if has_event {
863            if data.len() < 43 {
864                return None;
865            }
866            (PeripheralEvent::decode(&data[26..35]), 35)
867        } else {
868            (None, 26)
869        };
870
871        if data.len() < timestamp_offset + 8 {
872            return None;
873        }
874
875        let timestamp = u64::from_le_bytes([
876            data[timestamp_offset],
877            data[timestamp_offset + 1],
878            data[timestamp_offset + 2],
879            data[timestamp_offset + 3],
880            data[timestamp_offset + 4],
881            data[timestamp_offset + 5],
882            data[timestamp_offset + 6],
883            data[timestamp_offset + 7],
884        ]);
885
886        Some(Self {
887            id,
888            parent_node,
889            peripheral_type,
890            callsign,
891            health,
892            last_event,
893            timestamp,
894        })
895    }
896}
897
898// ============================================================================
899// ChatCRDT - Add-only set of chat messages for mesh-wide messaging
900// ============================================================================
901
902/// Maximum message text length in bytes
903pub const CHAT_MAX_TEXT_LEN: usize = 128;
904
905/// Maximum sender name length in bytes
906pub const CHAT_MAX_SENDER_LEN: usize = 12;
907
908/// Maximum number of messages to retain in the CRDT
909///
910/// Older messages are pruned to keep memory bounded on embedded devices.
911pub const CHAT_MAX_MESSAGES: usize = 32;
912
913/// Maximum number of chat messages to include in sync documents
914///
915/// BLE GATT notifications have a limited MTU (typically 512 bytes).
916/// To avoid exceeding this limit, sync documents only include the
917/// most recent messages. The full history is kept in the local CRDT.
918pub const CHAT_SYNC_LIMIT: usize = 8;
919
920/// A single chat message in the mesh
921///
922/// Messages are uniquely identified by `(origin_node, timestamp)`.
923/// This allows deduplication across mesh sync while preserving message ordering.
924#[derive(Debug, Clone, PartialEq)]
925pub struct ChatMessage {
926    /// Node that originated this message
927    pub origin_node: u32,
928    /// Timestamp when message was created (ms since epoch)
929    pub timestamp: u64,
930    /// Sender name/callsign (up to 12 bytes)
931    sender: [u8; CHAT_MAX_SENDER_LEN],
932    sender_len: u8,
933    /// Message text (up to 128 bytes)
934    text: [u8; CHAT_MAX_TEXT_LEN],
935    text_len: u8,
936    /// Whether this is a broadcast message (vs directed)
937    pub is_broadcast: bool,
938    /// Whether ACK is requested
939    pub requires_ack: bool,
940    /// Reply-to: origin node of the message being replied to (0 = not a reply)
941    pub reply_to_node: u32,
942    /// Reply-to: timestamp of the message being replied to (0 = not a reply)
943    pub reply_to_timestamp: u64,
944}
945
946impl Default for ChatMessage {
947    fn default() -> Self {
948        Self {
949            origin_node: 0,
950            timestamp: 0,
951            sender: [0u8; CHAT_MAX_SENDER_LEN],
952            sender_len: 0,
953            text: [0u8; CHAT_MAX_TEXT_LEN],
954            text_len: 0,
955            is_broadcast: true,
956            requires_ack: false,
957            reply_to_node: 0,
958            reply_to_timestamp: 0,
959        }
960    }
961}
962
963impl ChatMessage {
964    /// Create a new chat message
965    pub fn new(origin_node: u32, timestamp: u64, sender: &str, text: &str) -> Self {
966        let mut msg = Self {
967            origin_node,
968            timestamp,
969            ..Default::default()
970        };
971        msg.set_sender(sender);
972        msg.set_text(text);
973        msg
974    }
975
976    /// Set the sender name (truncated to 12 bytes)
977    pub fn set_sender(&mut self, sender: &str) {
978        let bytes = sender.as_bytes();
979        let len = bytes.len().min(CHAT_MAX_SENDER_LEN);
980        self.sender[..len].copy_from_slice(&bytes[..len]);
981        self.sender_len = len as u8;
982    }
983
984    /// Get the sender name as a string
985    pub fn sender(&self) -> &str {
986        core::str::from_utf8(&self.sender[..self.sender_len as usize]).unwrap_or("")
987    }
988
989    /// Set the message text (truncated to 128 bytes)
990    pub fn set_text(&mut self, text: &str) {
991        let bytes = text.as_bytes();
992        let len = bytes.len().min(CHAT_MAX_TEXT_LEN);
993        self.text[..len].copy_from_slice(&bytes[..len]);
994        self.text_len = len as u8;
995    }
996
997    /// Get the message text as a string
998    pub fn text(&self) -> &str {
999        core::str::from_utf8(&self.text[..self.text_len as usize]).unwrap_or("")
1000    }
1001
1002    /// Set reply-to information
1003    pub fn set_reply_to(&mut self, node: u32, timestamp: u64) {
1004        self.reply_to_node = node;
1005        self.reply_to_timestamp = timestamp;
1006    }
1007
1008    /// Check if this is a reply to another message
1009    pub fn is_reply(&self) -> bool {
1010        self.reply_to_node != 0 || self.reply_to_timestamp != 0
1011    }
1012
1013    /// Get the unique message ID (combines origin_node and timestamp)
1014    ///
1015    /// Format: `(origin_node as u64) << 32 | (timestamp & 0xFFFFFFFF)`
1016    /// This provides a sortable key where messages from same node are ordered by time.
1017    pub fn message_id(&self) -> u64 {
1018        ((self.origin_node as u64) << 32) | (self.timestamp & 0xFFFFFFFF)
1019    }
1020
1021    /// Encode to bytes for transmission
1022    ///
1023    /// Wire format:
1024    /// ```text
1025    /// origin_node:       4 bytes (LE)
1026    /// timestamp:         8 bytes (LE)
1027    /// sender_len:        1 byte
1028    /// sender:            sender_len bytes
1029    /// text_len:          1 byte
1030    /// text:              text_len bytes
1031    /// flags:             1 byte (bit 0: is_broadcast, bit 1: requires_ack)
1032    /// reply_to_node:     4 bytes (LE)
1033    /// reply_to_timestamp: 8 bytes (LE)
1034    /// ```
1035    pub fn encode(&self) -> Vec<u8> {
1036        let size = 4 + 8 + 1 + self.sender_len as usize + 1 + self.text_len as usize + 1 + 4 + 8;
1037        let mut buf = Vec::with_capacity(size);
1038
1039        buf.extend_from_slice(&self.origin_node.to_le_bytes());
1040        buf.extend_from_slice(&self.timestamp.to_le_bytes());
1041        buf.push(self.sender_len);
1042        buf.extend_from_slice(&self.sender[..self.sender_len as usize]);
1043        buf.push(self.text_len);
1044        buf.extend_from_slice(&self.text[..self.text_len as usize]);
1045
1046        let mut flags = 0u8;
1047        if self.is_broadcast {
1048            flags |= 0x01;
1049        }
1050        if self.requires_ack {
1051            flags |= 0x02;
1052        }
1053        buf.push(flags);
1054
1055        buf.extend_from_slice(&self.reply_to_node.to_le_bytes());
1056        buf.extend_from_slice(&self.reply_to_timestamp.to_le_bytes());
1057
1058        buf
1059    }
1060
1061    /// Decode from bytes
1062    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
1063        if data.len() < 14 {
1064            // Minimum: 4 + 8 + 1 + 0 + 1 + 0 + 0 (no reply fields in old format)
1065            return None;
1066        }
1067
1068        let origin_node = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
1069        let timestamp = u64::from_le_bytes([
1070            data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11],
1071        ]);
1072
1073        let sender_len = data[12] as usize;
1074        if sender_len > CHAT_MAX_SENDER_LEN || data.len() < 13 + sender_len + 1 {
1075            return None;
1076        }
1077
1078        let mut sender = [0u8; CHAT_MAX_SENDER_LEN];
1079        sender[..sender_len].copy_from_slice(&data[13..13 + sender_len]);
1080
1081        let text_offset = 13 + sender_len;
1082        let text_len = data[text_offset] as usize;
1083        if text_len > CHAT_MAX_TEXT_LEN || data.len() < text_offset + 1 + text_len + 1 {
1084            return None;
1085        }
1086
1087        let mut text = [0u8; CHAT_MAX_TEXT_LEN];
1088        text[..text_len].copy_from_slice(&data[text_offset + 1..text_offset + 1 + text_len]);
1089
1090        let flags_offset = text_offset + 1 + text_len;
1091        let flags = data[flags_offset];
1092        let is_broadcast = flags & 0x01 != 0;
1093        let requires_ack = flags & 0x02 != 0;
1094
1095        // Reply-to fields (optional for backward compat)
1096        let mut reply_to_node = 0u32;
1097        let mut reply_to_timestamp = 0u64;
1098        let mut total_len = flags_offset + 1;
1099
1100        if data.len() >= flags_offset + 1 + 12 {
1101            reply_to_node = u32::from_le_bytes([
1102                data[flags_offset + 1],
1103                data[flags_offset + 2],
1104                data[flags_offset + 3],
1105                data[flags_offset + 4],
1106            ]);
1107            reply_to_timestamp = u64::from_le_bytes([
1108                data[flags_offset + 5],
1109                data[flags_offset + 6],
1110                data[flags_offset + 7],
1111                data[flags_offset + 8],
1112                data[flags_offset + 9],
1113                data[flags_offset + 10],
1114                data[flags_offset + 11],
1115                data[flags_offset + 12],
1116            ]);
1117            total_len = flags_offset + 13;
1118        }
1119
1120        Some((
1121            Self {
1122                origin_node,
1123                timestamp,
1124                sender,
1125                sender_len: sender_len as u8,
1126                text,
1127                text_len: text_len as u8,
1128                is_broadcast,
1129                requires_ack,
1130                reply_to_node,
1131                reply_to_timestamp,
1132            },
1133            total_len,
1134        ))
1135    }
1136}
1137
1138/// Chat CRDT - Add-only set of messages
1139///
1140/// Implements add-only set semantics where messages are identified by
1141/// `(origin_node, timestamp)`. Once a message is added, it cannot be removed
1142/// (tombstone-free design optimized for mesh networks).
1143///
1144/// ## CRDT Semantics
1145///
1146/// - **Merge**: Union of all messages from both sets
1147/// - **Identity**: `(origin_node, timestamp)` - duplicates are ignored
1148/// - **Ordering**: Messages are stored sorted by message_id for efficient iteration
1149/// - **Pruning**: Oldest messages are removed when exceeding `CHAT_MAX_MESSAGES`
1150///
1151/// ## Wire Format
1152///
1153/// ```text
1154/// num_messages: 2 bytes (LE)
1155/// messages[N]:  variable (see ChatMessage::encode)
1156/// ```
1157#[derive(Debug, Clone, Default)]
1158pub struct ChatCRDT {
1159    /// Messages indexed by message_id for deduplication
1160    messages: BTreeMap<u64, ChatMessage>,
1161}
1162
1163impl ChatCRDT {
1164    /// Create a new empty chat CRDT
1165    pub fn new() -> Self {
1166        Self {
1167            messages: BTreeMap::new(),
1168        }
1169    }
1170
1171    /// Add a message to the chat
1172    ///
1173    /// Returns `true` if the message was new (not a duplicate)
1174    pub fn add_message(&mut self, message: ChatMessage) -> bool {
1175        let id = message.message_id();
1176        if self.messages.contains_key(&id) {
1177            return false;
1178        }
1179
1180        self.messages.insert(id, message);
1181        self.prune_if_needed();
1182        true
1183    }
1184
1185    /// Create and add a new message
1186    pub fn send_message(
1187        &mut self,
1188        origin_node: u32,
1189        timestamp: u64,
1190        sender: &str,
1191        text: &str,
1192    ) -> bool {
1193        let msg = ChatMessage::new(origin_node, timestamp, sender, text);
1194        self.add_message(msg)
1195    }
1196
1197    /// Get a message by its ID
1198    pub fn get_message(&self, origin_node: u32, timestamp: u64) -> Option<&ChatMessage> {
1199        let id = ((origin_node as u64) << 32) | (timestamp & 0xFFFFFFFF);
1200        self.messages.get(&id)
1201    }
1202
1203    /// Get all messages, ordered by message_id
1204    pub fn messages(&self) -> impl Iterator<Item = &ChatMessage> {
1205        self.messages.values()
1206    }
1207
1208    /// Get messages newer than a given timestamp
1209    pub fn messages_since(&self, since_timestamp: u64) -> impl Iterator<Item = &ChatMessage> {
1210        self.messages
1211            .values()
1212            .filter(move |m| m.timestamp > since_timestamp)
1213    }
1214
1215    /// Get the number of messages
1216    pub fn len(&self) -> usize {
1217        self.messages.len()
1218    }
1219
1220    /// Check if there are no messages
1221    pub fn is_empty(&self) -> bool {
1222        self.messages.is_empty()
1223    }
1224
1225    /// Get the newest message timestamp (if any)
1226    pub fn newest_timestamp(&self) -> Option<u64> {
1227        self.messages.values().map(|m| m.timestamp).max()
1228    }
1229
1230    /// Merge with another ChatCRDT
1231    ///
1232    /// Returns `true` if any new messages were added
1233    pub fn merge(&mut self, other: &ChatCRDT) -> bool {
1234        let mut changed = false;
1235        for (id, msg) in &other.messages {
1236            if !self.messages.contains_key(id) {
1237                self.messages.insert(*id, msg.clone());
1238                changed = true;
1239            }
1240        }
1241        if changed {
1242            self.prune_if_needed();
1243        }
1244        changed
1245    }
1246
1247    /// Prune oldest messages if we exceed the limit
1248    fn prune_if_needed(&mut self) {
1249        while self.messages.len() > CHAT_MAX_MESSAGES {
1250            // Remove the entry with the lowest timestamp
1251            if let Some(&oldest_id) = self.messages.keys().next() {
1252                self.messages.remove(&oldest_id);
1253            }
1254        }
1255    }
1256
1257    /// Encode to bytes for transmission
1258    pub fn encode(&self) -> Vec<u8> {
1259        let mut buf = Vec::new();
1260
1261        // Number of messages
1262        buf.extend_from_slice(&(self.messages.len() as u16).to_le_bytes());
1263
1264        // Each message
1265        for msg in self.messages.values() {
1266            buf.extend_from_slice(&msg.encode());
1267        }
1268
1269        buf
1270    }
1271
1272    /// Decode from bytes
1273    pub fn decode(data: &[u8]) -> Option<Self> {
1274        if data.len() < 2 {
1275            return None;
1276        }
1277
1278        let num_messages = u16::from_le_bytes([data[0], data[1]]) as usize;
1279        let mut messages = BTreeMap::new();
1280        let mut offset = 2;
1281
1282        for _ in 0..num_messages {
1283            if offset >= data.len() {
1284                break;
1285            }
1286            if let Some((msg, len)) = ChatMessage::decode(&data[offset..]) {
1287                let id = msg.message_id();
1288                messages.insert(id, msg);
1289                offset += len;
1290            } else {
1291                break;
1292            }
1293        }
1294
1295        Some(Self { messages })
1296    }
1297
1298    /// Get the encoded size of this CRDT
1299    pub fn encoded_size(&self) -> usize {
1300        2 + self
1301            .messages
1302            .values()
1303            .map(|m| m.encode().len())
1304            .sum::<usize>()
1305    }
1306
1307    /// Create a copy limited to the most recent messages for sync
1308    ///
1309    /// Returns a new ChatCRDT containing only the N most recent messages,
1310    /// where N is `CHAT_SYNC_LIMIT`. This is used when building sync documents
1311    /// to avoid exceeding BLE MTU limits.
1312    ///
1313    /// The local CRDT retains all messages up to `CHAT_MAX_MESSAGES`.
1314    pub fn for_sync(&self) -> Self {
1315        if self.messages.len() <= CHAT_SYNC_LIMIT {
1316            return self.clone();
1317        }
1318
1319        // BTreeMap is ordered by key (message_id), and message_id encodes
1320        // timestamp in lower bits, so we take from the end for newest
1321        let messages: BTreeMap<u64, ChatMessage> = self
1322            .messages
1323            .iter()
1324            .rev()
1325            .take(CHAT_SYNC_LIMIT)
1326            .map(|(&k, v)| (k, v.clone()))
1327            .collect();
1328
1329        Self { messages }
1330    }
1331}
1332
1333/// CRDT operation types for sync
1334#[derive(Debug, Clone)]
1335pub enum CrdtOperation {
1336    /// Update a position register
1337    UpdatePosition {
1338        /// Node ID that owns this position
1339        node_id: NodeId,
1340        /// Position data
1341        position: Position,
1342        /// Timestamp of the update
1343        timestamp: Timestamp,
1344    },
1345    /// Update health status register
1346    UpdateHealth {
1347        /// Node ID that owns this status
1348        node_id: NodeId,
1349        /// Health status data
1350        status: HealthStatus,
1351        /// Timestamp of the update
1352        timestamp: Timestamp,
1353    },
1354    /// Increment a counter
1355    IncrementCounter {
1356        /// Counter identifier
1357        counter_id: u8,
1358        /// Node performing the increment
1359        node_id: NodeId,
1360        /// Amount to increment
1361        amount: u64,
1362    },
1363    /// Generic LWW update (key-value)
1364    UpdateRegister {
1365        /// Key for the register
1366        key: String,
1367        /// Value data
1368        value: Vec<u8>,
1369        /// Timestamp of the update
1370        timestamp: Timestamp,
1371        /// Node that set the value
1372        node_id: NodeId,
1373    },
1374}
1375
1376impl CrdtOperation {
1377    /// Get the approximate size in bytes
1378    pub fn size(&self) -> usize {
1379        match self {
1380            CrdtOperation::UpdatePosition { position, .. } => 4 + 8 + position.encode().len(),
1381            CrdtOperation::UpdateHealth { status, .. } => 4 + 8 + status.encode().len(),
1382            CrdtOperation::IncrementCounter { .. } => 1 + 4 + 8,
1383            CrdtOperation::UpdateRegister { key, value, .. } => 4 + 8 + key.len() + value.len(),
1384        }
1385    }
1386
1387    /// Encode to bytes
1388    pub fn encode(&self) -> Vec<u8> {
1389        let mut buf = Vec::new();
1390        match self {
1391            CrdtOperation::UpdatePosition {
1392                node_id,
1393                position,
1394                timestamp,
1395            } => {
1396                buf.push(0x01); // Type tag
1397                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
1398                buf.extend_from_slice(&timestamp.to_le_bytes());
1399                buf.extend_from_slice(&position.encode());
1400            }
1401            CrdtOperation::UpdateHealth {
1402                node_id,
1403                status,
1404                timestamp,
1405            } => {
1406                buf.push(0x02); // Type tag
1407                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
1408                buf.extend_from_slice(&timestamp.to_le_bytes());
1409                buf.extend_from_slice(&status.encode());
1410            }
1411            CrdtOperation::IncrementCounter {
1412                counter_id,
1413                node_id,
1414                amount,
1415            } => {
1416                buf.push(0x03); // Type tag
1417                buf.push(*counter_id);
1418                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
1419                buf.extend_from_slice(&amount.to_le_bytes());
1420            }
1421            CrdtOperation::UpdateRegister {
1422                key,
1423                value,
1424                timestamp,
1425                node_id,
1426            } => {
1427                buf.push(0x04); // Type tag
1428                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
1429                buf.extend_from_slice(&timestamp.to_le_bytes());
1430                buf.push(key.len() as u8);
1431                buf.extend_from_slice(key.as_bytes());
1432                buf.extend_from_slice(&(value.len() as u16).to_le_bytes());
1433                buf.extend_from_slice(value);
1434            }
1435        }
1436        buf
1437    }
1438
1439    /// Decode from bytes
1440    pub fn decode(data: &[u8]) -> Option<Self> {
1441        if data.is_empty() {
1442            return None;
1443        }
1444
1445        match data[0] {
1446            0x01 => {
1447                // UpdatePosition
1448                if data.len() < 13 {
1449                    return None;
1450                }
1451                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1452                let timestamp = u64::from_le_bytes([
1453                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1454                ]);
1455                let position = Position::decode(&data[13..])?;
1456                Some(CrdtOperation::UpdatePosition {
1457                    node_id,
1458                    position,
1459                    timestamp,
1460                })
1461            }
1462            0x02 => {
1463                // UpdateHealth
1464                if data.len() < 13 {
1465                    return None;
1466                }
1467                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1468                let timestamp = u64::from_le_bytes([
1469                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1470                ]);
1471                let status = HealthStatus::decode(&data[13..])?;
1472                Some(CrdtOperation::UpdateHealth {
1473                    node_id,
1474                    status,
1475                    timestamp,
1476                })
1477            }
1478            0x03 => {
1479                // IncrementCounter
1480                if data.len() < 14 {
1481                    return None;
1482                }
1483                let counter_id = data[1];
1484                let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
1485                let amount = u64::from_le_bytes([
1486                    data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
1487                ]);
1488                Some(CrdtOperation::IncrementCounter {
1489                    counter_id,
1490                    node_id,
1491                    amount,
1492                })
1493            }
1494            0x04 => {
1495                // UpdateRegister
1496                if data.len() < 14 {
1497                    return None;
1498                }
1499                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
1500                let timestamp = u64::from_le_bytes([
1501                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
1502                ]);
1503                let key_len = data[13] as usize;
1504                if data.len() < 14 + key_len + 2 {
1505                    return None;
1506                }
1507                let key = core::str::from_utf8(&data[14..14 + key_len])
1508                    .ok()?
1509                    .to_string();
1510                let value_len =
1511                    u16::from_le_bytes([data[14 + key_len], data[15 + key_len]]) as usize;
1512                if data.len() < 16 + key_len + value_len {
1513                    return None;
1514                }
1515                let value = data[16 + key_len..16 + key_len + value_len].to_vec();
1516                Some(CrdtOperation::UpdateRegister {
1517                    key,
1518                    value,
1519                    timestamp,
1520                    node_id,
1521                })
1522            }
1523            _ => None,
1524        }
1525    }
1526}
1527
1528#[cfg(test)]
1529mod tests {
1530    use super::*;
1531
1532    #[test]
1533    fn test_lww_register_basic() {
1534        let mut reg = LwwRegister::new(42u32, 100, NodeId::new(1));
1535        assert_eq!(*reg.get(), 42);
1536        assert_eq!(reg.timestamp(), 100);
1537
1538        // Higher timestamp wins
1539        assert!(reg.set(99, 200, NodeId::new(2)));
1540        assert_eq!(*reg.get(), 99);
1541
1542        // Lower timestamp loses
1543        assert!(!reg.set(50, 150, NodeId::new(3)));
1544        assert_eq!(*reg.get(), 99);
1545    }
1546
1547    #[test]
1548    fn test_lww_register_tiebreak() {
1549        let mut reg = LwwRegister::new(1u32, 100, NodeId::new(1));
1550
1551        // Same timestamp, higher node_id wins
1552        assert!(reg.set(2, 100, NodeId::new(2)));
1553        assert_eq!(*reg.get(), 2);
1554
1555        // Same timestamp, lower node_id loses
1556        assert!(!reg.set(3, 100, NodeId::new(1)));
1557        assert_eq!(*reg.get(), 2);
1558    }
1559
1560    #[test]
1561    fn test_lww_register_merge() {
1562        let mut reg1 = LwwRegister::new(1u32, 100, NodeId::new(1));
1563        let reg2 = LwwRegister::new(2u32, 200, NodeId::new(2));
1564
1565        assert!(reg1.merge(&reg2));
1566        assert_eq!(*reg1.get(), 2);
1567    }
1568
1569    #[test]
1570    fn test_gcounter_basic() {
1571        let mut counter = GCounter::new();
1572        let node1 = NodeId::new(1);
1573        let node2 = NodeId::new(2);
1574
1575        counter.increment(&node1, 5);
1576        counter.increment(&node2, 3);
1577        counter.increment(&node1, 2);
1578
1579        assert_eq!(counter.value(), 10);
1580        assert_eq!(counter.node_count(&node1), 7);
1581        assert_eq!(counter.node_count(&node2), 3);
1582    }
1583
1584    #[test]
1585    fn test_gcounter_merge() {
1586        let mut counter1 = GCounter::new();
1587        let mut counter2 = GCounter::new();
1588        let node1 = NodeId::new(1);
1589        let node2 = NodeId::new(2);
1590
1591        counter1.increment(&node1, 5);
1592        counter2.increment(&node1, 3);
1593        counter2.increment(&node2, 4);
1594
1595        counter1.merge(&counter2);
1596
1597        assert_eq!(counter1.value(), 9); // max(5,3) + 4
1598        assert_eq!(counter1.node_count(&node1), 5);
1599        assert_eq!(counter1.node_count(&node2), 4);
1600    }
1601
1602    #[test]
1603    fn test_gcounter_encode_decode() {
1604        let mut counter = GCounter::new();
1605        counter.increment(&NodeId::new(1), 5);
1606        counter.increment(&NodeId::new(2), 10);
1607
1608        let encoded = counter.encode();
1609        let decoded = GCounter::decode(&encoded).unwrap();
1610
1611        assert_eq!(decoded.value(), counter.value());
1612        assert_eq!(decoded.node_count(&NodeId::new(1)), 5);
1613        assert_eq!(decoded.node_count(&NodeId::new(2)), 10);
1614    }
1615
1616    #[test]
1617    fn test_position_encode_decode() {
1618        let pos = Position::new(37.7749, -122.4194)
1619            .with_altitude(100.0)
1620            .with_accuracy(5.0);
1621
1622        let encoded = pos.encode();
1623        let decoded = Position::decode(&encoded).unwrap();
1624
1625        assert_eq!(decoded.latitude, pos.latitude);
1626        assert_eq!(decoded.longitude, pos.longitude);
1627        assert_eq!(decoded.altitude, pos.altitude);
1628        assert_eq!(decoded.accuracy, pos.accuracy);
1629    }
1630
1631    #[test]
1632    fn test_position_minimal_encode() {
1633        let pos = Position::new(0.0, 0.0);
1634        let encoded = pos.encode();
1635        assert_eq!(encoded.len(), 9); // lat + lon + flags
1636
1637        let pos_with_alt = Position::new(0.0, 0.0).with_altitude(0.0);
1638        let encoded_alt = pos_with_alt.encode();
1639        assert_eq!(encoded_alt.len(), 13);
1640    }
1641
1642    #[test]
1643    fn test_health_status() {
1644        let mut status = HealthStatus::new(85).with_heart_rate(72).with_activity(1);
1645
1646        assert_eq!(status.battery_percent, 85);
1647        assert_eq!(status.heart_rate, Some(72));
1648        assert!(!status.has_alert(HealthStatus::ALERT_MAN_DOWN));
1649
1650        status.set_alert(HealthStatus::ALERT_MAN_DOWN);
1651        assert!(status.has_alert(HealthStatus::ALERT_MAN_DOWN));
1652
1653        let encoded = status.encode();
1654        let decoded = HealthStatus::decode(&encoded).unwrap();
1655        assert_eq!(decoded.battery_percent, 85);
1656        assert_eq!(decoded.heart_rate, Some(72));
1657        assert!(decoded.has_alert(HealthStatus::ALERT_MAN_DOWN));
1658    }
1659
1660    #[test]
1661    fn test_crdt_operation_position() {
1662        let op = CrdtOperation::UpdatePosition {
1663            node_id: NodeId::new(0x1234),
1664            position: Position::new(37.0, -122.0),
1665            timestamp: 1000,
1666        };
1667
1668        let encoded = op.encode();
1669        let decoded = CrdtOperation::decode(&encoded).unwrap();
1670
1671        if let CrdtOperation::UpdatePosition {
1672            node_id,
1673            position,
1674            timestamp,
1675        } = decoded
1676        {
1677            assert_eq!(node_id.as_u32(), 0x1234);
1678            assert_eq!(timestamp, 1000);
1679            assert_eq!(position.latitude, 37.0);
1680        } else {
1681            panic!("Wrong operation type");
1682        }
1683    }
1684
1685    #[test]
1686    fn test_crdt_operation_counter() {
1687        let op = CrdtOperation::IncrementCounter {
1688            counter_id: 1,
1689            node_id: NodeId::new(0x5678),
1690            amount: 42,
1691        };
1692
1693        let encoded = op.encode();
1694        let decoded = CrdtOperation::decode(&encoded).unwrap();
1695
1696        if let CrdtOperation::IncrementCounter {
1697            counter_id,
1698            node_id,
1699            amount,
1700        } = decoded
1701        {
1702            assert_eq!(counter_id, 1);
1703            assert_eq!(node_id.as_u32(), 0x5678);
1704            assert_eq!(amount, 42);
1705        } else {
1706            panic!("Wrong operation type");
1707        }
1708    }
1709
1710    #[test]
1711    fn test_crdt_operation_size() {
1712        let pos_op = CrdtOperation::UpdatePosition {
1713            node_id: NodeId::new(1),
1714            position: Position::new(0.0, 0.0),
1715            timestamp: 0,
1716        };
1717        assert!(pos_op.size() > 0);
1718
1719        let counter_op = CrdtOperation::IncrementCounter {
1720            counter_id: 0,
1721            node_id: NodeId::new(1),
1722            amount: 1,
1723        };
1724        assert_eq!(counter_op.size(), 13);
1725    }
1726
1727    // ============================================================================
1728    // Peripheral Tests
1729    // ============================================================================
1730
1731    #[test]
1732    fn test_peripheral_type_from_u8() {
1733        assert_eq!(PeripheralType::from_u8(0), PeripheralType::Unknown);
1734        assert_eq!(PeripheralType::from_u8(1), PeripheralType::SoldierSensor);
1735        assert_eq!(PeripheralType::from_u8(2), PeripheralType::FixedSensor);
1736        assert_eq!(PeripheralType::from_u8(3), PeripheralType::Relay);
1737        assert_eq!(PeripheralType::from_u8(99), PeripheralType::Unknown);
1738    }
1739
1740    #[test]
1741    fn test_event_type_from_u8() {
1742        assert_eq!(EventType::from_u8(0), EventType::None);
1743        assert_eq!(EventType::from_u8(1), EventType::Ping);
1744        assert_eq!(EventType::from_u8(2), EventType::NeedAssist);
1745        assert_eq!(EventType::from_u8(3), EventType::Emergency);
1746        assert_eq!(EventType::from_u8(4), EventType::Moving);
1747        assert_eq!(EventType::from_u8(5), EventType::InPosition);
1748        assert_eq!(EventType::from_u8(6), EventType::Ack);
1749        assert_eq!(EventType::from_u8(99), EventType::None);
1750    }
1751
1752    #[test]
1753    fn test_event_type_labels() {
1754        assert_eq!(EventType::None.label(), "");
1755        assert_eq!(EventType::Emergency.label(), "EMERGENCY");
1756        assert_eq!(EventType::Ping.label(), "PING");
1757    }
1758
1759    #[test]
1760    fn test_peripheral_event_encode_decode() {
1761        let event = PeripheralEvent::new(EventType::Emergency, 1234567890);
1762        let encoded = event.encode();
1763        assert_eq!(encoded.len(), 9);
1764
1765        let decoded = PeripheralEvent::decode(&encoded).unwrap();
1766        assert_eq!(decoded.event_type, EventType::Emergency);
1767        assert_eq!(decoded.timestamp, 1234567890);
1768    }
1769
1770    #[test]
1771    fn test_peripheral_new() {
1772        let peripheral = Peripheral::new(0x12345678, PeripheralType::SoldierSensor);
1773        assert_eq!(peripheral.id, 0x12345678);
1774        assert_eq!(peripheral.peripheral_type, PeripheralType::SoldierSensor);
1775        assert_eq!(peripheral.parent_node, 0);
1776        assert!(peripheral.last_event.is_none());
1777    }
1778
1779    #[test]
1780    fn test_peripheral_with_callsign() {
1781        let peripheral = Peripheral::new(1, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");
1782        assert_eq!(peripheral.callsign_str(), "ALPHA-1");
1783
1784        // Test truncation
1785        let peripheral2 = Peripheral::new(2, PeripheralType::SoldierSensor)
1786            .with_callsign("THIS_IS_A_VERY_LONG_CALLSIGN");
1787        assert_eq!(peripheral2.callsign_str(), "THIS_IS_A_VE");
1788    }
1789
1790    #[test]
1791    fn test_peripheral_set_event() {
1792        let mut peripheral = Peripheral::new(1, PeripheralType::SoldierSensor);
1793        peripheral.set_event(EventType::Emergency, 1000);
1794
1795        assert!(peripheral.last_event.is_some());
1796        let event = peripheral.last_event.as_ref().unwrap();
1797        assert_eq!(event.event_type, EventType::Emergency);
1798        assert_eq!(event.timestamp, 1000);
1799        assert_eq!(peripheral.timestamp, 1000);
1800
1801        peripheral.clear_event();
1802        assert!(peripheral.last_event.is_none());
1803    }
1804
1805    #[test]
1806    fn test_peripheral_encode_decode_without_event() {
1807        let peripheral = Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor)
1808            .with_callsign("BRAVO-2")
1809            .with_parent(0x11223344);
1810
1811        let encoded = peripheral.encode();
1812        assert_eq!(encoded.len(), 34); // No event
1813
1814        let decoded = Peripheral::decode(&encoded).unwrap();
1815        assert_eq!(decoded.id, 0xAABBCCDD);
1816        assert_eq!(decoded.parent_node, 0x11223344);
1817        assert_eq!(decoded.peripheral_type, PeripheralType::SoldierSensor);
1818        assert_eq!(decoded.callsign_str(), "BRAVO-2");
1819        assert!(decoded.last_event.is_none());
1820    }
1821
1822    #[test]
1823    fn test_peripheral_encode_decode_with_event() {
1824        let mut peripheral = Peripheral::new(0x12345678, PeripheralType::SoldierSensor)
1825            .with_callsign("CHARLIE")
1826            .with_parent(0x87654321);
1827        peripheral.health = HealthStatus::new(85);
1828        peripheral.set_event(EventType::NeedAssist, 9999);
1829
1830        let encoded = peripheral.encode();
1831        assert_eq!(encoded.len(), 43); // With event
1832
1833        let decoded = Peripheral::decode(&encoded).unwrap();
1834        assert_eq!(decoded.id, 0x12345678);
1835        assert_eq!(decoded.parent_node, 0x87654321);
1836        assert_eq!(decoded.callsign_str(), "CHARLIE");
1837        assert_eq!(decoded.health.battery_percent, 85);
1838        assert!(decoded.last_event.is_some());
1839        let event = decoded.last_event.as_ref().unwrap();
1840        assert_eq!(event.event_type, EventType::NeedAssist);
1841        assert_eq!(event.timestamp, 9999);
1842    }
1843
1844    #[test]
1845    fn test_peripheral_decode_invalid_data() {
1846        // Too short
1847        assert!(Peripheral::decode(&[0u8; 10]).is_none());
1848
1849        // Valid length but no event
1850        let mut data = vec![0u8; 34];
1851        data[25] = 0; // no event flag
1852        assert!(Peripheral::decode(&data).is_some());
1853
1854        // Claims to have event but too short
1855        data[25] = 1; // has event flag
1856        assert!(Peripheral::decode(&data).is_none());
1857    }
1858
1859    // ============================================================================
1860    // EmergencyEvent Tests
1861    // ============================================================================
1862
1863    #[test]
1864    fn test_emergency_event_new() {
1865        let peers = vec![0x22222222, 0x33333333];
1866        let event = EmergencyEvent::new(0x11111111, 1000, &peers);
1867
1868        assert_eq!(event.source_node(), 0x11111111);
1869        assert_eq!(event.timestamp(), 1000);
1870        assert_eq!(event.peer_count(), 3); // source + 2 peers
1871
1872        // Source is auto-acked
1873        assert!(event.has_acked(0x11111111));
1874        // Others are not
1875        assert!(!event.has_acked(0x22222222));
1876        assert!(!event.has_acked(0x33333333));
1877    }
1878
1879    #[test]
1880    fn test_emergency_event_ack() {
1881        let peers = vec![0x22222222, 0x33333333];
1882        let mut event = EmergencyEvent::new(0x11111111, 1000, &peers);
1883
1884        assert_eq!(event.ack_count(), 1); // just source
1885        assert!(!event.all_acked());
1886
1887        // ACK from first peer
1888        assert!(event.ack(0x22222222)); // returns true - new ack
1889        assert_eq!(event.ack_count(), 2);
1890        assert!(!event.all_acked());
1891
1892        // Duplicate ACK
1893        assert!(!event.ack(0x22222222)); // returns false - already acked
1894
1895        // ACK from second peer
1896        assert!(event.ack(0x33333333));
1897        assert_eq!(event.ack_count(), 3);
1898        assert!(event.all_acked());
1899    }
1900
1901    #[test]
1902    fn test_emergency_event_pending_nodes() {
1903        let peers = vec![0x22222222, 0x33333333];
1904        let mut event = EmergencyEvent::new(0x11111111, 1000, &peers);
1905
1906        let pending = event.pending_nodes();
1907        assert_eq!(pending.len(), 2);
1908        assert!(pending.contains(&0x22222222));
1909        assert!(pending.contains(&0x33333333));
1910
1911        event.ack(0x22222222);
1912        let pending = event.pending_nodes();
1913        assert_eq!(pending.len(), 1);
1914        assert!(pending.contains(&0x33333333));
1915    }
1916
1917    #[test]
1918    fn test_emergency_event_encode_decode() {
1919        let peers = vec![0x22222222, 0x33333333];
1920        let mut event = EmergencyEvent::new(0x11111111, 1234567890, &peers);
1921        event.ack(0x22222222);
1922
1923        let encoded = event.encode();
1924        let decoded = EmergencyEvent::decode(&encoded).unwrap();
1925
1926        assert_eq!(decoded.source_node(), 0x11111111);
1927        assert_eq!(decoded.timestamp(), 1234567890);
1928        assert!(decoded.has_acked(0x11111111));
1929        assert!(decoded.has_acked(0x22222222));
1930        assert!(!decoded.has_acked(0x33333333));
1931    }
1932
1933    #[test]
1934    fn test_emergency_event_merge_same_event() {
1935        // Two nodes have the same emergency, different ack states
1936        let peers = vec![0x22222222, 0x33333333];
1937        let mut event1 = EmergencyEvent::new(0x11111111, 1000, &peers);
1938        let mut event2 = EmergencyEvent::new(0x11111111, 1000, &peers);
1939
1940        event1.ack(0x22222222);
1941        event2.ack(0x33333333);
1942
1943        // Merge event2 into event1
1944        let changed = event1.merge(&event2);
1945        assert!(changed);
1946        assert!(event1.has_acked(0x22222222));
1947        assert!(event1.has_acked(0x33333333));
1948        assert!(event1.all_acked());
1949    }
1950
1951    #[test]
1952    fn test_emergency_event_merge_different_events() {
1953        // Old emergency
1954        let mut old_event = EmergencyEvent::new(0x11111111, 1000, &[0x22222222]);
1955        old_event.ack(0x22222222);
1956
1957        // New emergency from different source
1958        let new_event = EmergencyEvent::new(0x33333333, 2000, &[0x11111111, 0x22222222]);
1959
1960        // Merge new into old - should replace
1961        let changed = old_event.merge(&new_event);
1962        assert!(changed);
1963        assert_eq!(old_event.source_node(), 0x33333333);
1964        assert_eq!(old_event.timestamp(), 2000);
1965        // Old ack state should be gone
1966        assert!(!old_event.has_acked(0x22222222));
1967    }
1968
1969    #[test]
1970    fn test_emergency_event_merge_older_event_ignored() {
1971        // Current emergency
1972        let mut current = EmergencyEvent::new(0x11111111, 2000, &[0x22222222]);
1973
1974        // Older emergency
1975        let older = EmergencyEvent::new(0x33333333, 1000, &[0x11111111]);
1976
1977        // Merge older into current - should NOT replace
1978        let changed = current.merge(&older);
1979        assert!(!changed);
1980        assert_eq!(current.source_node(), 0x11111111);
1981        assert_eq!(current.timestamp(), 2000);
1982    }
1983
1984    #[test]
1985    fn test_emergency_event_add_peer() {
1986        let mut event = EmergencyEvent::new(0x11111111, 1000, &[]);
1987
1988        // Add a peer discovered after emergency started
1989        event.add_peer(0x22222222);
1990        assert!(!event.has_acked(0x22222222));
1991        assert_eq!(event.peer_count(), 2);
1992
1993        // Adding same peer again doesn't change ack status
1994        event.ack(0x22222222);
1995        event.add_peer(0x22222222);
1996        assert!(event.has_acked(0x22222222)); // still acked
1997    }
1998
1999    #[test]
2000    fn test_emergency_event_decode_invalid() {
2001        // Too short
2002        assert!(EmergencyEvent::decode(&[0u8; 10]).is_none());
2003
2004        // Valid header but claims more acks than data
2005        let mut data = vec![0u8; 16];
2006        data[12] = 5; // claims 5 ack entries
2007        assert!(EmergencyEvent::decode(&data).is_none());
2008    }
2009
2010    // ============================================================================
2011    // ChatMessage Tests
2012    // ============================================================================
2013
2014    #[test]
2015    fn test_chat_message_new() {
2016        let msg = ChatMessage::new(0x12345678, 1000, "ALPHA-1", "Hello mesh!");
2017        assert_eq!(msg.origin_node, 0x12345678);
2018        assert_eq!(msg.timestamp, 1000);
2019        assert_eq!(msg.sender(), "ALPHA-1");
2020        assert_eq!(msg.text(), "Hello mesh!");
2021        assert!(msg.is_broadcast);
2022        assert!(!msg.requires_ack);
2023        assert!(!msg.is_reply());
2024    }
2025
2026    #[test]
2027    fn test_chat_message_reply_to() {
2028        let mut msg = ChatMessage::new(0x12345678, 2000, "BRAVO", "Roger that");
2029        msg.set_reply_to(0xAABBCCDD, 1500);
2030
2031        assert!(msg.is_reply());
2032        assert_eq!(msg.reply_to_node, 0xAABBCCDD);
2033        assert_eq!(msg.reply_to_timestamp, 1500);
2034    }
2035
2036    #[test]
2037    fn test_chat_message_truncation() {
2038        // Test sender truncation (max 12 bytes)
2039        let msg = ChatMessage::new(0x1, 1000, "VERY_LONG_CALLSIGN", "Hi");
2040        assert_eq!(msg.sender(), "VERY_LONG_CA"); // 12 chars
2041
2042        // Test text truncation (max 128 bytes)
2043        let long_text = "A".repeat(200);
2044        let msg = ChatMessage::new(0x1, 1000, "X", &long_text);
2045        assert_eq!(msg.text().len(), 128);
2046    }
2047
2048    #[test]
2049    fn test_chat_message_id() {
2050        let msg = ChatMessage::new(0x12345678, 0xABCDEF01, "X", "Y");
2051        let id = msg.message_id();
2052        // ID = (origin << 32) | (timestamp & 0xFFFFFFFF)
2053        assert_eq!(id, (0x12345678u64 << 32) | 0xABCDEF01);
2054    }
2055
2056    #[test]
2057    fn test_chat_message_encode_decode() {
2058        let mut msg = ChatMessage::new(0x12345678, 1234567890, "CHARLIE", "Test message");
2059        msg.is_broadcast = true;
2060        msg.requires_ack = true;
2061        msg.set_reply_to(0xAABBCCDD, 1234567000);
2062
2063        let encoded = msg.encode();
2064        let (decoded, len) = ChatMessage::decode(&encoded).unwrap();
2065
2066        assert_eq!(len, encoded.len());
2067        assert_eq!(decoded.origin_node, 0x12345678);
2068        assert_eq!(decoded.timestamp, 1234567890);
2069        assert_eq!(decoded.sender(), "CHARLIE");
2070        assert_eq!(decoded.text(), "Test message");
2071        assert!(decoded.is_broadcast);
2072        assert!(decoded.requires_ack);
2073        assert_eq!(decoded.reply_to_node, 0xAABBCCDD);
2074        assert_eq!(decoded.reply_to_timestamp, 1234567000);
2075    }
2076
2077    #[test]
2078    fn test_chat_message_decode_minimal() {
2079        // Message with empty sender and text
2080        let msg = ChatMessage::new(0x1, 1000, "", "");
2081        let encoded = msg.encode();
2082        let (decoded, _) = ChatMessage::decode(&encoded).unwrap();
2083        assert_eq!(decoded.sender(), "");
2084        assert_eq!(decoded.text(), "");
2085    }
2086
2087    // ============================================================================
2088    // ChatCRDT Tests
2089    // ============================================================================
2090
2091    #[test]
2092    fn test_chat_crdt_new() {
2093        let chat = ChatCRDT::new();
2094        assert!(chat.is_empty());
2095        assert_eq!(chat.len(), 0);
2096    }
2097
2098    #[test]
2099    fn test_chat_crdt_add_message() {
2100        let mut chat = ChatCRDT::new();
2101
2102        let msg = ChatMessage::new(0x12345678, 1000, "ALPHA", "Hello");
2103        assert!(chat.add_message(msg.clone()));
2104        assert_eq!(chat.len(), 1);
2105
2106        // Duplicate should be rejected
2107        assert!(!chat.add_message(msg));
2108        assert_eq!(chat.len(), 1);
2109    }
2110
2111    #[test]
2112    fn test_chat_crdt_send_message() {
2113        let mut chat = ChatCRDT::new();
2114
2115        assert!(chat.send_message(0x1, 1000, "ALPHA", "First"));
2116        assert!(chat.send_message(0x2, 1001, "BRAVO", "Second"));
2117        assert_eq!(chat.len(), 2);
2118
2119        // Same node, same timestamp = duplicate
2120        assert!(!chat.send_message(0x1, 1000, "ALPHA", "Duplicate"));
2121        assert_eq!(chat.len(), 2);
2122    }
2123
2124    #[test]
2125    fn test_chat_crdt_get_message() {
2126        let mut chat = ChatCRDT::new();
2127        chat.send_message(0x12345678, 1000, "ALPHA", "Test");
2128
2129        let msg = chat.get_message(0x12345678, 1000);
2130        assert!(msg.is_some());
2131        assert_eq!(msg.unwrap().text(), "Test");
2132
2133        // Non-existent message
2134        assert!(chat.get_message(0x99999999, 1000).is_none());
2135    }
2136
2137    #[test]
2138    fn test_chat_crdt_merge() {
2139        let mut chat1 = ChatCRDT::new();
2140        let mut chat2 = ChatCRDT::new();
2141
2142        chat1.send_message(0x1, 1000, "ALPHA", "From 1");
2143        chat2.send_message(0x2, 1001, "BRAVO", "From 2");
2144
2145        // Merge chat2 into chat1
2146        let changed = chat1.merge(&chat2);
2147        assert!(changed);
2148        assert_eq!(chat1.len(), 2);
2149
2150        // Merge again - no changes
2151        let changed = chat1.merge(&chat2);
2152        assert!(!changed);
2153        assert_eq!(chat1.len(), 2);
2154    }
2155
2156    #[test]
2157    fn test_chat_crdt_merge_duplicates() {
2158        let mut chat1 = ChatCRDT::new();
2159        let mut chat2 = ChatCRDT::new();
2160
2161        // Both have the same message
2162        chat1.send_message(0x1, 1000, "ALPHA", "Same message");
2163        chat2.send_message(0x1, 1000, "ALPHA", "Same message");
2164
2165        // Merge should not create duplicates
2166        chat1.merge(&chat2);
2167        assert_eq!(chat1.len(), 1);
2168    }
2169
2170    #[test]
2171    fn test_chat_crdt_pruning() {
2172        let mut chat = ChatCRDT::new();
2173
2174        // Add more than CHAT_MAX_MESSAGES
2175        for i in 0..(CHAT_MAX_MESSAGES + 10) {
2176            chat.send_message(i as u32, i as u64, "X", "Y");
2177        }
2178
2179        // Should be pruned to max
2180        assert_eq!(chat.len(), CHAT_MAX_MESSAGES);
2181
2182        // Oldest messages should be removed
2183        // (first 10 should be gone)
2184        assert!(chat.get_message(0, 0).is_none());
2185        assert!(chat.get_message(9, 9).is_none());
2186        // Newer messages should remain
2187        assert!(chat.get_message(10, 10).is_some());
2188    }
2189
2190    #[test]
2191    fn test_chat_crdt_encode_decode() {
2192        let mut chat = ChatCRDT::new();
2193        chat.send_message(0x12345678, 1000, "ALPHA", "First message");
2194        chat.send_message(0xAABBCCDD, 2000, "BRAVO", "Second message");
2195
2196        let encoded = chat.encode();
2197        let decoded = ChatCRDT::decode(&encoded).unwrap();
2198
2199        assert_eq!(decoded.len(), 2);
2200        assert!(decoded.get_message(0x12345678, 1000).is_some());
2201        assert!(decoded.get_message(0xAABBCCDD, 2000).is_some());
2202    }
2203
2204    #[test]
2205    fn test_chat_crdt_messages_since() {
2206        let mut chat = ChatCRDT::new();
2207        chat.send_message(0x1, 1000, "A", "Old");
2208        chat.send_message(0x2, 2000, "B", "Mid");
2209        chat.send_message(0x3, 3000, "C", "New");
2210
2211        let recent: Vec<_> = chat.messages_since(1500).collect();
2212        assert_eq!(recent.len(), 2);
2213    }
2214
2215    #[test]
2216    fn test_chat_crdt_newest_timestamp() {
2217        let mut chat = ChatCRDT::new();
2218        assert!(chat.newest_timestamp().is_none());
2219
2220        chat.send_message(0x1, 1000, "A", "1");
2221        assert_eq!(chat.newest_timestamp(), Some(1000));
2222
2223        chat.send_message(0x2, 3000, "B", "2");
2224        assert_eq!(chat.newest_timestamp(), Some(3000));
2225
2226        chat.send_message(0x3, 2000, "C", "3"); // older timestamp
2227        assert_eq!(chat.newest_timestamp(), Some(3000));
2228    }
2229}