#[cfg(not(feature = "std"))]
use alloc::{
    string::{String, ToString},
    vec::Vec,
};
#[cfg(feature = "std")]
use std::sync::RwLock;

#[cfg(not(feature = "std"))]
use spin::RwLock;

use core::sync::atomic::{AtomicU32, Ordering};

use crate::document::{HiveDocument, MergeResult};
use crate::sync::crdt::{
    ChatCRDT, ChatMessage, EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType,
};
use crate::NodeId;
63
64pub struct DocumentSync {
77 node_id: NodeId,
79
80 counter: RwLock<GCounter>,
82
83 peripheral: RwLock<Peripheral>,
85
86 emergency: RwLock<Option<EmergencyEvent>>,
88
89 chat: RwLock<Option<ChatCRDT>>,
91
92 version: AtomicU32,
94}
95
96impl DocumentSync {
97 pub fn new(node_id: NodeId, callsign: &str) -> Self {
99 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
100 .with_callsign(callsign);
101
102 Self {
103 node_id,
104 counter: RwLock::new(GCounter::new()),
105 peripheral: RwLock::new(peripheral),
106 emergency: RwLock::new(None),
107 chat: RwLock::new(None),
108 version: AtomicU32::new(1),
109 }
110 }
111
112 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
114 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
115
116 Self {
117 node_id,
118 counter: RwLock::new(GCounter::new()),
119 peripheral: RwLock::new(peripheral),
120 emergency: RwLock::new(None),
121 chat: RwLock::new(None),
122 version: AtomicU32::new(1),
123 }
124 }
125
126 pub fn node_id(&self) -> NodeId {
128 self.node_id
129 }
130
131 pub fn version(&self) -> u32 {
133 self.version.load(Ordering::Relaxed)
134 }
135
136 pub fn total_count(&self) -> u64 {
138 self.counter.read().unwrap().value()
139 }
140
141 pub fn local_count(&self) -> u64 {
143 self.counter.read().unwrap().node_count(&self.node_id)
144 }
145
146 pub fn current_event(&self) -> Option<EventType> {
148 self.peripheral
149 .read()
150 .unwrap()
151 .last_event
152 .as_ref()
153 .map(|e| e.event_type)
154 }
155
156 pub fn is_emergency_active(&self) -> bool {
158 self.current_event() == Some(EventType::Emergency)
159 }
160
161 pub fn is_ack_active(&self) -> bool {
163 self.current_event() == Some(EventType::Ack)
164 }
165
166 pub fn callsign(&self) -> String {
168 self.peripheral.read().unwrap().callsign_str().to_string()
169 }
170
171 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
175 {
177 let mut peripheral = self.peripheral.write().unwrap();
178 peripheral.set_event(EventType::Emergency, timestamp);
179 }
180
181 self.increment_counter_internal();
183
184 self.build_document()
186 }
187
188 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
190 {
192 let mut peripheral = self.peripheral.write().unwrap();
193 peripheral.set_event(EventType::Ack, timestamp);
194 }
195
196 self.increment_counter_internal();
198
199 self.build_document()
201 }
202
203 pub fn clear_event(&self) {
205 let mut peripheral = self.peripheral.write().unwrap();
206 peripheral.clear_event();
207 self.bump_version();
208 }
209
210 pub fn increment_counter(&self) {
212 self.increment_counter_internal();
213 }
214
215 pub fn update_health(&self, battery_percent: u8) {
217 let mut peripheral = self.peripheral.write().unwrap();
218 peripheral.health.battery_percent = battery_percent;
219 self.bump_version();
220 }
221
222 pub fn update_activity(&self, activity: u8) {
224 let mut peripheral = self.peripheral.write().unwrap();
225 peripheral.health.activity = activity;
226 self.bump_version();
227 }
228
229 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
231 let mut peripheral = self.peripheral.write().unwrap();
232 peripheral.health.battery_percent = battery_percent;
233 peripheral.health.activity = activity;
234 self.bump_version();
235 }
236
237 pub fn update_heart_rate(&self, heart_rate: u8) {
239 let mut peripheral = self.peripheral.write().unwrap();
240 peripheral.health.heart_rate = Some(heart_rate);
241 self.bump_version();
242 }
243
244 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
246 let mut peripheral = self.peripheral.write().unwrap();
247 peripheral.set_location(latitude, longitude, altitude);
248 self.bump_version();
249 }
250
251 pub fn clear_location(&self) {
253 let mut peripheral = self.peripheral.write().unwrap();
254 peripheral.clear_location();
255 self.bump_version();
256 }
257
258 pub fn update_callsign(&self, callsign: &str) {
260 let mut peripheral = self.peripheral.write().unwrap();
261 peripheral.set_callsign(callsign);
262 self.bump_version();
263 }
264
265 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
267 let mut peripheral = self.peripheral.write().unwrap();
268 peripheral.set_event(event_type, timestamp);
269 self.bump_version();
270 }
271
272 pub fn clear_peripheral_event(&self) {
274 let mut peripheral = self.peripheral.write().unwrap();
275 peripheral.clear_event();
276 self.bump_version();
277 }
278
279 #[allow(clippy::too_many_arguments)]
284 pub fn update_peripheral_state(
285 &self,
286 callsign: &str,
287 battery_percent: u8,
288 heart_rate: Option<u8>,
289 latitude: Option<f32>,
290 longitude: Option<f32>,
291 altitude: Option<f32>,
292 event_type: Option<EventType>,
293 timestamp: u64,
294 ) {
295 let mut peripheral = self.peripheral.write().unwrap();
296 peripheral.set_callsign(callsign);
297 peripheral.health.battery_percent = battery_percent;
298 if let Some(hr) = heart_rate {
299 peripheral.health.heart_rate = Some(hr);
300 }
301 if let (Some(lat), Some(lon)) = (latitude, longitude) {
302 peripheral.set_location(lat, lon, altitude);
303 } else {
304 peripheral.clear_location();
305 }
306 if let Some(evt) = event_type {
307 peripheral.set_event(evt, timestamp);
308 }
309 peripheral.timestamp = timestamp;
310 drop(peripheral);
311 self.bump_version();
312 }
313
314 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
321 {
323 let mut emergency = self.emergency.write().unwrap();
324 *emergency = Some(EmergencyEvent::new(
325 self.node_id.as_u32(),
326 timestamp,
327 known_peers,
328 ));
329 }
330
331 {
333 let mut peripheral = self.peripheral.write().unwrap();
334 peripheral.set_event(EventType::Emergency, timestamp);
335 }
336
337 self.increment_counter_internal();
338 self.build_document()
339 }
340
341 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
345 let changed = {
346 let mut emergency = self.emergency.write().unwrap();
347 if let Some(ref mut e) = *emergency {
348 e.ack(self.node_id.as_u32())
349 } else {
350 return None;
351 }
352 };
353
354 if changed {
355 {
357 let mut peripheral = self.peripheral.write().unwrap();
358 peripheral.set_event(EventType::Ack, timestamp);
359 }
360
361 self.increment_counter_internal();
362 }
363
364 Some(self.build_document())
365 }
366
367 pub fn clear_emergency(&self) {
369 let mut emergency = self.emergency.write().unwrap();
370 if emergency.is_some() {
371 *emergency = None;
372 drop(emergency);
373
374 let mut peripheral = self.peripheral.write().unwrap();
376 peripheral.clear_event();
377
378 self.bump_version();
379 }
380 }
381
382 pub fn has_active_emergency(&self) -> bool {
384 self.emergency.read().unwrap().is_some()
385 }
386
387 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
391 let emergency = self.emergency.read().unwrap();
392 emergency.as_ref().map(|e| {
393 (
394 e.source_node(),
395 e.timestamp(),
396 e.ack_count(),
397 e.pending_nodes().len(),
398 )
399 })
400 }
401
402 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
404 let emergency = self.emergency.read().unwrap();
405 emergency
406 .as_ref()
407 .map(|e| e.has_acked(peer_id))
408 .unwrap_or(false)
409 }
410
411 pub fn all_peers_acked(&self) -> bool {
413 let emergency = self.emergency.read().unwrap();
414 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
415 }
416
417 pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
423 let mut chat = self.chat.write().unwrap();
424
425 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
426 let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
427
428 if our_chat.add_message(msg) {
429 self.bump_version();
430 true
431 } else {
432 false
433 }
434 }
435
436 pub fn add_chat_reply(
440 &self,
441 sender: &str,
442 text: &str,
443 reply_to_node: u32,
444 reply_to_timestamp: u64,
445 timestamp: u64,
446 ) -> bool {
447 let mut chat = self.chat.write().unwrap();
448
449 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
450 let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
451 msg.set_reply_to(reply_to_node, reply_to_timestamp);
452
453 if our_chat.add_message(msg) {
454 self.bump_version();
455 true
456 } else {
457 false
458 }
459 }
460
461 pub fn chat_count(&self) -> usize {
463 self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
464 }
465
466 pub fn chat_messages_since(
470 &self,
471 since_timestamp: u64,
472 ) -> Vec<(u32, u64, String, String, u32, u64)> {
473 let chat = self.chat.read().unwrap();
474 chat.as_ref()
475 .map(|c| {
476 c.messages_since(since_timestamp)
477 .map(|m| {
478 (
479 m.origin_node,
480 m.timestamp,
481 m.sender().to_string(),
482 m.text().to_string(),
483 m.reply_to_node,
484 m.reply_to_timestamp,
485 )
486 })
487 .collect()
488 })
489 .unwrap_or_default()
490 }
491
492 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
496 self.chat_messages_since(0)
497 }
498
499 pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
501 self.chat.read().unwrap().clone()
502 }
503
504 pub fn counter_entries(&self) -> Vec<(u32, u64)> {
511 self.counter.read().unwrap().entries().collect()
512 }
513
514 pub fn peripheral_snapshot(&self) -> Peripheral {
518 self.peripheral.read().unwrap().clone()
519 }
520
521 pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
525 self.emergency.read().unwrap().clone()
526 }
527
528 pub fn build_document(&self) -> Vec<u8> {
534 let counter = self.counter.read().unwrap().clone();
535 let peripheral = self.peripheral.read().unwrap().clone();
536 let emergency = self.emergency.read().unwrap().clone();
537
538 let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
541
542 let doc = HiveDocument {
543 version: self.version.load(Ordering::Relaxed),
544 node_id: self.node_id,
545 counter,
546 peripheral: Some(peripheral),
547 emergency,
548 chat,
549 };
550
551 doc.encode()
552 }
553
554 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
559 let received = HiveDocument::decode(data)?;
560
561 if received.node_id == self.node_id {
563 return None;
564 }
565
566 let counter_changed = {
568 let mut counter = self.counter.write().unwrap();
569 let old_value = counter.value();
570 counter.merge(&received.counter);
571 counter.value() != old_value
572 };
573
574 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
576 let mut emergency = self.emergency.write().unwrap();
577 match &mut *emergency {
578 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
579 None => {
580 *emergency = Some(received_emergency.clone());
581 true
582 }
583 }
584 } else {
585 false
586 };
587
588 let chat_changed = if let Some(ref received_chat) = received.chat {
590 if !received_chat.is_empty() {
591 let mut chat = self.chat.write().unwrap();
592 match &mut *chat {
593 Some(ref mut our_chat) => our_chat.merge(received_chat),
594 None => {
595 *chat = Some(received_chat.clone());
596 true
597 }
598 }
599 } else {
600 false
601 }
602 } else {
603 false
604 };
605
606 if counter_changed || emergency_changed || chat_changed {
607 self.bump_version();
608 }
609
610 let event = received
612 .peripheral
613 .as_ref()
614 .and_then(|p| p.last_event.clone());
615
616 Some(MergeResult {
617 source_node: received.node_id,
618 event,
619 peer_peripheral: received.peripheral,
620 counter_changed,
621 emergency_changed,
622 chat_changed,
623 total_count: self.total_count(),
624 })
625 }
626
627 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
629 HiveDocument::decode(data)
630 }
631
632 fn increment_counter_internal(&self) {
635 let mut counter = self.counter.write().unwrap();
636 counter.increment(&self.node_id, 1);
637 drop(counter);
638 self.bump_version();
639 }
640
641 fn bump_version(&self) {
642 self.version.fetch_add(1, Ordering::Relaxed);
643 }
644}
645
646#[derive(Debug, Clone)]
648pub struct DocumentCheck {
649 pub node_id: NodeId,
651 pub is_emergency: bool,
653 pub is_ack: bool,
655}
656
657impl DocumentCheck {
658 pub fn from_document(data: &[u8]) -> Option<Self> {
660 let doc = HiveDocument::decode(data)?;
661
662 let (is_emergency, is_ack) = doc
663 .peripheral
664 .as_ref()
665 .and_then(|p| p.last_event.as_ref())
666 .map(|e| {
667 (
668 e.event_type == EventType::Emergency,
669 e.event_type == EventType::Ack,
670 )
671 })
672 .unwrap_or((false, false));
673
674 Some(Self {
675 node_id: doc.node_id,
676 is_emergency,
677 is_ack,
678 })
679 }
680}
681
#[cfg(test)]
mod tests {
    use super::*;

