1#[cfg(not(feature = "std"))]
8use alloc::{collections::BTreeMap, format, string::String, string::ToString, vec::Vec};
9#[cfg(feature = "std")]
10use std::collections::HashMap;
11
12use super::crdt::{CrdtOperation, Timestamp};
13use crate::NodeId;
14
15#[derive(Debug, Clone, Default)]
17pub struct PeerSyncState {
18 #[cfg(feature = "std")]
20 last_sent: HashMap<String, Timestamp>,
21 #[cfg(not(feature = "std"))]
22 last_sent: BTreeMap<String, Timestamp>,
23
24 pub last_received_timestamp: Timestamp,
26
27 pub sync_count: u32,
29
30 pub bytes_sent: u64,
32
33 pub bytes_received: u64,
35}
36
37impl PeerSyncState {
38 pub fn new() -> Self {
40 Self::default()
41 }
42
43 pub fn mark_sent(&mut self, key: &str, timestamp: Timestamp) {
45 self.last_sent.insert(key.to_string(), timestamp);
46 }
47
48 pub fn last_sent_timestamp(&self, key: &str) -> Option<Timestamp> {
50 self.last_sent.get(key).copied()
51 }
52
53 pub fn needs_send(&self, key: &str, timestamp: Timestamp) -> bool {
55 match self.last_sent.get(key) {
56 Some(&last) => timestamp > last,
57 None => true,
58 }
59 }
60
61 pub fn reset(&mut self) {
63 self.last_sent.clear();
64 }
65}
66
67#[derive(Debug)]
69pub struct DeltaEncoder {
70 #[allow(dead_code)]
72 node_id: NodeId,
73
74 #[cfg(feature = "std")]
76 peers: HashMap<u32, PeerSyncState>,
77 #[cfg(not(feature = "std"))]
78 peers: BTreeMap<u32, PeerSyncState>,
79
80 #[cfg(feature = "std")]
82 current_state: HashMap<String, (u64, Timestamp)>,
83 #[cfg(not(feature = "std"))]
84 current_state: BTreeMap<String, (u64, Timestamp)>,
85}
86
87impl DeltaEncoder {
88 pub fn new(node_id: NodeId) -> Self {
90 Self {
91 node_id,
92 #[cfg(feature = "std")]
93 peers: HashMap::new(),
94 #[cfg(not(feature = "std"))]
95 peers: BTreeMap::new(),
96 #[cfg(feature = "std")]
97 current_state: HashMap::new(),
98 #[cfg(not(feature = "std"))]
99 current_state: BTreeMap::new(),
100 }
101 }
102
103 pub fn add_peer(&mut self, peer_id: &NodeId) {
105 self.peers.entry(peer_id.as_u32()).or_default();
106 }
107
108 pub fn remove_peer(&mut self, peer_id: &NodeId) {
110 self.peers.remove(&peer_id.as_u32());
111 }
112
113 pub fn get_peer_state(&self, peer_id: &NodeId) -> Option<&PeerSyncState> {
115 self.peers.get(&peer_id.as_u32())
116 }
117
118 pub fn get_peer_state_mut(&mut self, peer_id: &NodeId) -> Option<&mut PeerSyncState> {
120 self.peers.get_mut(&peer_id.as_u32())
121 }
122
123 pub fn update_state(&mut self, key: &str, value_hash: u64, timestamp: Timestamp) {
125 self.current_state
126 .insert(key.to_string(), (value_hash, timestamp));
127 }
128
129 pub fn filter_for_peer(
131 &self,
132 peer_id: &NodeId,
133 operations: &[CrdtOperation],
134 ) -> Vec<CrdtOperation> {
135 let peer_state = match self.peers.get(&peer_id.as_u32()) {
136 Some(state) => state,
137 None => return operations.to_vec(), };
139
140 operations
141 .iter()
142 .filter(|op| {
143 let (key, timestamp) = Self::operation_key_timestamp(op);
144 peer_state.needs_send(&key, timestamp)
145 })
146 .cloned()
147 .collect()
148 }
149
150 pub fn mark_sent(&mut self, peer_id: &NodeId, operations: &[CrdtOperation]) {
152 let peer_state = match self.peers.get_mut(&peer_id.as_u32()) {
153 Some(state) => state,
154 None => return,
155 };
156
157 for op in operations {
158 let (key, timestamp) = Self::operation_key_timestamp(op);
159 peer_state.mark_sent(&key, timestamp);
160 }
161 }
162
163 pub fn record_sent(&mut self, peer_id: &NodeId, bytes: usize) {
165 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
166 state.bytes_sent += bytes as u64;
167 state.sync_count += 1;
168 }
169 }
170
171 pub fn record_received(&mut self, peer_id: &NodeId, bytes: usize, timestamp: Timestamp) {
173 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
174 state.bytes_received += bytes as u64;
175 state.last_received_timestamp = timestamp;
176 }
177 }
178
179 pub fn reset_peer(&mut self, peer_id: &NodeId) {
181 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
182 state.reset();
183 }
184 }
185
186 pub fn stats(&self) -> DeltaStats {
188 let mut total_sent = 0u64;
189 let mut total_received = 0u64;
190 let mut total_syncs = 0u32;
191
192 for state in self.peers.values() {
193 total_sent += state.bytes_sent;
194 total_received += state.bytes_received;
195 total_syncs += state.sync_count;
196 }
197
198 DeltaStats {
199 peer_count: self.peers.len(),
200 total_bytes_sent: total_sent,
201 total_bytes_received: total_received,
202 total_syncs,
203 tracked_keys: self.current_state.len(),
204 }
205 }
206
207 fn operation_key_timestamp(op: &CrdtOperation) -> (String, Timestamp) {
209 match op {
210 CrdtOperation::UpdatePosition {
211 node_id, timestamp, ..
212 } => (format!("pos:{}", node_id), *timestamp),
213 CrdtOperation::UpdateHealth {
214 node_id, timestamp, ..
215 } => (format!("health:{}", node_id), *timestamp),
216 CrdtOperation::IncrementCounter {
217 counter_id,
218 node_id,
219 ..
220 } => {
221 (format!("counter:{}:{}", counter_id, node_id), u64::MAX)
223 }
224 CrdtOperation::UpdateRegister {
225 key,
226 timestamp,
227 node_id,
228 ..
229 } => (format!("reg:{}:{}", key, node_id), *timestamp),
230 }
231 }
232}
233
/// Aggregate synchronisation counters, as produced by `DeltaEncoder::stats`.
#[derive(Debug, Clone, Default)]
pub struct DeltaStats {
    /// Number of peers currently registered with the encoder.
    pub peer_count: usize,
    /// Sum of `bytes_sent` over all registered peers.
    pub total_bytes_sent: u64,
    /// Sum of `bytes_received` over all registered peers.
    pub total_bytes_received: u64,
    /// Sum of `sync_count` over all registered peers.
    pub total_syncs: u32,
    /// Number of keys recorded through `DeltaEncoder::update_state`.
    pub tracked_keys: usize,
}
248
249#[derive(Debug, Clone, Default)]
251pub struct VectorClock {
252 #[cfg(feature = "std")]
254 clocks: HashMap<u32, u64>,
255 #[cfg(not(feature = "std"))]
256 clocks: BTreeMap<u32, u64>,
257}
258
259impl VectorClock {
260 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn increment(&mut self, node_id: &NodeId) -> u64 {
267 let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
268 *clock += 1;
269 *clock
270 }
271
272 pub fn get(&self, node_id: &NodeId) -> u64 {
274 self.clocks.get(&node_id.as_u32()).copied().unwrap_or(0)
275 }
276
277 pub fn update(&mut self, node_id: &NodeId, value: u64) {
279 let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
280 *clock = (*clock).max(value);
281 }
282
283 pub fn merge(&mut self, other: &VectorClock) {
285 for (&node_id, &value) in &other.clocks {
286 let clock = self.clocks.entry(node_id).or_insert(0);
287 *clock = (*clock).max(value);
288 }
289 }
290
291 pub fn happens_before(&self, other: &VectorClock) -> bool {
293 let mut dominated = false;
294
295 for (&node_id, &our_val) in &self.clocks {
297 let their_val = other.clocks.get(&node_id).copied().unwrap_or(0);
298 if our_val > their_val {
299 return false;
300 }
301 if our_val < their_val {
302 dominated = true;
303 }
304 }
305
306 for (&node_id, &their_val) in &other.clocks {
308 if !self.clocks.contains_key(&node_id) && their_val > 0 {
309 dominated = true;
310 }
311 }
312
313 dominated
314 }
315
316 pub fn concurrent_with(&self, other: &VectorClock) -> bool {
318 !self.happens_before(other) && !other.happens_before(self)
319 }
320
321 pub fn encode(&self) -> Vec<u8> {
323 let mut buf = Vec::with_capacity(4 + self.clocks.len() * 12);
324 buf.extend_from_slice(&(self.clocks.len() as u32).to_le_bytes());
325 for (&node_id, &value) in &self.clocks {
326 buf.extend_from_slice(&node_id.to_le_bytes());
327 buf.extend_from_slice(&value.to_le_bytes());
328 }
329 buf
330 }
331
332 pub fn decode(data: &[u8]) -> Option<Self> {
334 if data.len() < 4 {
335 return None;
336 }
337
338 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
339 if data.len() < 4 + count * 12 {
340 return None;
341 }
342
343 #[cfg(feature = "std")]
344 let mut clocks = HashMap::with_capacity(count);
345 #[cfg(not(feature = "std"))]
346 let mut clocks = BTreeMap::new();
347
348 let mut offset = 4;
349 for _ in 0..count {
350 let node_id = u32::from_le_bytes([
351 data[offset],
352 data[offset + 1],
353 data[offset + 2],
354 data[offset + 3],
355 ]);
356 let value = u64::from_le_bytes([
357 data[offset + 4],
358 data[offset + 5],
359 data[offset + 6],
360 data[offset + 7],
361 data[offset + 8],
362 data[offset + 9],
363 data[offset + 10],
364 data[offset + 11],
365 ]);
366 clocks.insert(node_id, value);
367 offset += 12;
368 }
369
370 Some(Self { clocks })
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    /// Builds a position update for `node_id` stamped with `timestamp`. The
    /// coordinates are fixed; the tests only depend on the node and timestamp.
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        let origin = NodeId::new(node_id);
        let position = Position::new(37.0, -122.0);
        CrdtOperation::UpdatePosition {
            node_id: origin,
            position,
            timestamp,
        }
    }

    #[test]
    fn test_peer_sync_state() {
        let mut tracker = PeerSyncState::new();

        // A key that was never sent always needs sending.
        assert!(tracker.needs_send("key1", 100));

        tracker.mark_sent("key1", 100);

        // Equal and older timestamps are suppressed; only a newer one passes.
        assert!(!tracker.needs_send("key1", 100));
        assert!(!tracker.needs_send("key1", 50));
        assert!(tracker.needs_send("key1", 101));
    }

    #[test]
    fn test_delta_encoder_filter() {
        let mut encoder = DeltaEncoder::new(NodeId::new(1));
        let remote = NodeId::new(2);
        encoder.add_peer(&remote);

        let batch = vec![make_position_op(1, 100), make_position_op(2, 200)];

        // Nothing has been sent yet, so the whole batch goes out.
        let first_pass = encoder.filter_for_peer(&remote, &batch);
        assert_eq!(first_pass.len(), 2);

        encoder.mark_sent(&remote, &first_pass);

        // The same batch is now fully suppressed.
        let second_pass = encoder.filter_for_peer(&remote, &batch);
        assert_eq!(second_pass.len(), 0);

        // A newer timestamp for an already-sent key passes again.
        let newer = vec![make_position_op(1, 101)];
        let third_pass = encoder.filter_for_peer(&remote, &newer);
        assert_eq!(third_pass.len(), 1);
    }

    #[test]
    fn test_delta_encoder_stats() {
        let first_peer = NodeId::new(2);
        let second_peer = NodeId::new(3);

        let mut encoder = DeltaEncoder::new(NodeId::new(1));
        encoder.add_peer(&first_peer);
        encoder.add_peer(&second_peer);

        encoder.record_sent(&first_peer, 100);
        encoder.record_sent(&second_peer, 50);
        encoder.record_received(&first_peer, 75, 1000);

        let summary = encoder.stats();
        assert_eq!(summary.peer_count, 2);
        assert_eq!(summary.total_bytes_sent, 150);
        assert_eq!(summary.total_bytes_received, 75);
        assert_eq!(summary.total_syncs, 2);
    }

    #[test]
    fn test_vector_clock_increment() {
        let me = NodeId::new(1);
        let mut vc = VectorClock::new();

        // Unknown nodes read as zero.
        assert_eq!(vc.get(&me), 0);

        vc.increment(&me);
        assert_eq!(vc.get(&me), 1);

        vc.increment(&me);
        assert_eq!(vc.get(&me), 2);
    }

    #[test]
    fn test_vector_clock_merge() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        let mut left = VectorClock::new();
        left.update(&first, 5);
        left.update(&second, 3);

        let mut right = VectorClock::new();
        right.update(&first, 3);
        right.update(&second, 7);

        left.merge(&right);

        // Merge keeps the per-node maximum.
        assert_eq!(left.get(&first), 5);
        assert_eq!(left.get(&second), 7);
    }

    #[test]
    fn test_vector_clock_happens_before() {
        let node = NodeId::new(1);

        let mut earlier = VectorClock::new();
        earlier.update(&node, 1);

        let mut later = VectorClock::new();
        later.update(&node, 2);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        // Each clock is ahead on a different node, so neither dominates.
        let mut left = VectorClock::new();
        left.update(&first, 2);
        left.update(&second, 1);

        let mut right = VectorClock::new();
        right.update(&first, 1);
        right.update(&second, 2);

        assert!(left.concurrent_with(&right));
    }

    #[test]
    fn test_vector_clock_encode_decode() {
        let mut original = VectorClock::new();
        original.update(&NodeId::new(1), 5);
        original.update(&NodeId::new(2), 10);

        let bytes = original.encode();
        let restored = VectorClock::decode(&bytes).unwrap();

        assert_eq!(restored.get(&NodeId::new(1)), 5);
        assert_eq!(restored.get(&NodeId::new(2)), 10);
    }

    #[test]
    fn test_reset_peer() {
        let mut encoder = DeltaEncoder::new(NodeId::new(1));
        let remote = NodeId::new(2);

        encoder.add_peer(&remote);
        encoder.mark_sent(&remote, &[make_position_op(1, 100)]);

        encoder.reset_peer(&remote);

        // After a reset the previously sent operation is due again.
        let replay = vec![make_position_op(1, 100)];
        let due = encoder.filter_for_peer(&remote, &replay);
        assert_eq!(due.len(), 1);
    }
}