1#[cfg(not(feature = "std"))]
23use alloc::{collections::BTreeMap, format, string::String, string::ToString, vec::Vec};
24#[cfg(feature = "std")]
25use std::collections::HashMap;
26
27use super::crdt::{CrdtOperation, Timestamp};
28use crate::NodeId;
29
30#[derive(Debug, Clone, Default)]
32pub struct PeerSyncState {
33 #[cfg(feature = "std")]
35 last_sent: HashMap<String, Timestamp>,
36 #[cfg(not(feature = "std"))]
37 last_sent: BTreeMap<String, Timestamp>,
38
39 pub last_received_timestamp: Timestamp,
41
42 pub sync_count: u32,
44
45 pub bytes_sent: u64,
47
48 pub bytes_received: u64,
50}
51
52impl PeerSyncState {
53 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn mark_sent(&mut self, key: &str, timestamp: Timestamp) {
60 self.last_sent.insert(key.to_string(), timestamp);
61 }
62
63 pub fn last_sent_timestamp(&self, key: &str) -> Option<Timestamp> {
65 self.last_sent.get(key).copied()
66 }
67
68 pub fn needs_send(&self, key: &str, timestamp: Timestamp) -> bool {
70 match self.last_sent.get(key) {
71 Some(&last) => timestamp > last,
72 None => true,
73 }
74 }
75
76 pub fn reset(&mut self) {
78 self.last_sent.clear();
79 }
80}
81
82#[derive(Debug)]
84pub struct DeltaEncoder {
85 #[allow(dead_code)]
87 node_id: NodeId,
88
89 #[cfg(feature = "std")]
91 peers: HashMap<u32, PeerSyncState>,
92 #[cfg(not(feature = "std"))]
93 peers: BTreeMap<u32, PeerSyncState>,
94
95 #[cfg(feature = "std")]
97 current_state: HashMap<String, (u64, Timestamp)>,
98 #[cfg(not(feature = "std"))]
99 current_state: BTreeMap<String, (u64, Timestamp)>,
100}
101
102impl DeltaEncoder {
103 pub fn new(node_id: NodeId) -> Self {
105 Self {
106 node_id,
107 #[cfg(feature = "std")]
108 peers: HashMap::new(),
109 #[cfg(not(feature = "std"))]
110 peers: BTreeMap::new(),
111 #[cfg(feature = "std")]
112 current_state: HashMap::new(),
113 #[cfg(not(feature = "std"))]
114 current_state: BTreeMap::new(),
115 }
116 }
117
118 pub fn add_peer(&mut self, peer_id: &NodeId) {
120 self.peers.entry(peer_id.as_u32()).or_default();
121 }
122
123 pub fn remove_peer(&mut self, peer_id: &NodeId) {
125 self.peers.remove(&peer_id.as_u32());
126 }
127
128 pub fn get_peer_state(&self, peer_id: &NodeId) -> Option<&PeerSyncState> {
130 self.peers.get(&peer_id.as_u32())
131 }
132
133 pub fn get_peer_state_mut(&mut self, peer_id: &NodeId) -> Option<&mut PeerSyncState> {
135 self.peers.get_mut(&peer_id.as_u32())
136 }
137
138 pub fn update_state(&mut self, key: &str, value_hash: u64, timestamp: Timestamp) {
140 self.current_state
141 .insert(key.to_string(), (value_hash, timestamp));
142 }
143
144 pub fn filter_for_peer(
146 &self,
147 peer_id: &NodeId,
148 operations: &[CrdtOperation],
149 ) -> Vec<CrdtOperation> {
150 let peer_state = match self.peers.get(&peer_id.as_u32()) {
151 Some(state) => state,
152 None => return operations.to_vec(), };
154
155 operations
156 .iter()
157 .filter(|op| {
158 let (key, timestamp) = Self::operation_key_timestamp(op);
159 peer_state.needs_send(&key, timestamp)
160 })
161 .cloned()
162 .collect()
163 }
164
165 pub fn mark_sent(&mut self, peer_id: &NodeId, operations: &[CrdtOperation]) {
167 let peer_state = match self.peers.get_mut(&peer_id.as_u32()) {
168 Some(state) => state,
169 None => return,
170 };
171
172 for op in operations {
173 let (key, timestamp) = Self::operation_key_timestamp(op);
174 peer_state.mark_sent(&key, timestamp);
175 }
176 }
177
178 pub fn record_sent(&mut self, peer_id: &NodeId, bytes: usize) {
180 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
181 state.bytes_sent += bytes as u64;
182 state.sync_count += 1;
183 }
184 }
185
186 pub fn record_received(&mut self, peer_id: &NodeId, bytes: usize, timestamp: Timestamp) {
188 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
189 state.bytes_received += bytes as u64;
190 state.last_received_timestamp = timestamp;
191 }
192 }
193
194 pub fn reset_peer(&mut self, peer_id: &NodeId) {
196 if let Some(state) = self.peers.get_mut(&peer_id.as_u32()) {
197 state.reset();
198 }
199 }
200
201 pub fn stats(&self) -> DeltaStats {
203 let mut total_sent = 0u64;
204 let mut total_received = 0u64;
205 let mut total_syncs = 0u32;
206
207 for state in self.peers.values() {
208 total_sent += state.bytes_sent;
209 total_received += state.bytes_received;
210 total_syncs += state.sync_count;
211 }
212
213 DeltaStats {
214 peer_count: self.peers.len(),
215 total_bytes_sent: total_sent,
216 total_bytes_received: total_received,
217 total_syncs,
218 tracked_keys: self.current_state.len(),
219 }
220 }
221
222 fn operation_key_timestamp(op: &CrdtOperation) -> (String, Timestamp) {
224 match op {
225 CrdtOperation::UpdatePosition {
226 node_id, timestamp, ..
227 } => (format!("pos:{}", node_id), *timestamp),
228 CrdtOperation::UpdateHealth {
229 node_id, timestamp, ..
230 } => (format!("health:{}", node_id), *timestamp),
231 CrdtOperation::IncrementCounter {
232 counter_id,
233 node_id,
234 ..
235 } => {
236 (format!("counter:{}:{}", counter_id, node_id), u64::MAX)
238 }
239 CrdtOperation::UpdateRegister {
240 key,
241 timestamp,
242 node_id,
243 ..
244 } => (format!("reg:{}:{}", key, node_id), *timestamp),
245 }
246 }
247}
248
249#[derive(Debug, Clone, Default)]
251pub struct DeltaStats {
252 pub peer_count: usize,
254 pub total_bytes_sent: u64,
256 pub total_bytes_received: u64,
258 pub total_syncs: u32,
260 pub tracked_keys: usize,
262}
263
264#[derive(Debug, Clone, Default)]
266pub struct VectorClock {
267 #[cfg(feature = "std")]
269 clocks: HashMap<u32, u64>,
270 #[cfg(not(feature = "std"))]
271 clocks: BTreeMap<u32, u64>,
272}
273
274impl VectorClock {
275 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn increment(&mut self, node_id: &NodeId) -> u64 {
282 let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
283 *clock += 1;
284 *clock
285 }
286
287 pub fn get(&self, node_id: &NodeId) -> u64 {
289 self.clocks.get(&node_id.as_u32()).copied().unwrap_or(0)
290 }
291
292 pub fn update(&mut self, node_id: &NodeId, value: u64) {
294 let clock = self.clocks.entry(node_id.as_u32()).or_insert(0);
295 *clock = (*clock).max(value);
296 }
297
298 pub fn merge(&mut self, other: &VectorClock) {
300 for (&node_id, &value) in &other.clocks {
301 let clock = self.clocks.entry(node_id).or_insert(0);
302 *clock = (*clock).max(value);
303 }
304 }
305
306 pub fn happens_before(&self, other: &VectorClock) -> bool {
308 let mut dominated = false;
309
310 for (&node_id, &our_val) in &self.clocks {
312 let their_val = other.clocks.get(&node_id).copied().unwrap_or(0);
313 if our_val > their_val {
314 return false;
315 }
316 if our_val < their_val {
317 dominated = true;
318 }
319 }
320
321 for (&node_id, &their_val) in &other.clocks {
323 if !self.clocks.contains_key(&node_id) && their_val > 0 {
324 dominated = true;
325 }
326 }
327
328 dominated
329 }
330
331 pub fn concurrent_with(&self, other: &VectorClock) -> bool {
333 !self.happens_before(other) && !other.happens_before(self)
334 }
335
336 pub fn encode(&self) -> Vec<u8> {
338 let mut buf = Vec::with_capacity(4 + self.clocks.len() * 12);
339 buf.extend_from_slice(&(self.clocks.len() as u32).to_le_bytes());
340 for (&node_id, &value) in &self.clocks {
341 buf.extend_from_slice(&node_id.to_le_bytes());
342 buf.extend_from_slice(&value.to_le_bytes());
343 }
344 buf
345 }
346
347 pub fn decode(data: &[u8]) -> Option<Self> {
349 if data.len() < 4 {
350 return None;
351 }
352
353 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
354 if data.len() < 4 + count * 12 {
355 return None;
356 }
357
358 #[cfg(feature = "std")]
359 let mut clocks = HashMap::with_capacity(count);
360 #[cfg(not(feature = "std"))]
361 let mut clocks = BTreeMap::new();
362
363 let mut offset = 4;
364 for _ in 0..count {
365 let node_id = u32::from_le_bytes([
366 data[offset],
367 data[offset + 1],
368 data[offset + 2],
369 data[offset + 3],
370 ]);
371 let value = u64::from_le_bytes([
372 data[offset + 4],
373 data[offset + 5],
374 data[offset + 6],
375 data[offset + 7],
376 data[offset + 8],
377 data[offset + 9],
378 data[offset + 10],
379 data[offset + 11],
380 ]);
381 clocks.insert(node_id, value);
382 offset += 12;
383 }
384
385 Some(Self { clocks })
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use crate::sync::crdt::Position;
393
394 fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
395 CrdtOperation::UpdatePosition {
396 node_id: NodeId::new(node_id),
397 position: Position::new(37.0, -122.0),
398 timestamp,
399 }
400 }
401
402 #[test]
403 fn test_peer_sync_state() {
404 let mut state = PeerSyncState::new();
405
406 assert!(state.needs_send("key1", 100));
407
408 state.mark_sent("key1", 100);
409
410 assert!(!state.needs_send("key1", 100));
411 assert!(!state.needs_send("key1", 50));
412 assert!(state.needs_send("key1", 101));
413 }
414
415 #[test]
416 fn test_delta_encoder_filter() {
417 let mut encoder = DeltaEncoder::new(NodeId::new(1));
418 let peer = NodeId::new(2);
419
420 encoder.add_peer(&peer);
421
422 let ops = vec![make_position_op(1, 100), make_position_op(2, 200)];
423
424 let filtered = encoder.filter_for_peer(&peer, &ops);
426 assert_eq!(filtered.len(), 2);
427
428 encoder.mark_sent(&peer, &filtered);
430
431 let filtered2 = encoder.filter_for_peer(&peer, &ops);
433 assert_eq!(filtered2.len(), 0);
434
435 let new_ops = vec![make_position_op(1, 101)];
437 let filtered3 = encoder.filter_for_peer(&peer, &new_ops);
438 assert_eq!(filtered3.len(), 1);
439 }
440
441 #[test]
442 fn test_delta_encoder_stats() {
443 let mut encoder = DeltaEncoder::new(NodeId::new(1));
444
445 encoder.add_peer(&NodeId::new(2));
446 encoder.add_peer(&NodeId::new(3));
447
448 encoder.record_sent(&NodeId::new(2), 100);
449 encoder.record_sent(&NodeId::new(3), 50);
450 encoder.record_received(&NodeId::new(2), 75, 1000);
451
452 let stats = encoder.stats();
453 assert_eq!(stats.peer_count, 2);
454 assert_eq!(stats.total_bytes_sent, 150);
455 assert_eq!(stats.total_bytes_received, 75);
456 assert_eq!(stats.total_syncs, 2);
457 }
458
459 #[test]
460 fn test_vector_clock_increment() {
461 let mut clock = VectorClock::new();
462 let node = NodeId::new(1);
463
464 assert_eq!(clock.get(&node), 0);
465
466 clock.increment(&node);
467 assert_eq!(clock.get(&node), 1);
468
469 clock.increment(&node);
470 assert_eq!(clock.get(&node), 2);
471 }
472
473 #[test]
474 fn test_vector_clock_merge() {
475 let mut clock1 = VectorClock::new();
476 let mut clock2 = VectorClock::new();
477
478 let node1 = NodeId::new(1);
479 let node2 = NodeId::new(2);
480
481 clock1.update(&node1, 5);
482 clock1.update(&node2, 3);
483
484 clock2.update(&node1, 3);
485 clock2.update(&node2, 7);
486
487 clock1.merge(&clock2);
488
489 assert_eq!(clock1.get(&node1), 5); assert_eq!(clock1.get(&node2), 7); }
492
493 #[test]
494 fn test_vector_clock_happens_before() {
495 let mut clock1 = VectorClock::new();
496 let mut clock2 = VectorClock::new();
497
498 let node = NodeId::new(1);
499
500 clock1.update(&node, 1);
501 clock2.update(&node, 2);
502
503 assert!(clock1.happens_before(&clock2));
504 assert!(!clock2.happens_before(&clock1));
505 }
506
507 #[test]
508 fn test_vector_clock_concurrent() {
509 let mut clock1 = VectorClock::new();
510 let mut clock2 = VectorClock::new();
511
512 let node1 = NodeId::new(1);
513 let node2 = NodeId::new(2);
514
515 clock1.update(&node1, 2);
516 clock1.update(&node2, 1);
517
518 clock2.update(&node1, 1);
519 clock2.update(&node2, 2);
520
521 assert!(clock1.concurrent_with(&clock2));
523 }
524
525 #[test]
526 fn test_vector_clock_encode_decode() {
527 let mut clock = VectorClock::new();
528 clock.update(&NodeId::new(1), 5);
529 clock.update(&NodeId::new(2), 10);
530
531 let encoded = clock.encode();
532 let decoded = VectorClock::decode(&encoded).unwrap();
533
534 assert_eq!(decoded.get(&NodeId::new(1)), 5);
535 assert_eq!(decoded.get(&NodeId::new(2)), 10);
536 }
537
538 #[test]
539 fn test_reset_peer() {
540 let mut encoder = DeltaEncoder::new(NodeId::new(1));
541 let peer = NodeId::new(2);
542
543 encoder.add_peer(&peer);
544 encoder.mark_sent(&peer, &[make_position_op(1, 100)]);
545
546 encoder.reset_peer(&peer);
548
549 let ops = vec![make_position_op(1, 100)];
550 let filtered = encoder.filter_for_peer(&peer, &ops);
551 assert_eq!(filtered.len(), 1);
552 }
553}