1#[cfg(not(feature = "std"))]
67use alloc::{string::String, vec::Vec};
68
69use crate::registry::AppOperation;
70use crate::sync::crdt::Peripheral;
71use crate::sync::delta::VectorClock;
72use crate::NodeId;
73
/// Marker byte written as the first byte of every encoded `DeltaDocument`.
///
/// `DeltaDocument::decode` rejects any buffer that does not start with it.
pub const DELTA_DOCUMENT_MARKER: u8 = 0xB2;
76
/// Wire tags for the built-in `Operation` variants.
///
/// Each constant is the first byte of the corresponding encoded operation
/// and is what `Operation::decode` dispatches on.
pub mod op_type {
    /// Tag byte for `Operation::IncrementCounter`.
    pub const INCREMENT_COUNTER: u8 = 0x01;
    /// Tag byte for `Operation::UpdatePeripheral`.
    pub const UPDATE_PERIPHERAL: u8 = 0x02;
    /// Tag byte for `Operation::SetEmergency`.
    pub const SET_EMERGENCY: u8 = 0x03;
    /// Tag byte for `Operation::AckEmergency`.
    pub const ACK_EMERGENCY: u8 = 0x04;
    /// Tag byte for `Operation::ClearEmergency`.
    pub const CLEAR_EMERGENCY: u8 = 0x05;
}
90
/// Header flags of a delta document, stored on the wire as a single byte.
///
/// Bit `0x01` carries `has_vector_clock` and bit `0x02` carries
/// `is_response`; all other bits are unused.
#[derive(Debug, Clone, Copy, Default)]
pub struct DeltaFlags {
    /// Set when a vector clock is attached (see `DeltaDocument::with_vector_clock`).
    pub has_vector_clock: bool,
    /// Set when the document is marked as a response (see `DeltaDocument::as_response`).
    pub is_response: bool,
}

impl DeltaFlags {
    /// Packs both flags into their one-byte wire representation.
    pub fn to_byte(&self) -> u8 {
        let clock_bit: u8 = if self.has_vector_clock { 0x01 } else { 0x00 };
        let response_bit: u8 = if self.is_response { 0x02 } else { 0x00 };
        clock_bit | response_bit
    }

    /// Unpacks flags from a wire byte.
    ///
    /// Only bits `0x01` and `0x02` are inspected; any other bits are ignored.
    pub fn from_byte(byte: u8) -> Self {
        let has_vector_clock = (byte & 0x01) != 0;
        let is_response = (byte & 0x02) != 0;
        Self {
            has_vector_clock,
            is_response,
        }
    }
}
121
/// A single operation carried inside a delta document.
///
/// The built-in variants are tagged on the wire with the constants in
/// `op_type`; `App` operations are encoded and decoded by `AppOperation`.
#[derive(Debug, Clone)]
pub enum Operation {
    /// Counter operation, tagged `op_type::INCREMENT_COUNTER`.
    IncrementCounter {
        /// One-byte counter identifier.
        counter_id: u8,
        /// Node the operation refers to; encoded as its `u32` value.
        node_id: NodeId,
        /// Amount carried by the operation.
        amount: u64,
        /// Timestamp of the operation.
        timestamp: u64,
    },

    /// Peripheral update, tagged `op_type::UPDATE_PERIPHERAL`.
    UpdatePeripheral {
        /// Peripheral state; encoded with a two-byte length prefix.
        peripheral: Peripheral,
        /// Timestamp of the operation.
        timestamp: u64,
    },

    /// Emergency announcement, tagged `op_type::SET_EMERGENCY`.
    SetEmergency {
        /// Node recorded as the source of the emergency.
        source_node: NodeId,
        /// Timestamp of the operation.
        timestamp: u64,
        /// Peer identifiers as raw `u32` values; encoded with a one-byte count.
        known_peers: Vec<u32>,
    },

    /// Emergency acknowledgement, tagged `op_type::ACK_EMERGENCY`.
    AckEmergency {
        /// Acknowledging node.
        node_id: NodeId,
        /// Timestamp of the emergency being acknowledged (per the field name).
        emergency_timestamp: u64,
    },

