Skip to main content

hive_btle/sync/
delta_document.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Delta Document wire format for bandwidth-efficient sync
17//!
18//! This module implements the v2 document format that supports delta sync -
19//! sending only changed operations instead of full state snapshots.
20//!
21//! ## Wire Format
22//!
23//! Delta documents are identified by the DELTA_DOCUMENT_MARKER (0xB2):
24//!
25//! ```text
26//! [1 byte:  marker (0xB2)]
27//! [1 byte:  flags]
28//!   - bit 0: has_vector_clock
29//!   - bit 1: is_response (sync response vs broadcast)
30//!   - bits 2-7: reserved
31//! [4 bytes: origin_node (LE)]
32//! [8 bytes: timestamp_ms (LE)]
33//! [variable: vector_clock (if flag set)]
34//!   - [2 bytes: entry_count]
35//!   - [entry_count × (4 bytes node_id + 8 bytes clock)]
36//! [2 bytes: operation_count (LE)]
37//! [operations...]
38//! ```
39//!
40//! ## Operation Format
41//!
42//! Each operation is prefixed with a 1-byte type:
43//!
44//! - 0x01: IncrementCounter - counter increment
45//! - 0x02: UpdatePeripheral - peripheral state update
46//! - 0x03: SetEmergency - create emergency event
47//! - 0x04: AckEmergency - acknowledge emergency
48//! - 0x05: ClearEmergency - clear emergency
49//!
50//! ## Usage
51//!
52//! ```ignore
53//! // Check if incoming data is a delta document
54//! if DeltaDocument::is_delta_document(&data) {
55//!     let delta = DeltaDocument::decode(&data)?;
56//!     for op in &delta.operations {
57//!         apply_operation(op);
58//!     }
59//! }
60//!
61//! // Build delta for a specific peer
62//! let delta = mesh.build_delta_for_peer(&peer_id);
63//! let data = delta.encode();
64//! ```
65
66#[cfg(not(feature = "std"))]
67use alloc::{string::String, vec::Vec};
68
69use crate::registry::AppOperation;
70use crate::sync::crdt::Peripheral;
71use crate::sync::delta::VectorClock;
72use crate::NodeId;
73
74/// Marker byte for delta document format
75pub const DELTA_DOCUMENT_MARKER: u8 = 0xB2;
76
77/// Operation type constants
78pub mod op_type {
79    /// Counter increment operation
80    pub const INCREMENT_COUNTER: u8 = 0x01;
81    /// Peripheral state update operation
82    pub const UPDATE_PERIPHERAL: u8 = 0x02;
83    /// Set emergency event operation
84    pub const SET_EMERGENCY: u8 = 0x03;
85    /// Acknowledge emergency operation
86    pub const ACK_EMERGENCY: u8 = 0x04;
87    /// Clear emergency operation
88    pub const CLEAR_EMERGENCY: u8 = 0x05;
89}
90
91/// Flags for delta document
92#[derive(Debug, Clone, Copy, Default)]
93pub struct DeltaFlags {
94    /// Whether vector clock is included
95    pub has_vector_clock: bool,
96    /// Whether this is a sync response (vs broadcast)
97    pub is_response: bool,
98}
99
100impl DeltaFlags {
101    /// Encode flags to a single byte
102    pub fn to_byte(&self) -> u8 {
103        let mut flags = 0u8;
104        if self.has_vector_clock {
105            flags |= 0x01;
106        }
107        if self.is_response {
108            flags |= 0x02;
109        }
110        flags
111    }
112
113    /// Decode flags from a single byte
114    pub fn from_byte(byte: u8) -> Self {
115        Self {
116            has_vector_clock: byte & 0x01 != 0,
117            is_response: byte & 0x02 != 0,
118        }
119    }
120}
121
122/// A CRDT operation for delta sync
123#[derive(Debug, Clone)]
124pub enum Operation {
125    /// Increment a counter
126    IncrementCounter {
127        /// Counter ID (0 = default mesh counter)
128        counter_id: u8,
129        /// Node that incremented
130        node_id: NodeId,
131        /// Amount to increment
132        amount: u64,
133        /// Timestamp of increment
134        timestamp: u64,
135    },
136
137    /// Update peripheral state
138    UpdatePeripheral {
139        /// The peripheral data
140        peripheral: Peripheral,
141        /// Timestamp of update
142        timestamp: u64,
143    },
144
145    /// Set an emergency event
146    SetEmergency {
147        /// Source node declaring emergency
148        source_node: NodeId,
149        /// Timestamp of emergency
150        timestamp: u64,
151        /// Known peers at time of emergency
152        known_peers: Vec<u32>,
153    },
154
155    /// Acknowledge an emergency
156    AckEmergency {
157        /// Node sending the ACK
158        node_id: NodeId,
159        /// Timestamp of emergency being ACKed
160        emergency_timestamp: u64,
161    },
162
163    /// Clear an emergency
164    ClearEmergency {
165        /// Timestamp of emergency being cleared
166        emergency_timestamp: u64,
167    },
168
169    /// App-layer document operation (0x10-0x1F range)
170    ///
171    /// Used for extensible document types registered via DocumentRegistry.
172    /// The AppOperation contains type_id, op_code, source_node, timestamp, and payload.
173    App(AppOperation),
174}
175
176impl Operation {
177    /// Get the timestamp associated with this operation
178    pub fn timestamp(&self) -> u64 {
179        match self {
180            Operation::IncrementCounter { timestamp, .. } => *timestamp,
181            Operation::UpdatePeripheral { timestamp, .. } => *timestamp,
182            Operation::SetEmergency { timestamp, .. } => *timestamp,
183            Operation::AckEmergency {
184                emergency_timestamp,
185                ..
186            } => *emergency_timestamp,
187            Operation::ClearEmergency {
188                emergency_timestamp,
189            } => *emergency_timestamp,
190            Operation::App(op) => op.timestamp,
191        }
192    }
193
194    /// Get a unique key for this operation (for deduplication)
195    pub fn key(&self) -> String {
196        match self {
197            Operation::IncrementCounter {
198                counter_id,
199                node_id,
200                ..
201            } => {
202                #[cfg(feature = "std")]
203                return format!("counter:{}:{}", counter_id, node_id.as_u32());
204                #[cfg(not(feature = "std"))]
205                return alloc::format!("counter:{}:{}", counter_id, node_id.as_u32());
206            }
207            Operation::UpdatePeripheral { peripheral, .. } => {
208                #[cfg(feature = "std")]
209                return format!("peripheral:{}", peripheral.id);
210                #[cfg(not(feature = "std"))]
211                return alloc::format!("peripheral:{}", peripheral.id);
212            }
213            Operation::SetEmergency { source_node, .. } => {
214                #[cfg(feature = "std")]
215                return format!("emergency:{}", source_node.as_u32());
216                #[cfg(not(feature = "std"))]
217                return alloc::format!("emergency:{}", source_node.as_u32());
218            }
219            Operation::AckEmergency { node_id, .. } => {
220                #[cfg(feature = "std")]
221                return format!("ack:{}", node_id.as_u32());
222                #[cfg(not(feature = "std"))]
223                return alloc::format!("ack:{}", node_id.as_u32());
224            }
225            Operation::ClearEmergency { .. } => "clear_emergency".into(),
226            Operation::App(op) => {
227                // Key includes type_id, source_node, and timestamp for document identity
228                #[cfg(feature = "std")]
229                return format!("app:{}:{}:{}", op.type_id, op.source_node, op.timestamp);
230                #[cfg(not(feature = "std"))]
231                return alloc::format!("app:{}:{}:{}", op.type_id, op.source_node, op.timestamp);
232            }
233        }
234    }
235
236    /// Encode to bytes
237    pub fn encode(&self) -> Vec<u8> {
238        let mut buf = Vec::new();
239
240        match self {
241            Operation::IncrementCounter {
242                counter_id,
243                node_id,
244                amount,
245                timestamp,
246            } => {
247                buf.push(op_type::INCREMENT_COUNTER);
248                buf.push(*counter_id);
249                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
250                buf.extend_from_slice(&amount.to_le_bytes());
251                buf.extend_from_slice(&timestamp.to_le_bytes());
252            }
253            Operation::UpdatePeripheral {
254                peripheral,
255                timestamp,
256            } => {
257                buf.push(op_type::UPDATE_PERIPHERAL);
258                buf.extend_from_slice(&timestamp.to_le_bytes());
259                let pdata = peripheral.encode();
260                buf.extend_from_slice(&(pdata.len() as u16).to_le_bytes());
261                buf.extend_from_slice(&pdata);
262            }
263            Operation::SetEmergency {
264                source_node,
265                timestamp,
266                known_peers,
267            } => {
268                buf.push(op_type::SET_EMERGENCY);
269                buf.extend_from_slice(&source_node.as_u32().to_le_bytes());
270                buf.extend_from_slice(&timestamp.to_le_bytes());
271                buf.push(known_peers.len() as u8);
272                for peer in known_peers {
273                    buf.extend_from_slice(&peer.to_le_bytes());
274                }
275            }
276            Operation::AckEmergency {
277                node_id,
278                emergency_timestamp,
279            } => {
280                buf.push(op_type::ACK_EMERGENCY);
281                buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
282                buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
283            }
284            Operation::ClearEmergency {
285                emergency_timestamp,
286            } => {
287                buf.push(op_type::CLEAR_EMERGENCY);
288                buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
289            }
290            Operation::App(op) => {
291                // AppOperation has its own encode that includes op_type byte (0x10-0x1F)
292                buf.extend_from_slice(&op.encode());
293            }
294        }
295
296        buf
297    }
298
299    /// Decode from bytes
300    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
301        if data.is_empty() {
302            return None;
303        }
304
305        let op_type = data[0];
306
307        match op_type {
308            op_type::INCREMENT_COUNTER => {
309                if data.len() < 22 {
310                    return None;
311                }
312                let counter_id = data[1];
313                let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
314                let amount = u64::from_le_bytes([
315                    data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
316                ]);
317                let timestamp = u64::from_le_bytes([
318                    data[14], data[15], data[16], data[17], data[18], data[19], data[20], data[21],
319                ]);
320                Some((
321                    Operation::IncrementCounter {
322                        counter_id,
323                        node_id,
324                        amount,
325                        timestamp,
326                    },
327                    22,
328                ))
329            }
330            op_type::UPDATE_PERIPHERAL => {
331                if data.len() < 11 {
332                    return None;
333                }
334                let timestamp = u64::from_le_bytes([
335                    data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
336                ]);
337                let plen = u16::from_le_bytes([data[9], data[10]]) as usize;
338                if data.len() < 11 + plen {
339                    return None;
340                }
341                let peripheral = Peripheral::decode(&data[11..11 + plen])?;
342                Some((
343                    Operation::UpdatePeripheral {
344                        peripheral,
345                        timestamp,
346                    },
347                    11 + plen,
348                ))
349            }
350            op_type::SET_EMERGENCY => {
351                if data.len() < 14 {
352                    return None;
353                }
354                let source_node =
355                    NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
356                let timestamp = u64::from_le_bytes([
357                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
358                ]);
359                let peer_count = data[13] as usize;
360                if data.len() < 14 + peer_count * 4 {
361                    return None;
362                }
363                let mut known_peers = Vec::with_capacity(peer_count);
364                let mut offset = 14;
365                for _ in 0..peer_count {
366                    known_peers.push(u32::from_le_bytes([
367                        data[offset],
368                        data[offset + 1],
369                        data[offset + 2],
370                        data[offset + 3],
371                    ]));
372                    offset += 4;
373                }
374                Some((
375                    Operation::SetEmergency {
376                        source_node,
377                        timestamp,
378                        known_peers,
379                    },
380                    offset,
381                ))
382            }
383            op_type::ACK_EMERGENCY => {
384                if data.len() < 13 {
385                    return None;
386                }
387                let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
388                let emergency_timestamp = u64::from_le_bytes([
389                    data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
390                ]);
391                Some((
392                    Operation::AckEmergency {
393                        node_id,
394                        emergency_timestamp,
395                    },
396                    13,
397                ))
398            }
399            op_type::CLEAR_EMERGENCY => {
400                if data.len() < 9 {
401                    return None;
402                }
403                let emergency_timestamp = u64::from_le_bytes([
404                    data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
405                ]);
406                Some((
407                    Operation::ClearEmergency {
408                        emergency_timestamp,
409                    },
410                    9,
411                ))
412            }
413            // App-layer operations (0x10-0x1F range)
414            op if AppOperation::is_app_op_type(op) => {
415                let (app_op, consumed) = AppOperation::decode(data)?;
416                Some((Operation::App(app_op), consumed))
417            }
418            _ => None,
419        }
420    }
421}
422
423/// A delta document containing only changed operations
424#[derive(Debug, Clone)]
425pub struct DeltaDocument {
426    /// Origin node that created this delta
427    pub origin_node: NodeId,
428
429    /// Timestamp when delta was created
430    pub timestamp_ms: u64,
431
432    /// Flags
433    pub flags: DeltaFlags,
434
435    /// Vector clock (for sync negotiation)
436    pub vector_clock: Option<VectorClock>,
437
438    /// Operations in this delta
439    pub operations: Vec<Operation>,
440}
441
442impl DeltaDocument {
443    /// Create a new empty delta document
444    pub fn new(origin_node: NodeId, timestamp_ms: u64) -> Self {
445        Self {
446            origin_node,
447            timestamp_ms,
448            flags: DeltaFlags::default(),
449            vector_clock: None,
450            operations: Vec::new(),
451        }
452    }
453
454    /// Create with vector clock
455    pub fn with_vector_clock(mut self, clock: VectorClock) -> Self {
456        self.vector_clock = Some(clock);
457        self.flags.has_vector_clock = true;
458        self
459    }
460
461    /// Mark as sync response
462    pub fn as_response(mut self) -> Self {
463        self.flags.is_response = true;
464        self
465    }
466
467    /// Add an operation
468    pub fn add_operation(&mut self, op: Operation) {
469        self.operations.push(op);
470    }
471
472    /// Check if empty (no operations)
473    pub fn is_empty(&self) -> bool {
474        self.operations.is_empty()
475    }
476
477    /// Get operation count
478    pub fn operation_count(&self) -> usize {
479        self.operations.len()
480    }
481
482    /// Check if data starts with delta document marker
483    pub fn is_delta_document(data: &[u8]) -> bool {
484        !data.is_empty() && data[0] == DELTA_DOCUMENT_MARKER
485    }
486
487    /// Encode to bytes
488    pub fn encode(&self) -> Vec<u8> {
489        let mut buf = Vec::new();
490
491        // Marker
492        buf.push(DELTA_DOCUMENT_MARKER);
493
494        // Flags
495        buf.push(self.flags.to_byte());
496
497        // Origin node
498        buf.extend_from_slice(&self.origin_node.as_u32().to_le_bytes());
499
500        // Timestamp
501        buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
502
503        // Vector clock (if present)
504        if let Some(ref clock) = self.vector_clock {
505            let clock_data = clock.encode();
506            buf.extend_from_slice(&clock_data);
507        }
508
509        // Operation count
510        buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
511
512        // Operations
513        for op in &self.operations {
514            buf.extend_from_slice(&op.encode());
515        }
516
517        buf
518    }
519
520    /// Decode from bytes
521    pub fn decode(data: &[u8]) -> Option<Self> {
522        // Minimum size: marker(1) + flags(1) + origin(4) + timestamp(8) + op_count(2) = 16
523        if data.len() < 16 {
524            return None;
525        }
526
527        if data[0] != DELTA_DOCUMENT_MARKER {
528            return None;
529        }
530
531        let flags = DeltaFlags::from_byte(data[1]);
532        let origin_node = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
533        let timestamp_ms = u64::from_le_bytes([
534            data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
535        ]);
536
537        let mut offset = 14;
538
539        // Vector clock (if present)
540        let vector_clock = if flags.has_vector_clock {
541            let clock = VectorClock::decode(&data[offset..])?;
542            // Calculate clock size: 4 bytes count + count * 12 bytes
543            let count = u32::from_le_bytes([
544                data[offset],
545                data[offset + 1],
546                data[offset + 2],
547                data[offset + 3],
548            ]) as usize;
549            offset += 4 + count * 12;
550            Some(clock)
551        } else {
552            None
553        };
554
555        // Operation count
556        if data.len() < offset + 2 {
557            return None;
558        }
559        let op_count = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
560        offset += 2;
561
562        // Operations
563        let mut operations = Vec::with_capacity(op_count);
564        for _ in 0..op_count {
565            if offset >= data.len() {
566                return None;
567            }
568            let (op, size) = Operation::decode(&data[offset..])?;
569            operations.push(op);
570            offset += size;
571        }
572
573        Some(Self {
574            origin_node,
575            timestamp_ms,
576            flags,
577            vector_clock,
578            operations,
579        })
580    }
581
582    /// Get estimated encoded size
583    pub fn encoded_size(&self) -> usize {
584        let base = 16; // marker + flags + origin + timestamp + op_count
585        let clock_size = self
586            .vector_clock
587            .as_ref()
588            .map(|c| c.encode().len())
589            .unwrap_or(0);
590        let ops_size: usize = self.operations.iter().map(|op| op.encode().len()).sum();
591        base + clock_size + ops_size
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use crate::sync::crdt::PeripheralType;
599
600    #[test]
601    fn test_operation_increment_counter_encode_decode() {
602        let op = Operation::IncrementCounter {
603            counter_id: 0,
604            node_id: NodeId::new(0x12345678),
605            amount: 42,
606            timestamp: 1000,
607        };
608
609        let encoded = op.encode();
610        let (decoded, size) = Operation::decode(&encoded).unwrap();
611
612        assert_eq!(size, encoded.len());
613        if let Operation::IncrementCounter {
614            counter_id,
615            node_id,
616            amount,
617            timestamp,
618        } = decoded
619        {
620            assert_eq!(counter_id, 0);
621            assert_eq!(node_id.as_u32(), 0x12345678);
622            assert_eq!(amount, 42);
623            assert_eq!(timestamp, 1000);
624        } else {
625            panic!("Wrong operation type");
626        }
627    }
628
629    #[test]
630    fn test_operation_update_peripheral_encode_decode() {
631        let peripheral =
632            Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");
633
634        let op = Operation::UpdatePeripheral {
635            peripheral: peripheral.clone(),
636            timestamp: 2000,
637        };
638
639        let encoded = op.encode();
640        let (decoded, size) = Operation::decode(&encoded).unwrap();
641
642        assert_eq!(size, encoded.len());
643        if let Operation::UpdatePeripheral {
644            peripheral: p,
645            timestamp: t,
646        } = decoded
647        {
648            assert_eq!(p.id, peripheral.id);
649            assert_eq!(p.callsign_str(), "ALPHA-1");
650            assert_eq!(t, 2000);
651        } else {
652            panic!("Wrong operation type");
653        }
654    }
655
656    #[test]
657    fn test_operation_set_emergency_encode_decode() {
658        let op = Operation::SetEmergency {
659            source_node: NodeId::new(0x11111111),
660            timestamp: 3000,
661            known_peers: vec![0x22222222, 0x33333333],
662        };
663
664        let encoded = op.encode();
665        let (decoded, size) = Operation::decode(&encoded).unwrap();
666
667        assert_eq!(size, encoded.len());
668        if let Operation::SetEmergency {
669            source_node,
670            timestamp,
671            known_peers,
672        } = decoded
673        {
674            assert_eq!(source_node.as_u32(), 0x11111111);
675            assert_eq!(timestamp, 3000);
676            assert_eq!(known_peers, vec![0x22222222, 0x33333333]);
677        } else {
678            panic!("Wrong operation type");
679        }
680    }
681
682    #[test]
683    fn test_operation_ack_emergency_encode_decode() {
684        let op = Operation::AckEmergency {
685            node_id: NodeId::new(0x22222222),
686            emergency_timestamp: 3000,
687        };
688
689        let encoded = op.encode();
690        let (decoded, size) = Operation::decode(&encoded).unwrap();
691
692        assert_eq!(size, encoded.len());
693        if let Operation::AckEmergency {
694            node_id,
695            emergency_timestamp,
696        } = decoded
697        {
698            assert_eq!(node_id.as_u32(), 0x22222222);
699            assert_eq!(emergency_timestamp, 3000);
700        } else {
701            panic!("Wrong operation type");
702        }
703    }
704
705    #[test]
706    fn test_operation_clear_emergency_encode_decode() {
707        let op = Operation::ClearEmergency {
708            emergency_timestamp: 3000,
709        };
710
711        let encoded = op.encode();
712        let (decoded, size) = Operation::decode(&encoded).unwrap();
713
714        assert_eq!(size, encoded.len());
715        if let Operation::ClearEmergency {
716            emergency_timestamp,
717        } = decoded
718        {
719            assert_eq!(emergency_timestamp, 3000);
720        } else {
721            panic!("Wrong operation type");
722        }
723    }
724
725    #[test]
726    fn test_delta_document_empty() {
727        let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
728
729        assert!(delta.is_empty());
730        assert_eq!(delta.operation_count(), 0);
731
732        let encoded = delta.encode();
733        assert!(DeltaDocument::is_delta_document(&encoded));
734
735        let decoded = DeltaDocument::decode(&encoded).unwrap();
736        assert_eq!(decoded.origin_node.as_u32(), 0x12345678);
737        assert_eq!(decoded.timestamp_ms, 1000);
738        assert!(decoded.is_empty());
739    }
740
741    #[test]
742    fn test_delta_document_with_operations() {
743        let mut delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
744
745        delta.add_operation(Operation::IncrementCounter {
746            counter_id: 0,
747            node_id: NodeId::new(0x12345678),
748            amount: 1,
749            timestamp: 1000,
750        });
751
752        delta.add_operation(Operation::AckEmergency {
753            node_id: NodeId::new(0x12345678),
754            emergency_timestamp: 500,
755        });
756
757        assert_eq!(delta.operation_count(), 2);
758
759        let encoded = delta.encode();
760        let decoded = DeltaDocument::decode(&encoded).unwrap();
761
762        assert_eq!(decoded.operation_count(), 2);
763    }
764
765    #[test]
766    fn test_delta_document_with_vector_clock() {
767        let mut clock = VectorClock::new();
768        clock.update(&NodeId::new(0x11111111), 5);
769        clock.update(&NodeId::new(0x22222222), 3);
770
771        let delta =
772            DeltaDocument::new(NodeId::new(0x12345678), 1000).with_vector_clock(clock.clone());
773
774        assert!(delta.flags.has_vector_clock);
775
776        let encoded = delta.encode();
777        let decoded = DeltaDocument::decode(&encoded).unwrap();
778
779        assert!(decoded.flags.has_vector_clock);
780        assert!(decoded.vector_clock.is_some());
781
782        let decoded_clock = decoded.vector_clock.unwrap();
783        assert_eq!(decoded_clock.get(&NodeId::new(0x11111111)), 5);
784        assert_eq!(decoded_clock.get(&NodeId::new(0x22222222)), 3);
785    }
786
787    #[test]
788    fn test_delta_document_is_delta_document() {
789        let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
790        let encoded = delta.encode();
791
792        assert!(DeltaDocument::is_delta_document(&encoded));
793
794        // Non-delta data
795        let non_delta = vec![0x00, 0x01, 0x02, 0x03];
796        assert!(!DeltaDocument::is_delta_document(&non_delta));
797
798        let empty: Vec<u8> = vec![];
799        assert!(!DeltaDocument::is_delta_document(&empty));
800    }
801
802    #[test]
803    fn test_operation_key() {
804        let op1 = Operation::IncrementCounter {
805            counter_id: 0,
806            node_id: NodeId::new(0x11111111),
807            amount: 1,
808            timestamp: 1000,
809        };
810        let op2 = Operation::IncrementCounter {
811            counter_id: 0,
812            node_id: NodeId::new(0x11111111),
813            amount: 2,
814            timestamp: 2000,
815        };
816        let op3 = Operation::IncrementCounter {
817            counter_id: 0,
818            node_id: NodeId::new(0x22222222),
819            amount: 1,
820            timestamp: 1000,
821        };
822
823        // Same node, same counter = same key
824        assert_eq!(op1.key(), op2.key());
825
826        // Different node = different key
827        assert_ne!(op1.key(), op3.key());
828    }
829}