    /// Arbitrary fixed timestamp shared by the tests.
    const TEST_TIMESTAMP: u64 = 1705276800000;

    /// Node id used by the single-node tests.
    const SOLO_ID: u32 = 0x12345678;
    /// Node ids used by the two-node merge tests.
    const ALPHA_ID: u32 = 0x11111111;
    const BRAVO_ID: u32 = 0x22222222;

    /// Single node with callsign "ALPHA-1".
    fn solo() -> DocumentSync {
        DocumentSync::new(NodeId::new(SOLO_ID), "ALPHA-1")
    }

    /// Two distinct nodes: ("ALPHA-1", "BRAVO-1").
    fn pair() -> (DocumentSync, DocumentSync) {
        (
            DocumentSync::new(NodeId::new(ALPHA_ID), "ALPHA-1"),
            DocumentSync::new(NodeId::new(BRAVO_ID), "BRAVO-1"),
        )
    }

    #[test]
    fn test_document_sync_new() {
        let sync = solo();

        assert_eq!(sync.node_id().as_u32(), SOLO_ID);
        assert_eq!(sync.version(), 1);
        assert_eq!(sync.total_count(), 0);
        assert_eq!(sync.callsign(), "ALPHA-1");
        assert!(sync.current_event().is_none());
    }

    #[test]
    fn test_send_emergency() {
        let sync = solo();

        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);

