hive_btle/sync/
delta_document.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Delta Document wire format for bandwidth-efficient sync
17//!
18//! This module implements the v2 document format that supports delta sync -
19//! sending only changed operations instead of full state snapshots.
20//!
21//! ## Wire Format
22//!
23//! Delta documents are identified by the DELTA_DOCUMENT_MARKER (0xB2):
24//!
25//! ```text
26//! [1 byte:  marker (0xB2)]
27//! [1 byte:  flags]
28//!   - bit 0: has_vector_clock
29//!   - bit 1: is_response (sync response vs broadcast)
30//!   - bits 2-7: reserved
31//! [4 bytes: origin_node (LE)]
32//! [8 bytes: timestamp_ms (LE)]
33//! [variable: vector_clock (if flag set)]
34//!   - [2 bytes: entry_count]
35//!   - [entry_count × (4 bytes node_id + 8 bytes clock)]
36//! [2 bytes: operation_count (LE)]
37//! [operations...]
38//! ```
39//!
40//! ## Operation Format
41//!
42//! Each operation is prefixed with a 1-byte type:
43//!
44//! - 0x01: IncrementCounter - counter increment
45//! - 0x02: UpdatePeripheral - peripheral state update
46//! - 0x03: SetEmergency - create emergency event
47//! - 0x04: AckEmergency - acknowledge emergency
48//! - 0x05: ClearEmergency - clear emergency
49//!
50//! ## Usage
51//!
52//! ```ignore
53//! // Check if incoming data is a delta document
54//! if DeltaDocument::is_delta_document(&data) {
55//!     let delta = DeltaDocument::decode(&data)?;
56//!     for op in &delta.operations {
57//!         apply_operation(op);
58//!     }
59//! }
60//!
61//! // Build delta for a specific peer
62//! let delta = mesh.build_delta_for_peer(&peer_id);
63//! let data = delta.encode();
64//! ```
65
66#[cfg(not(feature = "std"))]
67use alloc::{string::String, vec::Vec};
68
69use crate::sync::crdt::Peripheral;
70use crate::sync::delta::VectorClock;
71use crate::NodeId;
72
73/// Marker byte for delta document format
74pub const DELTA_DOCUMENT_MARKER: u8 = 0xB2;
75
76/// Operation type constants
77pub mod op_type {
78    /// Counter increment operation
79    pub const INCREMENT_COUNTER: u8 = 0x01;
80    /// Peripheral state update operation
81    pub const UPDATE_PERIPHERAL: u8 = 0x02;
82    /// Set emergency event operation
83    pub const SET_EMERGENCY: u8 = 0x03;
84    /// Acknowledge emergency operation
85    pub const ACK_EMERGENCY: u8 = 0x04;
86    /// Clear emergency operation
87    pub const CLEAR_EMERGENCY: u8 = 0x05;
88}
89
90/// Flags for delta document
91#[derive(Debug, Clone, Copy, Default)]
92pub struct DeltaFlags {
93    /// Whether vector clock is included
94    pub has_vector_clock: bool,
95    /// Whether this is a sync response (vs broadcast)
96    pub is_response: bool,
97}
98
99impl DeltaFlags {
100    /// Encode flags to a single byte
101    pub fn to_byte(&self) -> u8 {
102        let mut flags = 0u8;
103        if self.has_vector_clock {
104            flags |= 0x01;
105        }
106        if self.is_response {
107            flags |= 0x02;
108        }
109        flags
110    }
111
112    /// Decode flags from a single byte
113    pub fn from_byte(byte: u8) -> Self {
114        Self {
115            has_vector_clock: byte & 0x01 != 0,
116            is_response: byte & 0x02 != 0,
117        }
118    }
119}
120
121/// A CRDT operation for delta sync
122#[derive(Debug, Clone)]
123pub enum Operation {
124    /// Increment a counter
125    IncrementCounter {
126        /// Counter ID (0 = default mesh counter)
127        counter_id: u8,
128        /// Node that incremented
129        node_id: NodeId,
130        /// Amount to increment
131        amount: u64,
132        /// Timestamp of increment
133        timestamp: u64,
134    },
135
136    /// Update peripheral state
137    UpdatePeripheral {
138        /// The peripheral data
139        peripheral: Peripheral,
140        /// Timestamp of update
141        timestamp: u64,
142    },
143
144    /// Set an emergency event
145    SetEmergency {
146        /// Source node declaring emergency
147        source_node: NodeId,
148        /// Timestamp of emergency
149        timestamp: u64,
150        /// Known peers at time of emergency
151        known_peers: Vec<u32>,
152    },
153
154    /// Acknowledge an emergency
155    AckEmergency {
156        /// Node sending the ACK
157        node_id: NodeId,
158        /// Timestamp of emergency being ACKed
159        emergency_timestamp: u64,
160    },
161
162    /// Clear an emergency
163    ClearEmergency {
164        /// Timestamp of emergency being cleared
165        emergency_timestamp: u64,
166    },
167}
168
169impl Operation {
170    /// Get the timestamp associated with this operation
171    pub fn timestamp(&self) -> u64 {
172        match self {
173            Operation::IncrementCounter { timestamp, .. } => *timestamp,
174            Operation::UpdatePeripheral { timestamp, .. } => *timestamp,
175            Operation::SetEmergency { timestamp, .. } => *timestamp,
176            Operation::AckEmergency {
177                emergency_timestamp,
178                ..
179            } => *emergency_timestamp,
180            Operation::ClearEmergency {
181                emergency_timestamp,
182            } => *emergency_timestamp,
183        }
184    }
185
186    /// Get a unique key for this operation (for deduplication)
187    pub fn key(&self) -> String {
188        match self {
189            Operation::IncrementCounter {
190                counter_id,
191                node_id,
192                ..
193            } => {
194                #[cfg(feature = "std")]
195                return format!("counter:{}:{}", counter_id, node_id.as_u32());
196                #[cfg(not(feature = "std"))]
197                return alloc::format!("counter:{}:{}", counter_id, node_id.as_u32());
198            }
199            Operation::UpdatePeripheral { peripheral, .. } => {
200                #[cfg(feature = "std")]
201                return format!("peripheral:{}", peripheral.id);
202                #[cfg(not(feature = "std"))]
203                return alloc::format!("peripheral:{}", peripheral.id);
204            }
205            Operation::SetEmergency { source_node, .. } => {
206                #[cfg(feature = "std")]
207                return format!("emergency:{}", source_node.as_u32());
208                #[cfg(not(feature = "std"))]
209                return alloc::format!("emergency:{}", source_node.as_u32());
210            }
211            Operation::AckEmergency { node_id, .. } => {
212                #[cfg(feature = "std")]
213                return format!("ack:{}", node_id.as_u32());
214                #[cfg(not(feature = "std"))]
215                return alloc::format!("ack:{}", node_id.as_u32());
216            }
217            Operation::ClearEmergency { .. } => "clear_emergency".into(),
218        }
219    }
220
221    /// Encode to bytes
222    pub fn encode(&self) -> Vec<u8> {
223        let mut buf = Vec::new();
224
225        match self {
226            Operation::IncrementCounter {
227                counter_id,
228                node_id,
229                amount,
230                timestamp,
231            } => {
232                buf.push(op_type::INCREMENT_COUNTER);
233                buf.push(*counter_id);
234                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
235                buf.extend_from_slice(&amount.to_le_bytes());
236                buf.extend_from_slice(&timestamp.to_le_bytes());
237            }
238            Operation::UpdatePeripheral {
239                peripheral,
240                timestamp,
241            } => {
242                buf.push(op_type::UPDATE_PERIPHERAL);
243                buf.extend_from_slice(&timestamp.to_le_bytes());
244                let pdata = peripheral.encode();
245                buf.extend_from_slice(&(pdata.len() as u16).to_le_bytes());
246                buf.extend_from_slice(&pdata);
247            }
248            Operation::SetEmergency {
249                source_node,
250                timestamp,
251                known_peers,
252            } => {
253                buf.push(op_type::SET_EMERGENCY);
254                buf.extend_from_slice(&source_node.as_u32().to_le_bytes());
255                buf.extend_from_slice(&timestamp.to_le_bytes());
256                buf.push(known_peers.len() as u8);
257                for peer in known_peers {
258                    buf.extend_from_slice(&peer.to_le_bytes());
259                }
260            }
261            Operation::AckEmergency {
262                node_id,
263                emergency_timestamp,
264            } => {
265                buf.push(op_type::ACK_EMERGENCY);
266                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
267                buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
268            }
269            Operation::ClearEmergency {
270                emergency_timestamp,
271            } => {
272                buf.push(op_type::CLEAR_EMERGENCY);
273                buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
274            }
275        }
276
277        buf
278    }
279
280    /// Decode from bytes
281    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
282        if data.is_empty() {
283            return None;
284        }
285
286        let op_type = data[0];
287
288        match op_type {
289            op_type::INCREMENT_COUNTER => {
290                if data.len() < 22 {
291                    return None;
292                }
293                let counter_id = data[1];
294                let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
295                let amount = u64::from_le_bytes([
296                    data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
297                ]);
298                let timestamp = u64::from_le_bytes([
299                    data[14], data[15], data[16], data[17], data[18], data[19], data[20], data[21],
300                ]);
301                Some((
302                    Operation::IncrementCounter {
303                        counter_id,
304                        node_id,
305                        amount,
306                        timestamp,
307                    },
308                    22,
309                ))
310            }
311            op_type::UPDATE_PERIPHERAL => {
312                if data.len() < 11 {
313                    return None;
314                }
315                let timestamp = u64::from_le_bytes([
316                    data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
317                ]);
318                let plen = u16::from_le_bytes([data[9], data[10]]) as usize;
319                if data.len() < 11 + plen {
320                    return None;
321                }
322                let peripheral = Peripheral::decode(&data[11..11 + plen])?;
323                Some((
324                    Operation::UpdatePeripheral {
325                        peripheral,
326                        timestamp,
327                    },
328                    11 + plen,
329                ))
330            }
331            op_type::SET_EMERGENCY => {
332                if data.len() < 14 {
333                    return None;
334                }
335                let source_node =
336                    NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
337                let timestamp = u64::from_le_bytes([
338                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
339                ]);
340                let peer_count = data[13] as usize;
341                if data.len() < 14 + peer_count * 4 {
342                    return None;
343                }
344                let mut known_peers = Vec::with_capacity(peer_count);
345                let mut offset = 14;
346                for _ in 0..peer_count {
347                    known_peers.push(u32::from_le_bytes([
348                        data[offset],
349                        data[offset + 1],
350                        data[offset + 2],
351                        data[offset + 3],
352                    ]));
353                    offset += 4;
354                }
355                Some((
356                    Operation::SetEmergency {
357                        source_node,
358                        timestamp,
359                        known_peers,
360                    },
361                    offset,
362                ))
363            }
364            op_type::ACK_EMERGENCY => {
365                if data.len() < 13 {
366                    return None;
367                }
368                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
369                let emergency_timestamp = u64::from_le_bytes([
370                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
371                ]);
372                Some((
373                    Operation::AckEmergency {
374                        node_id,
375                        emergency_timestamp,
376                    },
377                    13,
378                ))
379            }
380            op_type::CLEAR_EMERGENCY => {
381                if data.len() < 9 {
382                    return None;
383                }
384                let emergency_timestamp = u64::from_le_bytes([
385                    data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
386                ]);
387                Some((
388                    Operation::ClearEmergency {
389                        emergency_timestamp,
390                    },
391                    9,
392                ))
393            }
394            _ => None,
395        }
396    }
397}
398
399/// A delta document containing only changed operations
400#[derive(Debug, Clone)]
401pub struct DeltaDocument {
402    /// Origin node that created this delta
403    pub origin_node: NodeId,
404
405    /// Timestamp when delta was created
406    pub timestamp_ms: u64,
407
408    /// Flags
409    pub flags: DeltaFlags,
410
411    /// Vector clock (for sync negotiation)
412    pub vector_clock: Option<VectorClock>,
413
414    /// Operations in this delta
415    pub operations: Vec<Operation>,
416}
417
418impl DeltaDocument {
419    /// Create a new empty delta document
420    pub fn new(origin_node: NodeId, timestamp_ms: u64) -> Self {
421        Self {
422            origin_node,
423            timestamp_ms,
424            flags: DeltaFlags::default(),
425            vector_clock: None,
426            operations: Vec::new(),
427        }
428    }
429
430    /// Create with vector clock
431    pub fn with_vector_clock(mut self, clock: VectorClock) -> Self {
432        self.vector_clock = Some(clock);
433        self.flags.has_vector_clock = true;
434        self
435    }
436
437    /// Mark as sync response
438    pub fn as_response(mut self) -> Self {
439        self.flags.is_response = true;
440        self
441    }
442
443    /// Add an operation
444    pub fn add_operation(&mut self, op: Operation) {
445        self.operations.push(op);
446    }
447
448    /// Check if empty (no operations)
449    pub fn is_empty(&self) -> bool {
450        self.operations.is_empty()
451    }
452
453    /// Get operation count
454    pub fn operation_count(&self) -> usize {
455        self.operations.len()
456    }
457
458    /// Check if data starts with delta document marker
459    pub fn is_delta_document(data: &[u8]) -> bool {
460        !data.is_empty() && data[0] == DELTA_DOCUMENT_MARKER
461    }
462
463    /// Encode to bytes
464    pub fn encode(&self) -> Vec<u8> {
465        let mut buf = Vec::new();
466
467        // Marker
468        buf.push(DELTA_DOCUMENT_MARKER);
469
470        // Flags
471        buf.push(self.flags.to_byte());
472
473        // Origin node
474        buf.extend_from_slice(&self.origin_node.as_u32().to_le_bytes());
475
476        // Timestamp
477        buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
478
479        // Vector clock (if present)
480        if let Some(ref clock) = self.vector_clock {
481            let clock_data = clock.encode();
482            buf.extend_from_slice(&clock_data);
483        }
484
485        // Operation count
486        buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
487
488        // Operations
489        for op in &self.operations {
490            buf.extend_from_slice(&op.encode());
491        }
492
493        buf
494    }
495
496    /// Decode from bytes
497    pub fn decode(data: &[u8]) -> Option<Self> {
498        // Minimum size: marker(1) + flags(1) + origin(4) + timestamp(8) + op_count(2) = 16
499        if data.len() < 16 {
500            return None;
501        }
502
503        if data[0] != DELTA_DOCUMENT_MARKER {
504            return None;
505        }
506
507        let flags = DeltaFlags::from_byte(data[1]);
508        let origin_node = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
509        let timestamp_ms = u64::from_le_bytes([
510            data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
511        ]);
512
513        let mut offset = 14;
514
515        // Vector clock (if present)
516        let vector_clock = if flags.has_vector_clock {
517            let clock = VectorClock::decode(&data[offset..])?;
518            // Calculate clock size: 4 bytes count + count * 12 bytes
519            let count = u32::from_le_bytes([
520                data[offset],
521                data[offset + 1],
522                data[offset + 2],
523                data[offset + 3],
524            ]) as usize;
525            offset += 4 + count * 12;
526            Some(clock)
527        } else {
528            None
529        };
530
531        // Operation count
532        if data.len() < offset + 2 {
533            return None;
534        }
535        let op_count = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
536        offset += 2;
537
538        // Operations
539        let mut operations = Vec::with_capacity(op_count);
540        for _ in 0..op_count {
541            if offset >= data.len() {
542                return None;
543            }
544            let (op, size) = Operation::decode(&data[offset..])?;
545            operations.push(op);
546            offset += size;
547        }
548
549        Some(Self {
550            origin_node,
551            timestamp_ms,
552            flags,
553            vector_clock,
554            operations,
555        })
556    }
557
558    /// Get estimated encoded size
559    pub fn encoded_size(&self) -> usize {
560        let base = 16; // marker + flags + origin + timestamp + op_count
561        let clock_size = self
562            .vector_clock
563            .as_ref()
564            .map(|c| c.encode().len())
565            .unwrap_or(0);
566        let ops_size: usize = self.operations.iter().map(|op| op.encode().len()).sum();
567        base + clock_size + ops_size
568    }
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use crate::sync::crdt::PeripheralType;
575
576    #[test]
577    fn test_operation_increment_counter_encode_decode() {
578        let op = Operation::IncrementCounter {
579            counter_id: 0,
580            node_id: NodeId::new(0x12345678),
581            amount: 42,
582            timestamp: 1000,
583        };
584
585        let encoded = op.encode();
586        let (decoded, size) = Operation::decode(&encoded).unwrap();
587
588        assert_eq!(size, encoded.len());
589        if let Operation::IncrementCounter {
590            counter_id,
591            node_id,
592            amount,
593            timestamp,
594        } = decoded
595        {
596            assert_eq!(counter_id, 0);
597            assert_eq!(node_id.as_u32(), 0x12345678);
598            assert_eq!(amount, 42);
599            assert_eq!(timestamp, 1000);
600        } else {
601            panic!("Wrong operation type");
602        }
603    }
604
605    #[test]
606    fn test_operation_update_peripheral_encode_decode() {
607        let peripheral =
608            Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");
609
610        let op = Operation::UpdatePeripheral {
611            peripheral: peripheral.clone(),
612            timestamp: 2000,
613        };
614
615        let encoded = op.encode();
616        let (decoded, size) = Operation::decode(&encoded).unwrap();
617
618        assert_eq!(size, encoded.len());
619        if let Operation::UpdatePeripheral {
620            peripheral: p,
621            timestamp: t,
622        } = decoded
623        {
624            assert_eq!(p.id, peripheral.id);
625            assert_eq!(p.callsign_str(), "ALPHA-1");
626            assert_eq!(t, 2000);
627        } else {
628            panic!("Wrong operation type");
629        }
630    }
631
632    #[test]
633    fn test_operation_set_emergency_encode_decode() {
634        let op = Operation::SetEmergency {
635            source_node: NodeId::new(0x11111111),
636            timestamp: 3000,
637            known_peers: vec![0x22222222, 0x33333333],
638        };
639
640        let encoded = op.encode();
641        let (decoded, size) = Operation::decode(&encoded).unwrap();
642
643        assert_eq!(size, encoded.len());
644        if let Operation::SetEmergency {
645            source_node,
646            timestamp,
647            known_peers,
648        } = decoded
649        {
650            assert_eq!(source_node.as_u32(), 0x11111111);
651            assert_eq!(timestamp, 3000);
652            assert_eq!(known_peers, vec![0x22222222, 0x33333333]);
653        } else {
654            panic!("Wrong operation type");
655        }
656    }
657
658    #[test]
659    fn test_operation_ack_emergency_encode_decode() {
660        let op = Operation::AckEmergency {
661            node_id: NodeId::new(0x22222222),
662            emergency_timestamp: 3000,
663        };
664
665        let encoded = op.encode();
666        let (decoded, size) = Operation::decode(&encoded).unwrap();
667
668        assert_eq!(size, encoded.len());
669        if let Operation::AckEmergency {
670            node_id,
671            emergency_timestamp,
672        } = decoded
673        {
674            assert_eq!(node_id.as_u32(), 0x22222222);
675            assert_eq!(emergency_timestamp, 3000);
676        } else {
677            panic!("Wrong operation type");
678        }
679    }
680
681    #[test]
682    fn test_operation_clear_emergency_encode_decode() {
683        let op = Operation::ClearEmergency {
684            emergency_timestamp: 3000,
685        };
686
687        let encoded = op.encode();
688        let (decoded, size) = Operation::decode(&encoded).unwrap();
689
690        assert_eq!(size, encoded.len());
691        if let Operation::ClearEmergency {
692            emergency_timestamp,
693        } = decoded
694        {
695            assert_eq!(emergency_timestamp, 3000);
696        } else {
697            panic!("Wrong operation type");
698        }
699    }
700
701    #[test]
702    fn test_delta_document_empty() {
703        let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
704
705        assert!(delta.is_empty());
706        assert_eq!(delta.operation_count(), 0);
707
708        let encoded = delta.encode();
709        assert!(DeltaDocument::is_delta_document(&encoded));
710
711        let decoded = DeltaDocument::decode(&encoded).unwrap();
712        assert_eq!(decoded.origin_node.as_u32(), 0x12345678);
713        assert_eq!(decoded.timestamp_ms, 1000);
714        assert!(decoded.is_empty());
715    }
716
717    #[test]
718    fn test_delta_document_with_operations() {
719        let mut delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
720
721        delta.add_operation(Operation::IncrementCounter {
722            counter_id: 0,
723            node_id: NodeId::new(0x12345678),
724            amount: 1,
725            timestamp: 1000,
726        });
727
728        delta.add_operation(Operation::AckEmergency {
729            node_id: NodeId::new(0x12345678),
730            emergency_timestamp: 500,
731        });
732
733        assert_eq!(delta.operation_count(), 2);
734
735        let encoded = delta.encode();
736        let decoded = DeltaDocument::decode(&encoded).unwrap();
737
738        assert_eq!(decoded.operation_count(), 2);
739    }
740
741    #[test]
742    fn test_delta_document_with_vector_clock() {
743        let mut clock = VectorClock::new();
744        clock.update(&NodeId::new(0x11111111), 5);
745        clock.update(&NodeId::new(0x22222222), 3);
746
747        let delta =
748            DeltaDocument::new(NodeId::new(0x12345678), 1000).with_vector_clock(clock.clone());
749
750        assert!(delta.flags.has_vector_clock);
751
752        let encoded = delta.encode();
753        let decoded = DeltaDocument::decode(&encoded).unwrap();
754
755        assert!(decoded.flags.has_vector_clock);
756        assert!(decoded.vector_clock.is_some());
757
758        let decoded_clock = decoded.vector_clock.unwrap();
759        assert_eq!(decoded_clock.get(&NodeId::new(0x11111111)), 5);
760        assert_eq!(decoded_clock.get(&NodeId::new(0x22222222)), 3);
761    }
762
763    #[test]
764    fn test_delta_document_is_delta_document() {
765        let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
766        let encoded = delta.encode();
767
768        assert!(DeltaDocument::is_delta_document(&encoded));
769
770        // Non-delta data
771        let non_delta = vec![0x00, 0x01, 0x02, 0x03];
772        assert!(!DeltaDocument::is_delta_document(&non_delta));
773
774        let empty: Vec<u8> = vec![];
775        assert!(!DeltaDocument::is_delta_document(&empty));
776    }
777
778    #[test]
779    fn test_operation_key() {
780        let op1 = Operation::IncrementCounter {
781            counter_id: 0,
782            node_id: NodeId::new(0x11111111),
783            amount: 1,
784            timestamp: 1000,
785        };
786        let op2 = Operation::IncrementCounter {
787            counter_id: 0,
788            node_id: NodeId::new(0x11111111),
789            amount: 2,
790            timestamp: 2000,
791        };
792        let op3 = Operation::IncrementCounter {
793            counter_id: 0,
794            node_id: NodeId::new(0x22222222),
795            amount: 1,
796            timestamp: 1000,
797        };
798
799        // Same node, same counter = same key
800        assert_eq!(op1.key(), op2.key());
801
802        // Different node = different key
803        assert_ne!(op1.key(), op3.key());
804    }
805}