hive_btle/sync/
delta.rs

// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
//! Delta Encoder for HIVE-Lite Sync
//!
//! Tracks what state has been sent to each peer and only sends
//! the changes (deltas) since the last sync. This dramatically
//! reduces bandwidth over BLE.
21
22#[cfg(not(feature = "std"))]
23use alloc::{collections::BTreeMap, format, string::String, string::ToString, vec::Vec};
24#[cfg(feature = "std")]
25use std::collections::HashMap;
26
27use super::crdt::{CrdtOperation, Timestamp};
28use crate::NodeId;
29
30/// Tracks the sync state with a specific peer
31#[derive(Debug, Clone, Default)]
32pub struct PeerSyncState {
33    /// Last timestamp we synced each key
34    #[cfg(feature = "std")]
35    last_sent: HashMap<String, Timestamp>,
36    #[cfg(not(feature = "std"))]
37    last_sent: BTreeMap<String, Timestamp>,
38
39    /// Last timestamp we received from this peer
40    pub last_received_timestamp: Timestamp,
41
42    /// Number of successful syncs
43    pub sync_count: u32,
44
45    /// Bytes sent to this peer
46    pub bytes_sent: u64,
47
48    /// Bytes received from this peer
49    pub bytes_received: u64,
50}
51
52impl PeerSyncState {
53    /// Create new peer sync state
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    /// Record that we sent a key at a timestamp
59    pub fn mark_sent(&mut self, key: &str, timestamp: Timestamp) {
60        self.last_sent.insert(key.to_string(), timestamp);
61    }
62
63    /// Get the last sent timestamp for a key
64    pub fn last_sent_timestamp(&self, key: &str) -> Option<Timestamp> {
65        self.last_sent.get(key).copied()
66    }
67
68    /// Check if we need to send this key (has newer timestamp)
69    pub fn needs_send(&self, key: &str, timestamp: Timestamp) -> bool {
70        match self.last_sent.get(key) {
71            Some(&last) => timestamp > last,
72            None => true,
73        }
74    }
75
76    /// Clear all tracking (for reconnection)
77    pub fn reset(&mut self) {
78        self.last_sent.clear();
79    }
80}
81
82/// Manages delta encoding for all peers
83#[derive(Debug)]
84pub struct DeltaEncoder {
85    /// Our node ID (for future use in operations)
86    #[allow(dead_code)]
87    node_id: NodeId,
88
89    /// Sync state per peer
90    #[cfg(feature = "std")]
91    peers: HashMap<u32, PeerSyncState>,
92    #[cfg(not(feature = "std"))]
93    peers: BTreeMap<u32, PeerSyncState>,
94
95    /// Current state (key -> (value_hash, timestamp))
96    #[cfg(feature = "std")]
97    current_state: HashMap<String, (u64, Timestamp)>,
98    #[cfg(not(feature = "std"))]
99    current_state: BTreeMap<String, (u64, Timestamp)>,
100}
101
102impl DeltaEncoder {
103    /// Create a new delta encoder
104    pub fn new(node_id: NodeId) -> Self {
105        Self {
106            node_id,
107            #[cfg(feature = "std")]
108            peers: HashMap::new(),
109            #[cfg(not(feature = "std"))]
110            peers: BTreeMap::new(),
111            #[cfg(feature = "std")]
112            current_state: HashMap::new(),
113            #[cfg(not(feature = "std"))]
114            current_state: BTreeMap::new(),
115        }
116    }
117
118    /// Register a peer for delta tracking
119    pub fn add_peer(&mut self, peer_id: &NodeId) {
120        self.peers.entry(peer_id.as_u32()).or_default();
121    }
122
123    /// Remove a peer
124    pub fn remove_peer(&mut self, peer_id: &NodeId) {
125        self.peers.remove(&peer_id.as_u32());
126    }
127
128    /// Get peer sync state
129    pub fn get_peer_state(&self, peer_id: &NodeId) -> Option<&PeerSyncState> {
130        self.peers.get(&peer_id.as_u32())
131    }
132
133    /// Get mutable peer sync state
134    pub fn get_peer_state_mut(&mut self, peer_id: &NodeId) -> Option<&mut PeerSyncState> {
135        self.peers.get_mut(&peer_id.as_u32())
136    }
137
138    /// Update our current state with an operation
139    pub fn update_state(&mut self, key: &str, value_hash: u64, timestamp: Timestamp) {
140        self.current_state
141            .insert(key.to_string(), (value_hash, timestamp));
142    }
143
144    /// Filter operations to only those that need to be sent to a peer
145    pub fn filter_for_peer(
146        &self,
147        peer_id: &NodeId,
148        operations: &[CrdtOperation],
149    ) -> Vec<CrdtOperation> {
150        let peer_state = match self.peers.get(&peer_id.as_u32()) {
151            Some(state) => state,
152            None => return operations.to_vec(), // Unknown peer, send all
153        };
154
155        operations
156            .iter()
157            .filter(|op| {
158                let (key, timestamp) = Self::operation_key_timestamp(op);
159                peer_state.needs_send(&key, timestamp)
160            })
161            .cloned()
162            .collect()
163    }
164
165    /// Mark operations as sent to a peer
166    pub fn mark_sent(&mut self, peer_id: &NodeId, operations: &[CrdtOperation]) {
167        let peer_state = match self.peers.get_mut(&peer_id.as_u32()) {
168            Some(state) => state,
169            None => return,
170        };
171
172        for op in operations {
173            let (key, timestamp) = Self::operation_key_timestamp(op);
174            peer_state.mark_sent(&key, timestamp);
175        }
176    }
177
178    /// Record bytes sent to peer
179    pub fn record_sent(&mut self, peer_id: &NodeId, bytes: usize) {
180        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
181            state.bytes_sent += bytes as u64;
182            state.sync_count += 1;
183        }
184    }
185
186    /// Record bytes received from peer
187    pub fn record_received(&mut self, peer_id: &NodeId, bytes: usize, timestamp: Timestamp) {
188        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
189            state.bytes_received += bytes as u64;
190            state.last_received_timestamp = timestamp;
191        }
192    }
193
194    /// Reset sync state for a peer (e.g., on reconnection)
195    pub fn reset_peer(&mut self, peer_id: &NodeId) {
196        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
197            state.reset();
198        }
199    }
200
201    /// Get sync statistics
202    pub fn stats(&self) -> DeltaStats {
203        let mut total_sent = 0u64;
204        let mut total_received = 0u64;
205        let mut total_syncs = 0u32;
206
207        for state in self.peers.values() {
208            total_sent += state.bytes_sent;
209            total_received += state.bytes_received;
210            total_syncs += state.sync_count;
211        }
212
213        DeltaStats {
214            peer_count: self.peers.len(),
215            total_bytes_sent: total_sent,
216            total_bytes_received: total_received,
217            total_syncs,
218            tracked_keys: self.current_state.len(),
219        }
220    }
221
222    /// Extract key and timestamp from an operation
223    fn operation_key_timestamp(op: &CrdtOperation) -> (String, Timestamp) {
224        match op {
225            CrdtOperation::UpdatePosition {
226                node_id, timestamp, ..
227            } => (format!("pos:{}", node_id), *timestamp),
228            CrdtOperation::UpdateHealth {
229                node_id, timestamp, ..
230            } => (format!("health:{}", node_id), *timestamp),
231            CrdtOperation::IncrementCounter {
232                counter_id,
233                node_id,
234                ..
235            } => {
236                // Counters always need to be synced (they accumulate)
237                (format!("counter:{}:{}", counter_id, node_id), u64::MAX)
238            }
239            CrdtOperation::UpdateRegister {
240                key,
241                timestamp,
242                node_id,
243                ..
244            } => (format!("reg:{}:{}", key, node_id), *timestamp),
245        }
246    }
247}
248
/// Aggregate delta-encoding counters.
///
/// A plain value snapshot; every field defaults to zero.
#[derive(Clone, Debug, Default)]
pub struct DeltaStats {
    /// How many peers are being tracked
    pub peer_count: usize,
    /// Bytes sent, summed across all peers
    pub total_bytes_sent: u64,
    /// Bytes received, summed across all peers
    pub total_bytes_received: u64,
    /// Sync operations, summed across all peers
    pub total_syncs: u32,
    /// How many keys are being tracked
    pub tracked_keys: usize,
}
263
264/// Vector clock for causality tracking
265#[derive(Debug, Clone, Default)]
266pub struct VectorClock {
267    /// Per-node logical clocks
268    #[cfg(feature = "std")]
269    clocks: HashMap<u32, u64>,
270    #[cfg(not(feature = "std"))]
271    clocks: BTreeMap<u32, u64>,
272}
273
274impl VectorClock {
275    /// Create a new empty vector clock
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    /// Increment the clock for a node
281    pub fn increment(&mut self, node_id: &NodeId) -> u64 {
282        let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
283        *clock += 1;
284        *clock
285    }
286
287    /// Get the clock value for a node
288    pub fn get(&self, node_id: &NodeId) -> u64 {
289        self.clocks.get(&node_id.as_u32()).copied().unwrap_or(0)
290    }
291
292    /// Update clock for a node (take max)
293    pub fn update(&mut self, node_id: &NodeId, value: u64) {
294        let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
295        *clock = (*clock).max(value);
296    }
297
298    /// Merge with another vector clock (take component-wise max)
299    pub fn merge(&mut self, other: &VectorClock) {
300        for (&node_id, &value) in &other.clocks {
301            let clock = self.clocks.entry(node_id).or_insert(0);
302            *clock = (*clock).max(value);
303        }
304    }
305
306    /// Check if this clock happens-before another
307    pub fn happens_before(&self, other: &VectorClock) -> bool {
308        let mut dominated = false;
309
310        // All our clocks must be <= other's
311        for (&node_id, &our_val) in &self.clocks {
312            let their_val = other.clocks.get(&node_id).copied().unwrap_or(0);
313            if our_val > their_val {
314                return false;
315            }
316            if our_val < their_val {
317                dominated = true;
318            }
319        }
320
321        // Check for clocks they have that we don't
322        for (&node_id, &their_val) in &other.clocks {
323            if !self.clocks.contains_key(&node_id) && their_val > 0 {
324                dominated = true;
325            }
326        }
327
328        dominated
329    }
330
331    /// Check if clocks are concurrent (neither happens-before the other)
332    pub fn concurrent_with(&self, other: &VectorClock) -> bool {
333        !self.happens_before(other) && !other.happens_before(self)
334    }
335
336    /// Encode to bytes
337    pub fn encode(&self) -> Vec<u8> {
338        let mut buf = Vec::with_capacity(4 + self.clocks.len() * 12);
339        buf.extend_from_slice(&(self.clocks.len() as u32).to_le_bytes());
340        for (&node_id, &value) in &self.clocks {
341            buf.extend_from_slice(&node_id.to_le_bytes());
342            buf.extend_from_slice(&value.to_le_bytes());
343        }
344        buf
345    }
346
347    /// Decode from bytes
348    pub fn decode(data: &[u8]) -> Option<Self> {
349        if data.len() < 4 {
350            return None;
351        }
352
353        let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
354        if data.len() < 4 + count * 12 {
355            return None;
356        }
357
358        #[cfg(feature = "std")]
359        let mut clocks = HashMap::with_capacity(count);
360        #[cfg(not(feature = "std"))]
361        let mut clocks = BTreeMap::new();
362
363        let mut offset = 4;
364        for _ in 0..count {
365            let node_id = u32::from_le_bytes([
366                data[offset],
367                data[offset + 1],
368                data[offset + 2],
369                data[offset + 3],
370            ]);
371            let value = u64::from_le_bytes([
372                data[offset + 4],
373                data[offset + 5],
374                data[offset + 6],
375                data[offset + 7],
376                data[offset + 8],
377                data[offset + 9],
378                data[offset + 10],
379                data[offset + 11],
380            ]);
381            clocks.insert(node_id, value);
382            offset += 12;
383        }
384
385        Some(Self { clocks })
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    /// Build a position update for `node_id` stamped with `timestamp`.
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        let position = Position::new(37.0, -122.0);
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position,
            timestamp,
        }
    }

    #[test]
    fn test_peer_sync_state() {
        let mut tracker = PeerSyncState::new();

        // A key that was never recorded is always due.
        assert!(tracker.needs_send("key1", 100));

        tracker.mark_sent("key1", 100);

        // Afterwards only strictly newer timestamps are due.
        let same = tracker.needs_send("key1", 100);
        let older = tracker.needs_send("key1", 50);
        let newer = tracker.needs_send("key1", 101);
        assert!(!same);
        assert!(!older);
        assert!(newer);
    }

    #[test]
    fn test_delta_encoder_filter() {
        let remote = NodeId::new(2);
        let mut delta = DeltaEncoder::new(NodeId::new(1));
        delta.add_peer(&remote);

        let batch = vec![make_position_op(1, 100), make_position_op(2, 200)];

        // Nothing has been sent yet, so the whole batch passes.
        let first_pass = delta.filter_for_peer(&remote, &batch);
        assert_eq!(first_pass.len(), 2);

        delta.mark_sent(&remote, &first_pass);

        // The identical batch is now fully suppressed.
        let second_pass = delta.filter_for_peer(&remote, &batch);
        assert_eq!(second_pass.len(), 0);

        // A newer timestamp for a known key passes again.
        let fresher = vec![make_position_op(1, 101)];
        let third_pass = delta.filter_for_peer(&remote, &fresher);
        assert_eq!(third_pass.len(), 1);
    }

    #[test]
    fn test_delta_encoder_stats() {
        let peer_a = NodeId::new(2);
        let peer_b = NodeId::new(3);

        let mut delta = DeltaEncoder::new(NodeId::new(1));
        delta.add_peer(&peer_a);
        delta.add_peer(&peer_b);

        delta.record_sent(&peer_a, 100);
        delta.record_sent(&peer_b, 50);
        delta.record_received(&peer_a, 75, 1000);

        let totals = delta.stats();
        assert_eq!(totals.peer_count, 2);
        assert_eq!(totals.total_bytes_sent, 150);
        assert_eq!(totals.total_bytes_received, 75);
        assert_eq!(totals.total_syncs, 2);
    }

    #[test]
    fn test_vector_clock_increment() {
        let owner = NodeId::new(1);
        let mut vc = VectorClock::new();

        // Starts at zero, then counts up by one per increment.
        assert_eq!(vc.get(&owner), 0);

        vc.increment(&owner);
        assert_eq!(vc.get(&owner), 1);

        vc.increment(&owner);
        assert_eq!(vc.get(&owner), 2);
    }

    #[test]
    fn test_vector_clock_merge() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        let mut target = VectorClock::new();
        target.update(&first, 5);
        target.update(&second, 3);

        let mut incoming = VectorClock::new();
        incoming.update(&first, 3);
        incoming.update(&second, 7);

        target.merge(&incoming);

        assert_eq!(target.get(&first), 5); // max(5, 3)
        assert_eq!(target.get(&second), 7); // max(3, 7)
    }

    #[test]
    fn test_vector_clock_happens_before() {
        let owner = NodeId::new(1);

        let mut earlier = VectorClock::new();
        earlier.update(&owner, 1);

        let mut later = VectorClock::new();
        later.update(&owner, 2);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        let mut left = VectorClock::new();
        left.update(&first, 2);
        left.update(&second, 1);

        let mut right = VectorClock::new();
        right.update(&first, 1);
        right.update(&second, 2);

        // Each side is ahead on one component, so neither dominates.
        assert!(left.concurrent_with(&right));
    }

    #[test]
    fn test_vector_clock_encode_decode() {
        let mut original = VectorClock::new();
        original.update(&NodeId::new(1), 5);
        original.update(&NodeId::new(2), 10);

        let wire = original.encode();
        let restored = VectorClock::decode(&wire).unwrap();

        assert_eq!(restored.get(&NodeId::new(1)), 5);
        assert_eq!(restored.get(&NodeId::new(2)), 10);
    }

    #[test]
    fn test_reset_peer() {
        let remote = NodeId::new(2);
        let mut delta = DeltaEncoder::new(NodeId::new(1));

        delta.add_peer(&remote);
        delta.mark_sent(&remote, &[make_position_op(1, 100)]);

        // Resetting forgets what was sent, so the same op is due again.
        delta.reset_peer(&remote);

        let batch = vec![make_position_op(1, 100)];
        let due = delta.filter_for_peer(&remote, &batch);
        assert_eq!(due.len(), 1);
    }
}