    /// Emergency clear, tagged `op_type::CLEAR_EMERGENCY`.
    ClearEmergency {
        /// Timestamp of the emergency being cleared (per the field name).
        emergency_timestamp: u64,
    },

    /// Application-defined operation handled by `AppOperation`.
    App(AppOperation),
}
175
176impl Operation {
177 pub fn timestamp(&self) -> u64 {
179 match self {
180 Operation::IncrementCounter { timestamp, .. } => *timestamp,
181 Operation::UpdatePeripheral { timestamp, .. } => *timestamp,
182 Operation::SetEmergency { timestamp, .. } => *timestamp,
183 Operation::AckEmergency {
184 emergency_timestamp,
185 ..
186 } => *emergency_timestamp,
187 Operation::ClearEmergency {
188 emergency_timestamp,
189 } => *emergency_timestamp,
190 Operation::App(op) => op.timestamp,
191 }
192 }
193
194 pub fn key(&self) -> String {
196 match self {
197 Operation::IncrementCounter {
198 counter_id,
199 node_id,
200 ..
201 } => {
202 #[cfg(feature = "std")]
203 return format!("counter:{}:{}", counter_id, node_id.as_u32());
204 #[cfg(not(feature = "std"))]
205 return alloc::format!("counter:{}:{}", counter_id, node_id.as_u32());
206 }
207 Operation::UpdatePeripheral { peripheral, .. } => {
208 #[cfg(feature = "std")]
209 return format!("peripheral:{}", peripheral.id);
210 #[cfg(not(feature = "std"))]
211 return alloc::format!("peripheral:{}", peripheral.id);
212 }
213 Operation::SetEmergency { source_node, .. } => {
214 #[cfg(feature = "std")]
215 return format!("emergency:{}", source_node.as_u32());
216 #[cfg(not(feature = "std"))]
217 return alloc::format!("emergency:{}", source_node.as_u32());
218 }
219 Operation::AckEmergency { node_id, .. } => {
220 #[cfg(feature = "std")]
221 return format!("ack:{}", node_id.as_u32());
222 #[cfg(not(feature = "std"))]
223 return alloc::format!("ack:{}", node_id.as_u32());
224 }
225 Operation::ClearEmergency { .. } => "clear_emergency".into(),
226 Operation::App(op) => {
227 #[cfg(feature = "std")]
229 return format!("app:{}:{}:{}", op.type_id, op.source_node, op.timestamp);
230 #[cfg(not(feature = "std"))]
231 return alloc::format!("app:{}:{}:{}", op.type_id, op.source_node, op.timestamp);
232 }
233 }
234 }
235
236 pub fn encode(&self) -> Vec<u8> {
238 let mut buf = Vec::new();
239
240 match self {
241 Operation::IncrementCounter {
242 counter_id,
243 node_id,
244 amount,
245 timestamp,
246 } => {
247 buf.push(op_type::INCREMENT_COUNTER);
248 buf.push(*counter_id);
249 buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
250 buf.extend_from_slice(&amount.to_le_bytes());
251 buf.extend_from_slice(×tamp.to_le_bytes());
252 }
253 Operation::UpdatePeripheral {
254 peripheral,
255 timestamp,
256 } => {
257 buf.push(op_type::UPDATE_PERIPHERAL);
258 buf.extend_from_slice(×tamp.to_le_bytes());
259 let pdata = peripheral.encode();
260 buf.extend_from_slice(&(pdata.len() as u16).to_le_bytes());
261 buf.extend_from_slice(&pdata);
262 }
263 Operation::SetEmergency {
264 source_node,
265 timestamp,
266 known_peers,
267 } => {
268 buf.push(op_type::SET_EMERGENCY);
269 buf.extend_from_slice(&source_node.as_u32().to_le_bytes());
270 buf.extend_from_slice(×tamp.to_le_bytes());
271 buf.push(known_peers.len() as u8);
272 for peer in known_peers {
273 buf.extend_from_slice(&peer.to_le_bytes());
274 }
275 }
276 Operation::AckEmergency {
277 node_id,
278 emergency_timestamp,
279 } => {
280 buf.push(op_type::ACK_EMERGENCY);
281 buf.extend_from_slice(&node_id.as_u32().to_le_bytes());
282 buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
283 }
284 Operation::ClearEmergency {
285 emergency_timestamp,
286 } => {
287 buf.push(op_type::CLEAR_EMERGENCY);
288 buf.extend_from_slice(&emergency_timestamp.to_le_bytes());
289 }
290 Operation::App(op) => {
291 buf.extend_from_slice(&op.encode());
293 }
294 }
295
296 buf
297 }
298
299 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
301 if data.is_empty() {
302 return None;
303 }
304
305 let op_type = data[0];
306
307 match op_type {
308 op_type::INCREMENT_COUNTER => {
309 if data.len() < 22 {
310 return None;
311 }
312 let counter_id = data[1];
313 let node_id = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
314 let amount = u64::from_le_bytes([
315 data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
316 ]);
317 let timestamp = u64::from_le_bytes([
318 data[14], data[15], data[16], data[17], data[18], data[19], data[20], data[21],
319 ]);
320 Some((
321 Operation::IncrementCounter {
322 counter_id,
323 node_id,
324 amount,
325 timestamp,
326 },
327 22,
328 ))
329 }
330 op_type::UPDATE_PERIPHERAL => {
331 if data.len() < 11 {
332 return None;
333 }
334 let timestamp = u64::from_le_bytes([
335 data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
336 ]);
337 let plen = u16::from_le_bytes([data[9], data[10]]) as usize;
338 if data.len() < 11 + plen {
339 return None;
340 }
341 let peripheral = Peripheral::decode(&data[11..11 + plen])?;
342 Some((
343 Operation::UpdatePeripheral {
344 peripheral,
345 timestamp,
346 },
347 11 + plen,
348 ))
349 }
350 op_type::SET_EMERGENCY => {
351 if data.len() < 14 {
352 return None;
353 }
354 let source_node =
355 NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
356 let timestamp = u64::from_le_bytes([
357 data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
358 ]);
359 let peer_count = data[13] as usize;
360 if data.len() < 14 + peer_count * 4 {
361 return None;
362 }
363 let mut known_peers = Vec::with_capacity(peer_count);
364 let mut offset = 14;
365 for _ in 0..peer_count {
366 known_peers.push(u32::from_le_bytes([
367 data[offset],
368 data[offset + 1],
369 data[offset + 2],
370 data[offset + 3],
371 ]));
372 offset += 4;
373 }
374 Some((
375 Operation::SetEmergency {
376 source_node,
377 timestamp,
378 known_peers,
379 },
380 offset,
381 ))
382 }
383 op_type::ACK_EMERGENCY => {
384 if data.len() < 13 {
385 return None;
386 }
387 let node_id = NodeId::new(u32::from_le_bytes([data[1], data[2], data[3], data[4]]));
388 let emergency_timestamp = u64::from_le_bytes([
389 data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
390 ]);
391 Some((
392 Operation::AckEmergency {
393 node_id,
394 emergency_timestamp,
395 },
396 13,
397 ))
398 }
399 op_type::CLEAR_EMERGENCY => {
400 if data.len() < 9 {
401 return None;
402 }
403 let emergency_timestamp = u64::from_le_bytes([
404 data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
405 ]);
406 Some((
407 Operation::ClearEmergency {
408 emergency_timestamp,
409 },
410 9,
411 ))
412 }
413 op if AppOperation::is_app_op_type(op) => {
415 let (app_op, consumed) = AppOperation::decode(data)?;
416 Some((Operation::App(app_op), consumed))
417 }
418 _ => None,
419 }
420 }
421}
422
/// A batch of operations sent from one node, with a small header.
///
/// Wire layout (see `encode`): marker byte, flags byte, origin node (4 bytes),
/// timestamp (8 bytes), optional vector clock, operation count (2 bytes),
/// then the encoded operations. Multi-byte integers are little-endian.
#[derive(Debug, Clone)]
pub struct DeltaDocument {
    /// Node that produced the document.
    pub origin_node: NodeId,

