hive_btle/sync/delta.rs

//! Delta Encoder for HIVE-Lite Sync
//!
//! Tracks what state has been sent to each peer and only sends
//! the changes (deltas) since the last sync. This dramatically
//! reduces bandwidth over BLE.
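//!
//! A minimal usage sketch of the intended flow. The block is marked `ignore`
//! because the public paths (`hive_btle::sync::delta`, `hive_btle::NodeId`)
//! and the surrounding send loop are assumptions, not pinned down here.
//!
//! ```ignore
//! use hive_btle::sync::delta::DeltaEncoder;
//! use hive_btle::NodeId;
//!
//! let mut encoder = DeltaEncoder::new(NodeId::new(1));
//! let peer = NodeId::new(2);
//! encoder.add_peer(&peer);
//!
//! // `pending_ops` is a batch of CrdtOperation values produced elsewhere.
//! let to_send = encoder.filter_for_peer(&peer, &pending_ops);
//! // ... transmit `to_send` over BLE ...
//! encoder.mark_sent(&peer, &to_send);
//! encoder.record_sent(&peer, bytes_on_the_wire);
//! ```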

#[cfg(not(feature = "std"))]
use alloc::{collections::BTreeMap, format, string::{String, ToString}, vec::Vec};
#[cfg(feature = "std")]
use std::collections::HashMap;

use super::crdt::{CrdtOperation, Timestamp};
use crate::NodeId;

/// Tracks the sync state with a specific peer
#[derive(Debug, Clone, Default)]
pub struct PeerSyncState {
    /// Last timestamp we synced each key
    #[cfg(feature = "std")]
    last_sent: HashMap<String, Timestamp>,
    #[cfg(not(feature = "std"))]
    last_sent: BTreeMap<String, Timestamp>,

    /// Last timestamp we received from this peer
    pub last_received_timestamp: Timestamp,

    /// Number of successful syncs
    pub sync_count: u32,

    /// Bytes sent to this peer
    pub bytes_sent: u64,

    /// Bytes received from this peer
    pub bytes_received: u64,
}

impl PeerSyncState {
    /// Create new peer sync state
    pub fn new() -> Self {
        Self::default()
    }

    /// Record that we sent a key at a timestamp
    pub fn mark_sent(&mut self, key: &str, timestamp: Timestamp) {
        self.last_sent.insert(key.to_string(), timestamp);
    }

    /// Get the last sent timestamp for a key
    pub fn last_sent_timestamp(&self, key: &str) -> Option<Timestamp> {
        self.last_sent.get(key).copied()
    }

    /// Check if we need to send this key (has newer timestamp)
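    ///
    /// Behavior sketch (mirrors `test_peer_sync_state` below; `ignore`d so it
    /// is not run as a doctest):
    ///
    /// ```ignore
    /// let mut state = PeerSyncState::new();
    /// assert!(state.needs_send("pos:1", 100));  // never sent before
    /// state.mark_sent("pos:1", 100);
    /// assert!(!state.needs_send("pos:1", 100)); // nothing newer to send
    /// assert!(state.needs_send("pos:1", 101));  // newer timestamp wins
    /// ```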
    pub fn needs_send(&self, key: &str, timestamp: Timestamp) -> bool {
        match self.last_sent.get(key) {
            Some(&last) => timestamp > last,
            None => true,
        }
    }

    /// Clear all tracking (for reconnection)
    pub fn reset(&mut self) {
        self.last_sent.clear();
    }
}

/// Manages delta encoding for all peers
#[derive(Debug)]
pub struct DeltaEncoder {
    /// Our node ID (for future use in operations)
    #[allow(dead_code)]
    node_id: NodeId,

    /// Sync state per peer
    #[cfg(feature = "std")]
    peers: HashMap<u32, PeerSyncState>,
    #[cfg(not(feature = "std"))]
    peers: BTreeMap<u32, PeerSyncState>,

    /// Current state (key -> (value_hash, timestamp))
    #[cfg(feature = "std")]
    current_state: HashMap<String, (u64, Timestamp)>,
    #[cfg(not(feature = "std"))]
    current_state: BTreeMap<String, (u64, Timestamp)>,
}

impl DeltaEncoder {
    /// Create a new delta encoder
    pub fn new(node_id: NodeId) -> Self {
        Self {
            node_id,
            #[cfg(feature = "std")]
            peers: HashMap::new(),
            #[cfg(not(feature = "std"))]
            peers: BTreeMap::new(),
            #[cfg(feature = "std")]
            current_state: HashMap::new(),
            #[cfg(not(feature = "std"))]
            current_state: BTreeMap::new(),
        }
    }

    /// Register a peer for delta tracking
    pub fn add_peer(&mut self, peer_id: &NodeId) {
        self.peers.entry(peer_id.as_u32()).or_default();
    }

    /// Remove a peer
    pub fn remove_peer(&mut self, peer_id: &NodeId) {
        self.peers.remove(&peer_id.as_u32());
    }

    /// Get peer sync state
    pub fn get_peer_state(&self, peer_id: &NodeId) -> Option<&PeerSyncState> {
        self.peers.get(&peer_id.as_u32())
    }

    /// Get mutable peer sync state
    pub fn get_peer_state_mut(&mut self, peer_id: &NodeId) -> Option<&mut PeerSyncState> {
        self.peers.get_mut(&peer_id.as_u32())
    }

    /// Update our current state with an operation
    pub fn update_state(&mut self, key: &str, value_hash: u64, timestamp: Timestamp) {
        self.current_state
            .insert(key.to_string(), (value_hash, timestamp));
    }

    /// Filter operations to only those that need to be sent to a peer
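    ///
    /// Operations for an unknown peer are sent in full. Typical cycle
    /// (sketch, `ignore`d because the surrounding transport code is assumed):
    ///
    /// ```ignore
    /// let to_send = encoder.filter_for_peer(&peer, &ops); // only unsent or newer ops
    /// // ... encode and transmit `to_send` ...
    /// encoder.mark_sent(&peer, &to_send);                 // suppress re-sends next round
    /// ```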
    pub fn filter_for_peer(
        &self,
        peer_id: &NodeId,
        operations: &[CrdtOperation],
    ) -> Vec<CrdtOperation> {
        let peer_state = match self.peers.get(&peer_id.as_u32()) {
            Some(state) => state,
            None => return operations.to_vec(), // Unknown peer, send all
        };

        operations
            .iter()
            .filter(|op| {
                let (key, timestamp) = Self::operation_key_timestamp(op);
                peer_state.needs_send(&key, timestamp)
            })
            .cloned()
            .collect()
    }

    /// Mark operations as sent to a peer
    pub fn mark_sent(&mut self, peer_id: &NodeId, operations: &[CrdtOperation]) {
        let peer_state = match self.peers.get_mut(&peer_id.as_u32()) {
            Some(state) => state,
            None => return,
        };

        for op in operations {
            let (key, timestamp) = Self::operation_key_timestamp(op);
            peer_state.mark_sent(&key, timestamp);
        }
    }

    /// Record bytes sent to peer
    pub fn record_sent(&mut self, peer_id: &NodeId, bytes: usize) {
        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
            state.bytes_sent += bytes as u64;
            state.sync_count += 1;
        }
    }

    /// Record bytes received from peer
    pub fn record_received(&mut self, peer_id: &NodeId, bytes: usize, timestamp: Timestamp) {
        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
            state.bytes_received += bytes as u64;
            state.last_received_timestamp = timestamp;
        }
    }

    /// Reset sync state for a peer (e.g., on reconnection)
    pub fn reset_peer(&mut self, peer_id: &NodeId) {
        if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
            state.reset();
        }
    }

    /// Get sync statistics
    pub fn stats(&self) -> DeltaStats {
        let mut total_sent = 0u64;
        let mut total_received = 0u64;
        let mut total_syncs = 0u32;

        for state in self.peers.values() {
            total_sent += state.bytes_sent;
            total_received += state.bytes_received;
            total_syncs += state.sync_count;
        }

        DeltaStats {
            peer_count: self.peers.len(),
            total_bytes_sent: total_sent,
            total_bytes_received: total_received,
            total_syncs,
            tracked_keys: self.current_state.len(),
        }
    }

    /// Extract key and timestamp from an operation
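    ///
    /// Keys are derived as `pos:{node}`, `health:{node}`,
    /// `counter:{counter}:{node}`, and `reg:{key}:{node}`, so each logical
    /// value is tracked independently per originating node.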
    fn operation_key_timestamp(op: &CrdtOperation) -> (String, Timestamp) {
        match op {
            CrdtOperation::UpdatePosition {
                node_id, timestamp, ..
            } => (format!("pos:{}", node_id), *timestamp),
            CrdtOperation::UpdateHealth {
                node_id, timestamp, ..
            } => (format!("health:{}", node_id), *timestamp),
            CrdtOperation::IncrementCounter {
                counter_id,
                node_id,
                ..
            } => {
                // Counters always need to be synced (they accumulate)
                (format!("counter:{}:{}", counter_id, node_id), u64::MAX)
            }
            CrdtOperation::UpdateRegister {
                key,
                timestamp,
                node_id,
                ..
            } => (format!("reg:{}:{}", key, node_id), *timestamp),
        }
    }
}

/// Statistics about delta encoding
#[derive(Debug, Clone, Default)]
pub struct DeltaStats {
    /// Number of peers being tracked
    pub peer_count: usize,
    /// Total bytes sent across all peers
    pub total_bytes_sent: u64,
    /// Total bytes received across all peers
    pub total_bytes_received: u64,
    /// Total number of sync operations
    pub total_syncs: u32,
    /// Number of keys being tracked
    pub tracked_keys: usize,
}

/// Vector clock for causality tracking
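///
/// Causality sketch (compiled equivalents live in the tests below; this block
/// is `ignore`d because the doctest import paths are an assumption):
///
/// ```ignore
/// let mut a = VectorClock::new();
/// let mut b = VectorClock::new();
/// a.increment(&NodeId::new(1));   // a = {1: 1}
/// b.merge(&a);                    // b = {1: 1}
/// b.increment(&NodeId::new(2));   // b = {1: 1, 2: 1}
/// assert!(a.happens_before(&b));  // a <= b everywhere, and b is strictly ahead
/// assert!(!b.happens_before(&a));
/// ```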
#[derive(Debug, Clone, Default)]
pub struct VectorClock {
    /// Per-node logical clocks
    #[cfg(feature = "std")]
    clocks: HashMap<u32, u64>,
    #[cfg(not(feature = "std"))]
    clocks: BTreeMap<u32, u64>,
}

impl VectorClock {
    /// Create a new empty vector clock
    pub fn new() -> Self {
        Self::default()
    }

    /// Increment the clock for a node
    pub fn increment(&mut self, node_id: &NodeId) -> u64 {
        let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
        *clock += 1;
        *clock
    }

    /// Get the clock value for a node
    pub fn get(&self, node_id: &NodeId) -> u64 {
        self.clocks.get(&node_id.as_u32()).copied().unwrap_or(0)
    }

    /// Update clock for a node (take max)
    pub fn update(&mut self, node_id: &NodeId, value: u64) {
        let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
        *clock = (*clock).max(value);
    }

    /// Merge with another vector clock (take component-wise max)
    pub fn merge(&mut self, other: &VectorClock) {
        for (&node_id, &value) in &other.clocks {
            let clock = self.clocks.entry(node_id).or_insert(0);
            *clock = (*clock).max(value);
        }
    }

    /// Check if this clock happens-before another
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        let mut dominated = false;

        // All our clocks must be <= other's
        for (&node_id, &our_val) in &self.clocks {
            let their_val = other.clocks.get(&node_id).copied().unwrap_or(0);
            if our_val > their_val {
                return false;
            }
            if our_val < their_val {
                dominated = true;
            }
        }

        // Check for clocks they have that we don't
        for (&node_id, &their_val) in &other.clocks {
            if !self.clocks.contains_key(&node_id) && their_val > 0 {
                dominated = true;
            }
        }

        dominated
    }

    /// Check if clocks are concurrent (neither happens-before the other)
    pub fn concurrent_with(&self, other: &VectorClock) -> bool {
        !self.happens_before(other) && !other.happens_before(self)
    }

    /// Encode to bytes
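    ///
    /// Wire layout (all little-endian): a `u32` entry count, then one
    /// 12-byte record per entry: `u32` node id followed by `u64` clock value.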
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + self.clocks.len() * 12);
        buf.extend_from_slice(&(self.clocks.len() as u32).to_le_bytes());
        for (&node_id, &value) in &self.clocks {
            buf.extend_from_slice(&node_id.to_le_bytes());
            buf.extend_from_slice(&value.to_le_bytes());
        }
        buf
    }

    /// Decode from bytes
    pub fn decode(data: &[u8]) -> Option<Self> {
        if data.len() < 4 {
            return None;
        }

        let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        // Use checked arithmetic so a bogus count from untrusted input cannot
        // overflow the length check on 32-bit targets.
        let needed = count.checked_mul(12).and_then(|n| n.checked_add(4))?;
        if data.len() < needed {
            return None;
        }

        #[cfg(feature = "std")]
        let mut clocks = HashMap::with_capacity(count);
        #[cfg(not(feature = "std"))]
        let mut clocks = BTreeMap::new();

        let mut offset = 4;
        for _ in 0..count {
            let node_id = u32::from_le_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
            ]);
            let value = u64::from_le_bytes([
                data[offset + 4],
                data[offset + 5],
                data[offset + 6],
                data[offset + 7],
                data[offset + 8],
                data[offset + 9],
                data[offset + 10],
                data[offset + 11],
            ]);
            clocks.insert(node_id, value);
            offset += 12;
        }

        Some(Self { clocks })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position: Position::new(37.0, -122.0),
            timestamp,
        }
    }

    #[test]
    fn test_peer_sync_state() {
        let mut state = PeerSyncState::new();

        assert!(state.needs_send("key1", 100));

        state.mark_sent("key1", 100);

        assert!(!state.needs_send("key1", 100));
        assert!(!state.needs_send("key1", 50));
        assert!(state.needs_send("key1", 101));
    }

    #[test]
    fn test_delta_encoder_filter() {
        let mut encoder = DeltaEncoder::new(NodeId::new(1));
        let peer = NodeId::new(2);

        encoder.add_peer(&peer);

        let ops = vec![make_position_op(1, 100), make_position_op(2, 200)];

        // First time, all ops should be sent
        let filtered = encoder.filter_for_peer(&peer, &ops);
        assert_eq!(filtered.len(), 2);

        // Mark as sent
        encoder.mark_sent(&peer, &filtered);

        // Same ops should not be sent again
        let filtered2 = encoder.filter_for_peer(&peer, &ops);
        assert_eq!(filtered2.len(), 0);

        // Newer ops should be sent
        let new_ops = vec![make_position_op(1, 101)];
        let filtered3 = encoder.filter_for_peer(&peer, &new_ops);
        assert_eq!(filtered3.len(), 1);
    }

    #[test]
    fn test_delta_encoder_stats() {
        let mut encoder = DeltaEncoder::new(NodeId::new(1));

        encoder.add_peer(&NodeId::new(2));
        encoder.add_peer(&NodeId::new(3));

        encoder.record_sent(&NodeId::new(2), 100);
        encoder.record_sent(&NodeId::new(3), 50);
        encoder.record_received(&NodeId::new(2), 75, 1000);

        let stats = encoder.stats();
        assert_eq!(stats.peer_count, 2);
        assert_eq!(stats.total_bytes_sent, 150);
        assert_eq!(stats.total_bytes_received, 75);
        assert_eq!(stats.total_syncs, 2);
    }

    #[test]
    fn test_vector_clock_increment() {
        let mut clock = VectorClock::new();
        let node = NodeId::new(1);

        assert_eq!(clock.get(&node), 0);

        clock.increment(&node);
        assert_eq!(clock.get(&node), 1);

        clock.increment(&node);
        assert_eq!(clock.get(&node), 2);
    }

    #[test]
    fn test_vector_clock_merge() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        clock1.update(&node1, 5);
        clock1.update(&node2, 3);

        clock2.update(&node1, 3);
        clock2.update(&node2, 7);

        clock1.merge(&clock2);

        assert_eq!(clock1.get(&node1), 5); // max(5, 3)
        assert_eq!(clock1.get(&node2), 7); // max(3, 7)
    }

    #[test]
    fn test_vector_clock_happens_before() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        let node = NodeId::new(1);

        clock1.update(&node, 1);
        clock2.update(&node, 2);

        assert!(clock1.happens_before(&clock2));
        assert!(!clock2.happens_before(&clock1));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        clock1.update(&node1, 2);
        clock1.update(&node2, 1);

        clock2.update(&node1, 1);
        clock2.update(&node2, 2);

        // Neither dominates the other
        assert!(clock1.concurrent_with(&clock2));
    }

    #[test]
    fn test_vector_clock_encode_decode() {
        let mut clock = VectorClock::new();
        clock.update(&NodeId::new(1), 5);
        clock.update(&NodeId::new(2), 10);

        let encoded = clock.encode();
        let decoded = VectorClock::decode(&encoded).unwrap();

        assert_eq!(decoded.get(&NodeId::new(1)), 5);
        assert_eq!(decoded.get(&NodeId::new(2)), 10);
    }

    #[test]
    fn test_reset_peer() {
        let mut encoder = DeltaEncoder::new(NodeId::new(1));
        let peer = NodeId::new(2);

        encoder.add_peer(&peer);
        encoder.mark_sent(&peer, &[make_position_op(1, 100)]);

        // After reset, should need to send again
        encoder.reset_peer(&peer);

        let ops = vec![make_position_op(1, 100)];
        let filtered = encoder.filter_for_peer(&peer, &ops);
        assert_eq!(filtered.len(), 1);
    }
}