1#[cfg(not(feature = "std"))]
67use alloc::{string::String, vec::Vec};
68
69use crate::sync::crdt::Peripheral;
70use crate::sync::delta::VectorClock;
71use crate::NodeId;
72
73pub const DELTA_DOCUMENT_MARKER: u8 = 0xB2;
75
76pub mod op_type {
78 pub const INCREMENT_COUNTER: u8 = 0x01;
80 pub const UPDATE_PERIPHERAL: u8 = 0x02;
82 pub const SET_EMERGENCY: u8 = 0x03;
84 pub const ACK_EMERGENCY: u8 = 0x04;
86 pub const CLEAR_EMERGENCY: u8 = 0x05;
88}
89
90#[derive(Debug, Clone, Copy, Default)]
92pub struct DeltaFlags {
93 pub has_vector_clock: bool,
95 pub is_response: bool,
97}
98
99impl DeltaFlags {
100 pub fn to_byte(&self) -> u8 {
102 let mut flags = 0u8;
103 if self.has_vector_clock {
104 flags |= 0x01;
105 }
106 if self.is_response {
107 flags |= 0x02;
108 }
109 flags
110 }
111
112 pub fn from_byte(byte: u8) -> Self {
114 Self {
115 has_vector_clock: byte & 0x01 != 0,
116 is_response: byte & 0x02 != 0,
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
123pub enum Operation {
124 IncrementCounter {
126 counter_id: u8,
128 node_id: NodeId,
130 amount: u64,
132 timestamp: u64,
134 },
135
136 UpdatePeripheral {
138 peripheral: Peripheral,
140 timestamp: u64,
142 },
143
144 SetEmergency {
146 source_node: NodeId,
148 timestamp: u64,
150 known_peers: Vec<u32>,
152 },
153
154 AckEmergency {
156 node_id: NodeId,
158 emergency_timestamp: u64,
160 },
161
162 ClearEmergency {
164 emergency_timestamp: u64,
166 },
167}
168
169impl Operation {
170 pub fn timestamp(&self) -> u64 {
172 match self {
173 Operation::IncrementCounter { timestamp, .. } => *timestamp,
174 Operation::UpdatePeripheral { timestamp, .. } => *timestamp,
175 Operation::SetEmergency { timestamp, .. } => *timestamp,
176 Operation::AckEmergency {
177 emergency_timestamp,
178 ..
179 } => *emergency_timestamp,
180 Operation::ClearEmergency {
181 emergency_timestamp,
182 } => *emergency_timestamp,
183 }
184 }
185
186 pub fn key(&self) -> String {
188 match self {
189 Operation::IncrementCounter {
190 counter_id,
191 node_id,
192 ..
193 } => {
194 #[cfg(feature = "std")]
195 return format!("counter:{}:{}", counter_id, node_id.as_u32());
196 #[cfg(not(feature = "std"))]
197 return alloc::format!("counter:{}:{}", counter_id, node_id.as_u32());
198 }
199 Operation::UpdatePeripheral { peripheral, .. } => {
200 #[cfg(feature = "std")]
201 return format!("peripheral:{}", peripheral.id);
202 #[cfg(not(feature = "std"))]
203 return alloc::format!("peripheral:{}", peripheral.id);
204 }
205 Operation::SetEmergency { source_node, .. } => {
206 #[cfg(feature = "std")]
207 return format!("emergency:{}", source_node.as_u32());
208 #[cfg(not(feature = "std"))]
209 return alloc::format!("emergency:{}", source_node.as_u32());
210 }
211 Operation::AckEmergency { node_id, .. } => {
212 #[cfg(feature = "std")]
213 return format!("ack:{}", node_id.as_u32());
214 #[cfg(not(feature = "std"))]
215 return alloc::format!("ack:{}", node_id.as_u32());
216 }
217 Operation::ClearEmergency { .. } => "clear_emergency".into(),
218 }
219 }
220
221 pub fn encode(&self) -> Vec<u8> {
223 let mut buf = Vec::new();
224
225 match self {
226 Operation::IncrementCounter {
227 counter_id,
228 node_id,
229 amount,
230 timestamp,
231 } => {
232 buf.push(op_type::INCREMENT_COUNTER);
233 buf.push(*counter_id);
234 buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
235 buf.extend_from_slice(&amount.to_le_bytes());
236 buf.extend_from_slice(×tamp.to_le_bytes());
237 }
238 Operation::UpdatePeripheral {
239 peripheral,
240 timestamp,
241 } => {
242 buf.push(op_type::UPDATE_PERIPHERAL);
243 buf.extend_from_slice(×tamp.to_le_bytes());
244 let pdata = peripheral.encode();
245 buf.extend_from_slice(&(pdata.len() as u16).to_le_bytes());
246 buf.extend_from_slice(&pdata);
247 }
248 Operation::SetEmergency {
249 source_node,
250 timestamp,
251 known_peers,
252 } => {
253 buf.push(op_type::SET_EMERGENCY);
254 buf.extend_from_slice(&source_node.as_u32().to_le_bytes());
255 buf.extend_from_slice(×tamp.to_le_bytes());
256 buf.push(known_peers.len() as u8);
257 for peer in known_peers {
258 buf.extend_from_slice(&peer.to_le_bytes());
259 }
260 }
261 Operation::AckEmergency {
262 node_id,
263 emergency_timestamp,
264 } => {
265 buf.push(op_type::ACK_EMERGENCY);
266 buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
267 buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
268 }
269 Operation::ClearEmergency {
270 emergency_timestamp,
271 } => {
272 buf.push(op_type::CLEAR_EMERGENCY);
273 buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
274 }
275 }
276
277 buf
278 }
279
280 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
282 if data.is_empty() {
283 return None;
284 }
285
286 let op_type = data[0];
287
288 match op_type {
289 op_type::INCREMENT_COUNTER => {
290 if data.len() < 22 {
291 return None;
292 }
293 let counter_id = data[1];
294 let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
295 let amount = u64::from_le_bytes([
296 data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
297 ]);
298 let timestamp = u64::from_le_bytes([
299 data[14], data[15], data[16], data[17], data[18], data[19], data[20], data[21],
300 ]);
301 Some((
302 Operation::IncrementCounter {
303 counter_id,
304 node_id,
305 amount,
306 timestamp,
307 },
308 22,
309 ))
310 }
311 op_type::UPDATE_PERIPHERAL => {
312 if data.len() < 11 {
313 return None;
314 }
315 let timestamp = u64::from_le_bytes([
316 data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
317 ]);
318 let plen = u16::from_le_bytes([data[9], data[10]]) as usize;
319 if data.len() < 11 + plen {
320 return None;
321 }
322 let peripheral = Peripheral::decode(&data[11..11 + plen])?;
323 Some((
324 Operation::UpdatePeripheral {
325 peripheral,
326 timestamp,
327 },
328 11 + plen,
329 ))
330 }
331 op_type::SET_EMERGENCY => {
332 if data.len() < 14 {
333 return None;
334 }
335 let source_node =
336 NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
337 let timestamp = u64::from_le_bytes([
338 data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
339 ]);
340 let peer_count = data[13] as usize;
341 if data.len() < 14 + peer_count * 4 {
342 return None;
343 }
344 let mut known_peers = Vec::with_capacity(peer_count);
345 let mut offset = 14;
346 for _ in 0..peer_count {
347 known_peers.push(u32::from_le_bytes([
348 data[offset],
349 data[offset + 1],
350 data[offset + 2],
351 data[offset + 3],
352 ]));
353 offset += 4;
354 }
355 Some((
356 Operation::SetEmergency {
357 source_node,
358 timestamp,
359 known_peers,
360 },
361 offset,
362 ))
363 }
364 op_type::ACK_EMERGENCY => {
365 if data.len() < 13 {
366 return None;
367 }
368 let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
369 let emergency_timestamp = u64::from_le_bytes([
370 data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
371 ]);
372 Some((
373 Operation::AckEmergency {
374 node_id,
375 emergency_timestamp,
376 },
377 13,
378 ))
379 }
380 op_type::CLEAR_EMERGENCY => {
381 if data.len() < 9 {
382 return None;
383 }
384 let emergency_timestamp = u64::from_le_bytes([
385 data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
386 ]);
387 Some((
388 Operation::ClearEmergency {
389 emergency_timestamp,
390 },
391 9,
392 ))
393 }
394 _ => None,
395 }
396 }
397}
398
399#[derive(Debug, Clone)]
401pub struct DeltaDocument {
402 pub origin_node: NodeId,
404
405 pub timestamp_ms: u64,
407
408 pub flags: DeltaFlags,
410
411 pub vector_clock: Option<VectorClock>,
413
414 pub operations: Vec<Operation>,
416}
417
418impl DeltaDocument {
419 pub fn new(origin_node: NodeId, timestamp_ms: u64) -> Self {
421 Self {
422 origin_node,
423 timestamp_ms,
424 flags: DeltaFlags::default(),
425 vector_clock: None,
426 operations: Vec::new(),
427 }
428 }
429
430 pub fn with_vector_clock(mut self, clock: VectorClock) -> Self {
432 self.vector_clock = Some(clock);
433 self.flags.has_vector_clock = true;
434 self
435 }
436
437 pub fn as_response(mut self) -> Self {
439 self.flags.is_response = true;
440 self
441 }
442
443 pub fn add_operation(&mut self, op: Operation) {
445 self.operations.push(op);
446 }
447
448 pub fn is_empty(&self) -> bool {
450 self.operations.is_empty()
451 }
452
453 pub fn operation_count(&self) -> usize {
455 self.operations.len()
456 }
457
458 pub fn is_delta_document(data: &[u8]) -> bool {
460 !data.is_empty() && data[0] == DELTA_DOCUMENT_MARKER
461 }
462
463 pub fn encode(&self) -> Vec<u8> {
465 let mut buf = Vec::new();
466
467 buf.push(DELTA_DOCUMENT_MARKER);
469
470 buf.push(self.flags.to_byte());
472
473 buf.extend_from_slice(&self.origin_node.as_u32().to_le_bytes());
475
476 buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
478
479 if let Some(ref clock) = self.vector_clock {
481 let clock_data = clock.encode();
482 buf.extend_from_slice(&clock_data);
483 }
484
485 buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
487
488 for op in &self.operations {
490 buf.extend_from_slice(&op.encode());
491 }
492
493 buf
494 }
495
496 pub fn decode(data: &[u8]) -> Option<Self> {
498 if data.len() < 16 {
500 return None;
501 }
502
503 if data[0] != DELTA_DOCUMENT_MARKER {
504 return None;
505 }
506
507 let flags = DeltaFlags::from_byte(data[1]);
508 let origin_node = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
509 let timestamp_ms = u64::from_le_bytes([
510 data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
511 ]);
512
513 let mut offset = 14;
514
515 let vector_clock = if flags.has_vector_clock {
517 let clock = VectorClock::decode(&data[offset..])?;
518 let count = u32::from_le_bytes([
520 data[offset],
521 data[offset + 1],
522 data[offset + 2],
523 data[offset + 3],
524 ]) as usize;
525 offset += 4 + count * 12;
526 Some(clock)
527 } else {
528 None
529 };
530
531 if data.len() < offset + 2 {
533 return None;
534 }
535 let op_count = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
536 offset += 2;
537
538 let mut operations = Vec::with_capacity(op_count);
540 for _ in 0..op_count {
541 if offset >= data.len() {
542 return None;
543 }
544 let (op, size) = Operation::decode(&data[offset..])?;
545 operations.push(op);
546 offset += size;
547 }
548
549 Some(Self {
550 origin_node,
551 timestamp_ms,
552 flags,
553 vector_clock,
554 operations,
555 })
556 }
557
558 pub fn encoded_size(&self) -> usize {
560 let base = 16; let clock_size = self
562 .vector_clock
563 .as_ref()
564 .map(|c| c.encode().len())
565 .unwrap_or(0);
566 let ops_size: usize = self.operations.iter().map(|op| op.encode().len()).sum();
567 base + clock_size + ops_size
568 }
569}
570
571#[cfg(test)]
572mod tests {
573 use super::*;
574 use crate::sync::crdt::PeripheralType;
575
576 #[test]
577 fn test_operation_increment_counter_encode_decode() {
578 let op = Operation::IncrementCounter {
579 counter_id: 0,
580 node_id: NodeId::new(0x12345678),
581 amount: 42,
582 timestamp: 1000,
583 };
584
585 let encoded = op.encode();
586 let (decoded, size) = Operation::decode(&encoded).unwrap();
587
588 assert_eq!(size, encoded.len());
589 if let Operation::IncrementCounter {
590 counter_id,
591 node_id,
592 amount,
593 timestamp,
594 } = decoded
595 {
596 assert_eq!(counter_id, 0);
597 assert_eq!(node_id.as_u32(), 0x12345678);
598 assert_eq!(amount, 42);
599 assert_eq!(timestamp, 1000);
600 } else {
601 panic!("Wrong operation type");
602 }
603 }
604
605 #[test]
606 fn test_operation_update_peripheral_encode_decode() {
607 let peripheral =
608 Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");
609
610 let op = Operation::UpdatePeripheral {
611 peripheral: peripheral.clone(),
612 timestamp: 2000,
613 };
614
615 let encoded = op.encode();
616 let (decoded, size) = Operation::decode(&encoded).unwrap();
617
618 assert_eq!(size, encoded.len());
619 if let Operation::UpdatePeripheral {
620 peripheral: p,
621 timestamp: t,
622 } = decoded
623 {
624 assert_eq!(p.id, peripheral.id);
625 assert_eq!(p.callsign_str(), "ALPHA-1");
626 assert_eq!(t, 2000);
627 } else {
628 panic!("Wrong operation type");
629 }
630 }
631
632 #[test]
633 fn test_operation_set_emergency_encode_decode() {
634 let op = Operation::SetEmergency {
635 source_node: NodeId::new(0x11111111),
636 timestamp: 3000,
637 known_peers: vec![0x22222222, 0x33333333],
638 };
639
640 let encoded = op.encode();
641 let (decoded, size) = Operation::decode(&encoded).unwrap();
642
643 assert_eq!(size, encoded.len());
644 if let Operation::SetEmergency {
645 source_node,
646 timestamp,
647 known_peers,
648 } = decoded
649 {
650 assert_eq!(source_node.as_u32(), 0x11111111);
651 assert_eq!(timestamp, 3000);
652 assert_eq!(known_peers, vec![0x22222222, 0x33333333]);
653 } else {
654 panic!("Wrong operation type");
655 }
656 }
657
658 #[test]
659 fn test_operation_ack_emergency_encode_decode() {
660 let op = Operation::AckEmergency {
661 node_id: NodeId::new(0x22222222),
662 emergency_timestamp: 3000,
663 };
664
665 let encoded = op.encode();
666 let (decoded, size) = Operation::decode(&encoded).unwrap();
667
668 assert_eq!(size, encoded.len());
669 if let Operation::AckEmergency {
670 node_id,
671 emergency_timestamp,
672 } = decoded
673 {
674 assert_eq!(node_id.as_u32(), 0x22222222);
675 assert_eq!(emergency_timestamp, 3000);
676 } else {
677 panic!("Wrong operation type");
678 }
679 }
680
681 #[test]
682 fn test_operation_clear_emergency_encode_decode() {
683 let op = Operation::ClearEmergency {
684 emergency_timestamp: 3000,
685 };
686
687 let encoded = op.encode();
688 let (decoded, size) = Operation::decode(&encoded).unwrap();
689
690 assert_eq!(size, encoded.len());
691 if let Operation::ClearEmergency {
692 emergency_timestamp,
693 } = decoded
694 {
695 assert_eq!(emergency_timestamp, 3000);
696 } else {
697 panic!("Wrong operation type");
698 }
699 }
700
701 #[test]
702 fn test_delta_document_empty() {
703 let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
704
705 assert!(delta.is_empty());
706 assert_eq!(delta.operation_count(), 0);
707
708 let encoded = delta.encode();
709 assert!(DeltaDocument::is_delta_document(&encoded));
710
711 let decoded = DeltaDocument::decode(&encoded).unwrap();
712 assert_eq!(decoded.origin_node.as_u32(), 0x12345678);
713 assert_eq!(decoded.timestamp_ms, 1000);
714 assert!(decoded.is_empty());
715 }
716
717 #[test]
718 fn test_delta_document_with_operations() {
719 let mut delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
720
721 delta.add_operation(Operation::IncrementCounter {
722 counter_id: 0,
723 node_id: NodeId::new(0x12345678),
724 amount: 1,
725 timestamp: 1000,
726 });
727
728 delta.add_operation(Operation::AckEmergency {
729 node_id: NodeId::new(0x12345678),
730 emergency_timestamp: 500,
731 });
732
733 assert_eq!(delta.operation_count(), 2);
734
735 let encoded = delta.encode();
736 let decoded = DeltaDocument::decode(&encoded).unwrap();
737
738 assert_eq!(decoded.operation_count(), 2);
739 }
740
741 #[test]
742 fn test_delta_document_with_vector_clock() {
743 let mut clock = VectorClock::new();
744 clock.update(&NodeId::new(0x11111111), 5);
745 clock.update(&NodeId::new(0x22222222), 3);
746
747 let delta =
748 DeltaDocument::new(NodeId::new(0x12345678), 1000).with_vector_clock(clock.clone());
749
750 assert!(delta.flags.has_vector_clock);
751
752 let encoded = delta.encode();
753 let decoded = DeltaDocument::decode(&encoded).unwrap();
754
755 assert!(decoded.flags.has_vector_clock);
756 assert!(decoded.vector_clock.is_some());
757
758 let decoded_clock = decoded.vector_clock.unwrap();
759 assert_eq!(decoded_clock.get(&NodeId::new(0x11111111)), 5);
760 assert_eq!(decoded_clock.get(&NodeId::new(0x22222222)), 3);
761 }
762
763 #[test]
764 fn test_delta_document_is_delta_document() {
765 let delta = DeltaDocument::new(NodeId::new(0x12345678), 1000);
766 let encoded = delta.encode();
767
768 assert!(DeltaDocument::is_delta_document(&encoded));
769
770 let non_delta = vec![0x00, 0x01, 0x02, 0x03];
772 assert!(!DeltaDocument::is_delta_document(&non_delta));
773
774 let empty: Vec<u8> = vec![];
775 assert!(!DeltaDocument::is_delta_document(&empty));
776 }
777
778 #[test]
779 fn test_operation_key() {
780 let op1 = Operation::IncrementCounter {
781 counter_id: 0,
782 node_id: NodeId::new(0x11111111),
783 amount: 1,
784 timestamp: 1000,
785 };
786 let op2 = Operation::IncrementCounter {
787 counter_id: 0,
788 node_id: NodeId::new(0x11111111),
789 amount: 2,
790 timestamp: 2000,
791 };
792 let op3 = Operation::IncrementCounter {
793 counter_id: 0,
794 node_id: NodeId::new(0x22222222),
795 amount: 1,
796 timestamp: 1000,
797 };
798
799 assert_eq!(op1.key(), op2.key());
801
802 assert_ne!(op1.key(), op3.key());
804 }
805}