        // Local state reflects the emergency.
        assert!(!doc_bytes.is_empty());
        assert_eq!(sync.total_count(), 1);
        assert!(sync.is_emergency_active());
        assert!(!sync.is_ack_active());

        // The encoded document carries the same information.
        let doc = HiveDocument::decode(&doc_bytes).unwrap();
        assert_eq!(doc.node_id.as_u32(), SOLO_ID);
        assert!(doc.peripheral.is_some());
        let event = doc.peripheral.unwrap().last_event.unwrap();
        assert_eq!(event.event_type, EventType::Emergency);
    }

    #[test]
    fn test_send_ack() {
        let sync = solo();

        let doc_bytes = sync.send_ack(TEST_TIMESTAMP);

        assert!(!doc_bytes.is_empty());
        assert_eq!(sync.total_count(), 1);
        assert!(sync.is_ack_active());
        assert!(!sync.is_emergency_active());
    }

    #[test]
    fn test_clear_event() {
        let sync = solo();

        sync.send_emergency(TEST_TIMESTAMP);
        assert!(sync.is_emergency_active());

        sync.clear_event();
        assert!(sync.current_event().is_none());
    }

    #[test]
    fn test_merge_document() {
        let (alpha, bravo) = pair();

        // Bravo raises an emergency; alpha merges the resulting document.
        let doc_bytes = bravo.send_emergency(TEST_TIMESTAMP);
        let outcome = alpha.merge_document(&doc_bytes);
        assert!(outcome.is_some());

        let outcome = outcome.unwrap();
        assert_eq!(outcome.source_node.as_u32(), BRAVO_ID);
        assert!(outcome.is_emergency());
        assert!(outcome.counter_changed);
        assert_eq!(outcome.total_count, 1);

        // Alpha contributed nothing itself, but sees bravo's increment.
        assert_eq!(alpha.local_count(), 0);
        assert_eq!(alpha.total_count(), 1);
    }

    #[test]
    fn test_merge_own_document_ignored() {
        let sync = solo();

        let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);

        // A node's own document is rejected.
        let outcome = sync.merge_document(&doc_bytes);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_version_increments() {
        let sync = solo();
        assert_eq!(sync.version(), 1);

        sync.increment_counter();
        assert_eq!(sync.version(), 2);

        sync.send_emergency(TEST_TIMESTAMP);
        assert_eq!(sync.version(), 3);

        sync.clear_event();
        assert_eq!(sync.version(), 4);
    }

    #[test]
    fn test_document_check() {
        let sync = solo();

        let emergency_doc = sync.send_emergency(TEST_TIMESTAMP);
        let check = DocumentCheck::from_document(&emergency_doc).unwrap();
        assert_eq!(check.node_id.as_u32(), SOLO_ID);
        assert!(check.is_emergency);
        assert!(!check.is_ack);

        sync.clear_event();
        let ack_doc = sync.send_ack(TEST_TIMESTAMP + 1000);
        let check = DocumentCheck::from_document(&ack_doc).unwrap();
        assert!(!check.is_emergency);
        assert!(check.is_ack);
    }

    #[test]
    fn test_counter_merge_idempotent() {
        let (alpha, bravo) = pair();

        let doc_bytes = bravo.send_emergency(TEST_TIMESTAMP);

        // First merge brings in bravo's increment.
        let first = alpha.merge_document(&doc_bytes).unwrap();
        assert!(first.counter_changed);
        assert_eq!(alpha.total_count(), 1);

        // Merging the same bytes again changes nothing.
        let second = alpha.merge_document(&doc_bytes).unwrap();
        assert!(!second.counter_changed);
        assert_eq!(alpha.total_count(), 1);
    }
}