1#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59#[cfg(feature = "legacy-chat")]
60use crate::sync::crdt::{ChatCRDT, ChatMessage};
61use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
62use crate::NodeId;
63
64pub struct DocumentSync {
77 node_id: NodeId,
79
80 counter: RwLock<GCounter>,
82
83 peripheral: RwLock<Peripheral>,
85
86 emergency: RwLock<Option<EmergencyEvent>>,
88
89 #[cfg(feature = "legacy-chat")]
91 chat: RwLock<Option<ChatCRDT>>,
92
93 version: AtomicU32,
95}
96
97impl DocumentSync {
98 pub fn new(node_id: NodeId, callsign: &str) -> Self {
100 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
101 .with_callsign(callsign);
102
103 Self {
104 node_id,
105 counter: RwLock::new(GCounter::new()),
106 peripheral: RwLock::new(peripheral),
107 emergency: RwLock::new(None),
108 #[cfg(feature = "legacy-chat")]
109 chat: RwLock::new(None),
110 version: AtomicU32::new(1),
111 }
112 }
113
114 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
116 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
117
118 Self {
119 node_id,
120 counter: RwLock::new(GCounter::new()),
121 peripheral: RwLock::new(peripheral),
122 emergency: RwLock::new(None),
123 #[cfg(feature = "legacy-chat")]
124 chat: RwLock::new(None),
125 version: AtomicU32::new(1),
126 }
127 }
128
129 pub fn node_id(&self) -> NodeId {
131 self.node_id
132 }
133
134 pub fn version(&self) -> u32 {
136 self.version.load(Ordering::Relaxed)
137 }
138
139 pub fn total_count(&self) -> u64 {
141 self.counter.read().unwrap().value()
142 }
143
144 pub fn local_count(&self) -> u64 {
146 self.counter.read().unwrap().node_count(&self.node_id)
147 }
148
149 pub fn current_event(&self) -> Option<EventType> {
151 self.peripheral
152 .read()
153 .unwrap()
154 .last_event
155 .as_ref()
156 .map(|e| e.event_type)
157 }
158
159 pub fn is_emergency_active(&self) -> bool {
161 self.current_event() == Some(EventType::Emergency)
162 }
163
164 pub fn is_ack_active(&self) -> bool {
166 self.current_event() == Some(EventType::Ack)
167 }
168
169 pub fn callsign(&self) -> String {
171 self.peripheral.read().unwrap().callsign_str().to_string()
172 }
173
174 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
178 {
180 let mut peripheral = self.peripheral.write().unwrap();
181 peripheral.set_event(EventType::Emergency, timestamp);
182 }
183
184 self.increment_counter_internal();
186
187 self.build_document()
189 }
190
191 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
193 {
195 let mut peripheral = self.peripheral.write().unwrap();
196 peripheral.set_event(EventType::Ack, timestamp);
197 }
198
199 self.increment_counter_internal();
201
202 self.build_document()
204 }
205
206 pub fn clear_event(&self) {
208 let mut peripheral = self.peripheral.write().unwrap();
209 peripheral.clear_event();
210 self.bump_version();
211 }
212
213 pub fn increment_counter(&self) {
215 self.increment_counter_internal();
216 }
217
218 pub fn update_health(&self, battery_percent: u8) {
220 let mut peripheral = self.peripheral.write().unwrap();
221 peripheral.health.battery_percent = battery_percent;
222 self.bump_version();
223 }
224
225 pub fn update_activity(&self, activity: u8) {
227 let mut peripheral = self.peripheral.write().unwrap();
228 peripheral.health.activity = activity;
229 self.bump_version();
230 }
231
232 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
234 let mut peripheral = self.peripheral.write().unwrap();
235 peripheral.health.battery_percent = battery_percent;
236 peripheral.health.activity = activity;
237 self.bump_version();
238 }
239
240 pub fn update_heart_rate(&self, heart_rate: u8) {
242 let mut peripheral = self.peripheral.write().unwrap();
243 peripheral.health.heart_rate = Some(heart_rate);
244 self.bump_version();
245 }
246
247 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
249 let mut peripheral = self.peripheral.write().unwrap();
250 peripheral.set_location(latitude, longitude, altitude);
251 self.bump_version();
252 }
253
254 pub fn clear_location(&self) {
256 let mut peripheral = self.peripheral.write().unwrap();
257 peripheral.clear_location();
258 self.bump_version();
259 }
260
261 pub fn update_callsign(&self, callsign: &str) {
263 let mut peripheral = self.peripheral.write().unwrap();
264 peripheral.set_callsign(callsign);
265 self.bump_version();
266 }
267
268 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
270 let mut peripheral = self.peripheral.write().unwrap();
271 peripheral.set_event(event_type, timestamp);
272 self.bump_version();
273 }
274
275 pub fn clear_peripheral_event(&self) {
277 let mut peripheral = self.peripheral.write().unwrap();
278 peripheral.clear_event();
279 self.bump_version();
280 }
281
282 #[allow(clippy::too_many_arguments)]
287 pub fn update_peripheral_state(
288 &self,
289 callsign: &str,
290 battery_percent: u8,
291 heart_rate: Option<u8>,
292 latitude: Option<f32>,
293 longitude: Option<f32>,
294 altitude: Option<f32>,
295 event_type: Option<EventType>,
296 timestamp: u64,
297 ) {
298 let mut peripheral = self.peripheral.write().unwrap();
299 peripheral.set_callsign(callsign);
300 peripheral.health.battery_percent = battery_percent;
301 if let Some(hr) = heart_rate {
302 peripheral.health.heart_rate = Some(hr);
303 }
304 if let (Some(lat), Some(lon)) = (latitude, longitude) {
305 peripheral.set_location(lat, lon, altitude);
306 } else {
307 peripheral.clear_location();
308 }
309 if let Some(evt) = event_type {
310 peripheral.set_event(evt, timestamp);
311 }
312 peripheral.timestamp = timestamp;
313 drop(peripheral);
314 self.bump_version();
315 }
316
317 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
324 {
326 let mut emergency = self.emergency.write().unwrap();
327 *emergency = Some(EmergencyEvent::new(
328 self.node_id.as_u32(),
329 timestamp,
330 known_peers,
331 ));
332 }
333
334 {
336 let mut peripheral = self.peripheral.write().unwrap();
337 peripheral.set_event(EventType::Emergency, timestamp);
338 }
339
340 self.increment_counter_internal();
341 self.build_document()
342 }
343
344 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
348 let changed = {
349 let mut emergency = self.emergency.write().unwrap();
350 if let Some(ref mut e) = *emergency {
351 e.ack(self.node_id.as_u32())
352 } else {
353 return None;
354 }
355 };
356
357 if changed {
358 {
360 let mut peripheral = self.peripheral.write().unwrap();
361 peripheral.set_event(EventType::Ack, timestamp);
362 }
363
364 self.increment_counter_internal();
365 }
366
367 Some(self.build_document())
368 }
369
370 pub fn clear_emergency(&self) {
372 let mut emergency = self.emergency.write().unwrap();
373 if emergency.is_some() {
374 *emergency = None;
375 drop(emergency);
376
377 let mut peripheral = self.peripheral.write().unwrap();
379 peripheral.clear_event();
380
381 self.bump_version();
382 }
383 }
384
385 pub fn has_active_emergency(&self) -> bool {
387 self.emergency.read().unwrap().is_some()
388 }
389
390 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
394 let emergency = self.emergency.read().unwrap();
395 emergency.as_ref().map(|e| {
396 (
397 e.source_node(),
398 e.timestamp(),
399 e.ack_count(),
400 e.pending_nodes().len(),
401 )
402 })
403 }
404
405 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
407 let emergency = self.emergency.read().unwrap();
408 emergency
409 .as_ref()
410 .map(|e| e.has_acked(peer_id))
411 .unwrap_or(false)
412 }
413
414 pub fn all_peers_acked(&self) -> bool {
416 let emergency = self.emergency.read().unwrap();
417 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
418 }
419
/// Adds a chat message authored on this node.
///
/// The chat log is created on first use. Returns the result of
/// `ChatCRDT::add_message`; the version is bumped only when that is `true`.
#[cfg(feature = "legacy-chat")]
pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
    let mut slot = self.chat.write().unwrap();

    let log = slot.get_or_insert_with(ChatCRDT::new);
    let message = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);

    let added = log.add_message(message);
    if added {
        // Bumped while the chat write guard is still held.
        self.bump_version();
    }
    added
}
439
/// Adds a chat message authored on this node that replies to another message.
///
/// The replied-to message is identified by `reply_to_node` and
/// `reply_to_timestamp`. The chat log is created on first use. Returns the
/// result of `ChatCRDT::add_message`; the version is bumped only when that is
/// `true`.
#[cfg(feature = "legacy-chat")]
pub fn add_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> bool {
    let mut slot = self.chat.write().unwrap();

    let log = slot.get_or_insert_with(ChatCRDT::new);
    let mut reply = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
    reply.set_reply_to(reply_to_node, reply_to_timestamp);

    let added = log.add_message(reply);
    if added {
        // Bumped while the chat write guard is still held.
        self.bump_version();
    }
    added
}
465
/// Returns the number of messages in the chat log, or 0 when no log exists
/// yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_count(&self) -> usize {
    let slot = self.chat.read().unwrap();
    match slot.as_ref() {
        Some(log) => log.len(),
        None => 0,
    }
}
471
/// Returns the chat messages selected by `ChatCRDT::messages_since` for
/// `since_timestamp`, flattened into plain tuples.
///
/// Each tuple is `(origin_node, timestamp, sender, text, reply_to_node,
/// reply_to_timestamp)`. Returns an empty vector when no chat log exists yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    let slot = self.chat.read().unwrap();

    match slot.as_ref() {
        None => Vec::new(),
        Some(log) => log
            .messages_since(since_timestamp)
            .map(|msg| {
                (
                    msg.origin_node,
                    msg.timestamp,
                    msg.sender().to_string(),
                    msg.text().to_string(),
                    msg.reply_to_node,
                    msg.reply_to_timestamp,
                )
            })
            .collect(),
    }
}
498
/// Returns what `chat_messages_since` yields for a `since_timestamp` of 0, in
/// the same tuple layout.
#[cfg(feature = "legacy-chat")]
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
    let from_the_start = 0;
    self.chat_messages_since(from_the_start)
}
506
/// Returns a clone of the chat log, or `None` when no log exists yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
    let slot = self.chat.read().unwrap();
    slot.as_ref().cloned()
}
512
513 pub fn counter_entries(&self) -> Vec<(u32, u64)> {
520 self.counter.read().unwrap().entries().collect()
521 }
522
523 pub fn peripheral_snapshot(&self) -> Peripheral {
527 self.peripheral.read().unwrap().clone()
528 }
529
530 pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
534 self.emergency.read().unwrap().clone()
535 }
536
537 pub fn build_document(&self) -> Vec<u8> {
543 let counter = self.counter.read().unwrap().clone();
544 let peripheral = self.peripheral.read().unwrap().clone();
545 let emergency = self.emergency.read().unwrap().clone();
546
547 #[cfg(feature = "legacy-chat")]
550 let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
551
552 let doc = HiveDocument {
553 version: self.version.load(Ordering::Relaxed),
554 node_id: self.node_id,
555 counter,
556 peripheral: Some(peripheral),
557 emergency,
558 #[cfg(feature = "legacy-chat")]
559 chat,
560 };
561
562 doc.encode()
563 }
564
565 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
570 let received = HiveDocument::decode(data)?;
571
572 if received.node_id == self.node_id {
574 return None;
575 }
576
577 let counter_changed = {
579 let mut counter = self.counter.write().unwrap();
580 let old_value = counter.value();
581 counter.merge(&received.counter);
582 counter.value() != old_value
583 };
584
585 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
587 let mut emergency = self.emergency.write().unwrap();
588 match &mut *emergency {
589 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
590 None => {
591 *emergency = Some(received_emergency.clone());
592 true
593 }
594 }
595 } else {
596 false
597 };
598
599 #[cfg(feature = "legacy-chat")]
601 let chat_changed = if let Some(ref received_chat) = received.chat {
602 if !received_chat.is_empty() {
603 let mut chat = self.chat.write().unwrap();
604 match &mut *chat {
605 Some(ref mut our_chat) => our_chat.merge(received_chat),
606 None => {
607 *chat = Some(received_chat.clone());
608 true
609 }
610 }
611 } else {
612 false
613 }
614 } else {
615 false
616 };
617 #[cfg(not(feature = "legacy-chat"))]
618 let chat_changed = false;
619
620 if counter_changed || emergency_changed || chat_changed {
621 self.bump_version();
622 }
623
624 let event = received
626 .peripheral
627 .as_ref()
628 .and_then(|p| p.last_event.clone());
629
630 Some(MergeResult {
631 source_node: received.node_id,
632 event,
633 peer_peripheral: received.peripheral,
634 counter_changed,
635 emergency_changed,
636 chat_changed,
637 total_count: self.total_count(),
638 })
639 }
640
641 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
643 HiveDocument::decode(data)
644 }
645
646 fn increment_counter_internal(&self) {
649 let mut counter = self.counter.write().unwrap();
650 counter.increment(&self.node_id, 1);
651 drop(counter);
652 self.bump_version();
653 }
654
655 fn bump_version(&self) {
656 self.version.fetch_add(1, Ordering::Relaxed);
657 }
658}
659
660#[derive(Debug, Clone)]
662pub struct DocumentCheck {
663 pub node_id: NodeId,
665 pub is_emergency: bool,
667 pub is_ack: bool,
669}
670
671impl DocumentCheck {
672 pub fn from_document(data: &[u8]) -> Option<Self> {
674 let doc = HiveDocument::decode(data)?;
675
676 let (is_emergency, is_ack) = doc
677 .peripheral
678 .as_ref()
679 .and_then(|p| p.last_event.as_ref())
680 .map(|e| {
681 (
682 e.event_type == EventType::Emergency,
683 e.event_type == EventType::Ack,
684 )
685 })
686 .unwrap_or((false, false));
687
688 Some(Self {
689 node_id: doc.node_id,
690 is_emergency,
691 is_ack,
692 })
693 }
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed timestamp shared by every test.
    const TEST_TIMESTAMP: u64 = 1705276800000;

    /// Node id used by the single-node tests.
    const SOLO_ID: u32 = 0x12345678;
    /// Node ids used by the two-node merge tests.
    const FIRST_ID: u32 = 0x11111111;
    const SECOND_ID: u32 = 0x22222222;

    /// Single node with callsign "ALPHA-1".
    fn solo_node() -> DocumentSync {
        DocumentSync::new(NodeId::new(SOLO_ID), "ALPHA-1")
    }

    /// Two distinct nodes: ("ALPHA-1", "BRAVO-1").
    fn node_pair() -> (DocumentSync, DocumentSync) {
        (
            DocumentSync::new(NodeId::new(FIRST_ID), "ALPHA-1"),
            DocumentSync::new(NodeId::new(SECOND_ID), "BRAVO-1"),
        )
    }

    #[test]
    fn test_document_sync_new() {
        let node = solo_node();

        assert_eq!(node.node_id().as_u32(), 0x12345678);
        assert_eq!(node.version(), 1);
        assert_eq!(node.total_count(), 0);
        assert_eq!(node.callsign(), "ALPHA-1");
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_send_emergency() {
        let node = solo_node();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_emergency_active());
        assert!(!node.is_ack_active());

        // The encoded document must carry the emergency event as well.
        let decoded = HiveDocument::decode(&encoded).unwrap();
        assert_eq!(decoded.node_id.as_u32(), 0x12345678);
        assert!(decoded.peripheral.is_some());
        let last_event = decoded.peripheral.unwrap().last_event.unwrap();
        assert_eq!(last_event.event_type, EventType::Emergency);
    }

    #[test]
    fn test_send_ack() {
        let node = solo_node();

        let encoded = node.send_ack(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_ack_active());
        assert!(!node.is_emergency_active());
    }

    #[test]
    fn test_clear_event() {
        let node = solo_node();

        node.send_emergency(TEST_TIMESTAMP);
        assert!(node.is_emergency_active());

        node.clear_event();
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_merge_document() {
        let (receiver, sender) = node_pair();

        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        let outcome = receiver.merge_document(&encoded);
        assert!(outcome.is_some());

        let outcome = outcome.unwrap();
        assert_eq!(outcome.source_node.as_u32(), 0x22222222);
        assert!(outcome.is_emergency());
        assert!(outcome.counter_changed);
        assert_eq!(outcome.total_count, 1);

        // The receiver's own entry is untouched; only the total grows.
        assert_eq!(receiver.local_count(), 0);
        assert_eq!(receiver.total_count(), 1);
    }

    #[test]
    fn test_merge_own_document_ignored() {
        let node = solo_node();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        let outcome = node.merge_document(&encoded);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_version_increments() {
        let node = solo_node();

        assert_eq!(node.version(), 1);

        node.increment_counter();
        assert_eq!(node.version(), 2);

        node.send_emergency(TEST_TIMESTAMP);
        assert_eq!(node.version(), 3);

        node.clear_event();
        assert_eq!(node.version(), 4);
    }

    #[test]
    fn test_document_check() {
        let node = solo_node();

        let emergency_bytes = node.send_emergency(TEST_TIMESTAMP);
        let summary = DocumentCheck::from_document(&emergency_bytes).unwrap();
        assert_eq!(summary.node_id.as_u32(), 0x12345678);
        assert!(summary.is_emergency);
        assert!(!summary.is_ack);

        node.clear_event();
        let ack_bytes = node.send_ack(TEST_TIMESTAMP + 1000);
        let summary = DocumentCheck::from_document(&ack_bytes).unwrap();
        assert!(!summary.is_emergency);
        assert!(summary.is_ack);
    }

    #[test]
    fn test_counter_merge_idempotent() {
        let (receiver, sender) = node_pair();

        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        let first_merge = receiver.merge_document(&encoded).unwrap();
        assert!(first_merge.counter_changed);
        assert_eq!(receiver.total_count(), 1);

        // Merging the very same document again must not change the counter.
        let second_merge = receiver.merge_document(&encoded).unwrap();
        assert!(!second_merge.counter_changed);
        assert_eq!(receiver.total_count(), 1);
    }
}