    /// Document timestamp (presumably milliseconds, per the field name).
    pub timestamp_ms: u64,

    /// Header flags stored in the second byte of the encoding.
    pub flags: DeltaFlags,

    /// Optional vector clock, written between the header and the operation count.
    pub vector_clock: Option<VectorClock>,

    /// Operations in the order they were added.
    pub operations: Vec<Operation>,
}
441
442impl DeltaDocument {
443 pub fn new(origin_node: NodeId, timestamp_ms: u64) -> Self {
445 Self {
446 origin_node,
447 timestamp_ms,
448 flags: DeltaFlags::default(),
449 vector_clock: None,
450 operations: Vec::new(),
451 }
452 }
453
454 pub fn with_vector_clock(mut self, clock: VectorClock) -> Self {
456 self.vector_clock = Some(clock);
457 self.flags.has_vector_clock = true;
458 self
459 }
460
461 pub fn as_response(mut self) -> Self {
463 self.flags.is_response = true;
464 self
465 }
466
/// Appends `op` to the end of the document's operation list.
pub fn add_operation(&mut self, op: Operation) {
    self.operations.push(op);
}
471
/// Returns `true` when the document holds no operations.
///
/// Only the operation list is considered; flags and vector clock are ignored.
pub fn is_empty(&self) -> bool {
    self.operations.is_empty()
}
476
/// Returns the number of operations currently held by the document.
pub fn operation_count(&self) -> usize {
    self.operations.len()
}
481
482 pub fn is_delta_document(data: &[u8]) -> bool {
484 !data.is_empty() && data[0] == DELTA_DOCUMENT_MARKER
485 }
486
487 pub fn encode(&self) -> Vec<u8> {
489 let mut buf = Vec::new();
490
491 buf.push(DELTA_DOCUMENT_MARKER);
493
494 buf.push(self.flags.to_byte());
496
497 buf.extend_from_slice(&self.origin_node.as_u32().to_le_bytes());
499
500 buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
502
503 if let Some(ref clock) = self.vector_clock {
505 let clock_data = clock.encode();
506 buf.extend_from_slice(&clock_data);
507 }
508
509 buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
511
512 for op in &self.operations {
514 buf.extend_from_slice(&op.encode());
515 }
516
517 buf
518 }
519
520 pub fn decode(data: &[u8]) -> Option<Self> {
522 if data.len() < 16 {
524 return None;
525 }
526
527 if data[0] != DELTA_DOCUMENT_MARKER {
528 return None;
529 }
530
531 let flags = DeltaFlags::from_byte(data[1]);
532 let origin_node = NodeId::new(u32::from_le_bytes([data[2], data[3], data[4], data[5]]));
533 let timestamp_ms = u64::from_le_bytes([
534 data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
535 ]);
536
537 let mut offset = 14;
538
539 let vector_clock = if flags.has_vector_clock {
541 let clock = VectorClock::decode(&data[offset..])?;
542 let count = u32::from_le_bytes([
544 data[offset],
545 data[offset + 1],
546 data[offset + 2],
547 data[offset + 3],
548 ]) as usize;
549 offset += 4 + count * 12;
550 Some(clock)
551 } else {
552 None
553 };
554
555 if data.len() < offset + 2 {
557 return None;
558 }
559 let op_count = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
560 offset += 2;
561
562 let mut operations = Vec::with_capacity(op_count);
564 for _ in 0..op_count {
565 if offset >= data.len() {
566 return None;
567 }
568 let (op, size) = Operation::decode(&data[offset..])?;
569 operations.push(op);
570 offset += size;
571 }
572
573 Some(Self {
574 origin_node,
575 timestamp_ms,
576 flags,
577 vector_clock,
578 operations,
579 })
580 }
581
582 pub fn encoded_size(&self) -> usize {
584 let base = 16; let clock_size = self
586 .vector_clock
587 .as_ref()
588 .map(|c| c.encode().len())
589 .unwrap_or(0);
590 let ops_size: usize = self.operations.iter().map(|op| op.encode().len()).sum();
591 base + clock_size + ops_size
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use crate::sync::crdt::PeripheralType;
599
/// An `IncrementCounter` operation survives an encode/decode round trip.
#[test]
fn test_operation_increment_counter_encode_decode() {
    let original = Operation::IncrementCounter {
        counter_id: 0,
        node_id: NodeId::new(0x12345678),
        amount: 42,
        timestamp: 1000,
    };

    let bytes = original.encode();
    let (round_tripped, consumed) = Operation::decode(&bytes).unwrap();

    // The decoder must report exactly the bytes the encoder produced.
    assert_eq!(consumed, bytes.len());
    match round_tripped {
        Operation::IncrementCounter {
            counter_id,
            node_id,
            amount,
            timestamp,
        } => {
            assert_eq!(counter_id, 0);
            assert_eq!(node_id.as_u32(), 0x12345678);
            assert_eq!(amount, 42);
            assert_eq!(timestamp, 1000);
        }
        _ => panic!("Wrong operation type"),
    }
}
628
/// An `UpdatePeripheral` operation keeps its peripheral id, callsign and
/// timestamp across an encode/decode round trip.
#[test]
fn test_operation_update_peripheral_encode_decode() {
    let sensor =
        Peripheral::new(0xAABBCCDD, PeripheralType::SoldierSensor).with_callsign("ALPHA-1");

    let original = Operation::UpdatePeripheral {
        peripheral: sensor.clone(),
        timestamp: 2000,
    };

    let bytes = original.encode();
    let (round_tripped, consumed) = Operation::decode(&bytes).unwrap();

    assert_eq!(consumed, bytes.len());
    match round_tripped {
        Operation::UpdatePeripheral {
            peripheral,
            timestamp,
        } => {
            assert_eq!(peripheral.id, sensor.id);
            assert_eq!(peripheral.callsign_str(), "ALPHA-1");
            assert_eq!(timestamp, 2000);
        }
        _ => panic!("Wrong operation type"),
    }
}
655
/// A `SetEmergency` operation, including its peer list, survives an
/// encode/decode round trip.
#[test]
fn test_operation_set_emergency_encode_decode() {
    let original = Operation::SetEmergency {
        source_node: NodeId::new(0x11111111),
        timestamp: 3000,
        known_peers: vec![0x22222222, 0x33333333],
    };

    let bytes = original.encode();
    let (round_tripped, consumed) = Operation::decode(&bytes).unwrap();

    assert_eq!(consumed, bytes.len());
    match round_tripped {
        Operation::SetEmergency {
            source_node,
            timestamp,
            known_peers,
        } => {
            assert_eq!(source_node.as_u32(), 0x11111111);
            assert_eq!(timestamp, 3000);
            assert_eq!(known_peers, vec![0x22222222, 0x33333333]);
        }
        _ => panic!("Wrong operation type"),
    }
}
681
/// An `AckEmergency` operation survives an encode/decode round trip.
#[test]
fn test_operation_ack_emergency_encode_decode() {
    let original = Operation::AckEmergency {
        node_id: NodeId::new(0x22222222),
        emergency_timestamp: 3000,
    };

    let bytes = original.encode();
    let (round_tripped, consumed) = Operation::decode(&bytes).unwrap();

    assert_eq!(consumed, bytes.len());
    match round_tripped {
        Operation::AckEmergency {
            node_id,
            emergency_timestamp,
        } => {
            assert_eq!(node_id.as_u32(), 0x22222222);
            assert_eq!(emergency_timestamp, 3000);
        }
        _ => panic!("Wrong operation type"),
    }
}
704
/// A `ClearEmergency` operation survives an encode/decode round trip.
#[test]
fn test_operation_clear_emergency_encode_decode() {
    let original = Operation::ClearEmergency {
        emergency_timestamp: 3000,
    };

    let bytes = original.encode();
    let (round_tripped, consumed) = Operation::decode(&bytes).unwrap();

    assert_eq!(consumed, bytes.len());
    match round_tripped {
        Operation::ClearEmergency {
            emergency_timestamp,
        } => assert_eq!(emergency_timestamp, 3000),
        _ => panic!("Wrong operation type"),
    }
}
724
/// A document without operations encodes, is recognised by its marker and
/// decodes back to an empty document with the same header fields.
#[test]
fn test_delta_document_empty() {
    let origin = NodeId::new(0x12345678);
    let document = DeltaDocument::new(origin, 1000);

    assert!(document.is_empty());
    assert_eq!(document.operation_count(), 0);

    let bytes = document.encode();
    assert!(DeltaDocument::is_delta_document(&bytes));

    let round_tripped = DeltaDocument::decode(&bytes).unwrap();
    assert_eq!(round_tripped.origin_node.as_u32(), 0x12345678);
    assert_eq!(round_tripped.timestamp_ms, 1000);
    assert!(round_tripped.is_empty());
}
740
/// A document with two operations keeps its operation count across an
/// encode/decode round trip.
#[test]
fn test_delta_document_with_operations() {
    let increment = Operation::IncrementCounter {
        counter_id: 0,
        node_id: NodeId::new(0x12345678),
        amount: 1,
        timestamp: 1000,
    };
    let ack = Operation::AckEmergency {
        node_id: NodeId::new(0x12345678),
        emergency_timestamp: 500,
    };

    let mut document = DeltaDocument::new(NodeId::new(0x12345678), 1000);
    document.add_operation(increment);
    document.add_operation(ack);

    assert_eq!(document.operation_count(), 2);

    let bytes = document.encode();
    let round_tripped = DeltaDocument::decode(&bytes).unwrap();

    assert_eq!(round_tripped.operation_count(), 2);
}
764
/// A document carrying a vector clock keeps the flag and every clock entry
/// across an encode/decode round trip.
#[test]
fn test_delta_document_with_vector_clock() {
    let first = NodeId::new(0x11111111);
    let second = NodeId::new(0x22222222);

    let mut clock = VectorClock::new();
    clock.update(&first, 5);
    clock.update(&second, 3);

    let document =
        DeltaDocument::new(NodeId::new(0x12345678), 1000).with_vector_clock(clock.clone());

    assert!(document.flags.has_vector_clock);

    let bytes = document.encode();
    let round_tripped = DeltaDocument::decode(&bytes).unwrap();

    assert!(round_tripped.flags.has_vector_clock);
    assert!(round_tripped.vector_clock.is_some());

    let restored_clock = round_tripped.vector_clock.unwrap();
    assert_eq!(restored_clock.get(&NodeId::new(0x11111111)), 5);
    assert_eq!(restored_clock.get(&NodeId::new(0x22222222)), 3);
}
786
/// `is_delta_document` accepts encoded documents and rejects both buffers
/// with a different first byte and empty buffers.
#[test]
fn test_delta_document_is_delta_document() {
    let bytes = DeltaDocument::new(NodeId::new(0x12345678), 1000).encode();
    assert!(DeltaDocument::is_delta_document(&bytes));

    let non_delta: [u8; 4] = [0x00, 0x01, 0x02, 0x03];
    assert!(!DeltaDocument::is_delta_document(&non_delta));

    let empty: [u8; 0] = [];
    assert!(!DeltaDocument::is_delta_document(&empty));
}
801
/// Counter keys depend on the counter and node only: amount and timestamp
/// do not change the key, a different node does.
#[test]
fn test_operation_key() {
    let first_increment = Operation::IncrementCounter {
        counter_id: 0,
        node_id: NodeId::new(0x11111111),
        amount: 1,
        timestamp: 1000,
    };
    let later_increment = Operation::IncrementCounter {
        counter_id: 0,
        node_id: NodeId::new(0x11111111),
        amount: 2,
        timestamp: 2000,
    };
    let other_node_increment = Operation::IncrementCounter {
        counter_id: 0,
        node_id: NodeId::new(0x22222222),
        amount: 1,
        timestamp: 1000,
    };

    let base_key = first_increment.key();

    assert_eq!(base_key, later_increment.key());

    assert_ne!(base_key, other_node_increment.key());
}
829}