1#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59use crate::sync::crdt::{
60 ChatCRDT, ChatMessage, EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType,
61};
62use crate::NodeId;
63
/// Interior-mutable state for one node, used to build outgoing `HiveDocument`s and
/// to merge incoming ones.
///
/// Every piece of state lives behind its own `RwLock` (or an atomic), which is why
/// all methods on this type take `&self`.
///
/// NOTE(review): the methods call `.unwrap()` on the result of `read()` / `write()`.
/// That matches `std::sync::RwLock`; `spin::RwLock` (used without the `std` feature)
/// is believed to hand back guards directly rather than a `Result` — confirm that the
/// no-`std` configuration builds.
pub struct DocumentSync {
    /// Id of the local node; copied into every document built by this type.
    node_id: NodeId,

    /// Counter CRDT: the local entry is incremented here and remote counters are
    /// merged into it.
    counter: RwLock<GCounter>,

    /// Local peripheral record (callsign, health fields, last event).
    peripheral: RwLock<Peripheral>,

    /// The emergency event currently being tracked, if any.
    emergency: RwLock<Option<EmergencyEvent>>,

    /// Chat CRDT; stays `None` until a local message is added or a non-empty remote
    /// chat is merged.
    chat: RwLock<Option<ChatCRDT>>,

    /// Local version number; starts at 1 and is incremented on state changes.
    version: AtomicU32,
}
95
96impl DocumentSync {
97 pub fn new(node_id: NodeId, callsign: &str) -> Self {
99 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
100 .with_callsign(callsign);
101
102 Self {
103 node_id,
104 counter: RwLock::new(GCounter::new()),
105 peripheral: RwLock::new(peripheral),
106 emergency: RwLock::new(None),
107 chat: RwLock::new(None),
108 version: AtomicU32::new(1),
109 }
110 }
111
112 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
114 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
115
116 Self {
117 node_id,
118 counter: RwLock::new(GCounter::new()),
119 peripheral: RwLock::new(peripheral),
120 emergency: RwLock::new(None),
121 chat: RwLock::new(None),
122 version: AtomicU32::new(1),
123 }
124 }
125
126 pub fn node_id(&self) -> NodeId {
128 self.node_id
129 }
130
131 pub fn version(&self) -> u32 {
133 self.version.load(Ordering::Relaxed)
134 }
135
136 pub fn total_count(&self) -> u64 {
138 self.counter.read().unwrap().value()
139 }
140
141 pub fn local_count(&self) -> u64 {
143 self.counter.read().unwrap().node_count(&self.node_id)
144 }
145
146 pub fn current_event(&self) -> Option<EventType> {
148 self.peripheral
149 .read()
150 .unwrap()
151 .last_event
152 .as_ref()
153 .map(|e| e.event_type)
154 }
155
156 pub fn is_emergency_active(&self) -> bool {
158 self.current_event() == Some(EventType::Emergency)
159 }
160
161 pub fn is_ack_active(&self) -> bool {
163 self.current_event() == Some(EventType::Ack)
164 }
165
166 pub fn callsign(&self) -> String {
168 self.peripheral.read().unwrap().callsign_str().to_string()
169 }
170
171 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
175 {
177 let mut peripheral = self.peripheral.write().unwrap();
178 peripheral.set_event(EventType::Emergency, timestamp);
179 }
180
181 self.increment_counter_internal();
183
184 self.build_document()
186 }
187
188 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
190 {
192 let mut peripheral = self.peripheral.write().unwrap();
193 peripheral.set_event(EventType::Ack, timestamp);
194 }
195
196 self.increment_counter_internal();
198
199 self.build_document()
201 }
202
203 pub fn clear_event(&self) {
205 let mut peripheral = self.peripheral.write().unwrap();
206 peripheral.clear_event();
207 self.bump_version();
208 }
209
210 pub fn increment_counter(&self) {
212 self.increment_counter_internal();
213 }
214
215 pub fn update_health(&self, battery_percent: u8) {
217 let mut peripheral = self.peripheral.write().unwrap();
218 peripheral.health.battery_percent = battery_percent;
219 self.bump_version();
220 }
221
222 pub fn update_activity(&self, activity: u8) {
224 let mut peripheral = self.peripheral.write().unwrap();
225 peripheral.health.activity = activity;
226 self.bump_version();
227 }
228
229 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
231 let mut peripheral = self.peripheral.write().unwrap();
232 peripheral.health.battery_percent = battery_percent;
233 peripheral.health.activity = activity;
234 self.bump_version();
235 }
236
237 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
244 {
246 let mut emergency = self.emergency.write().unwrap();
247 *emergency = Some(EmergencyEvent::new(
248 self.node_id.as_u32(),
249 timestamp,
250 known_peers,
251 ));
252 }
253
254 {
256 let mut peripheral = self.peripheral.write().unwrap();
257 peripheral.set_event(EventType::Emergency, timestamp);
258 }
259
260 self.increment_counter_internal();
261 self.build_document()
262 }
263
264 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
268 let changed = {
269 let mut emergency = self.emergency.write().unwrap();
270 if let Some(ref mut e) = *emergency {
271 e.ack(self.node_id.as_u32())
272 } else {
273 return None;
274 }
275 };
276
277 if changed {
278 {
280 let mut peripheral = self.peripheral.write().unwrap();
281 peripheral.set_event(EventType::Ack, timestamp);
282 }
283
284 self.increment_counter_internal();
285 }
286
287 Some(self.build_document())
288 }
289
290 pub fn clear_emergency(&self) {
292 let mut emergency = self.emergency.write().unwrap();
293 if emergency.is_some() {
294 *emergency = None;
295 drop(emergency);
296
297 let mut peripheral = self.peripheral.write().unwrap();
299 peripheral.clear_event();
300
301 self.bump_version();
302 }
303 }
304
305 pub fn has_active_emergency(&self) -> bool {
307 self.emergency.read().unwrap().is_some()
308 }
309
310 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
314 let emergency = self.emergency.read().unwrap();
315 emergency.as_ref().map(|e| {
316 (
317 e.source_node(),
318 e.timestamp(),
319 e.ack_count(),
320 e.pending_nodes().len(),
321 )
322 })
323 }
324
325 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
327 let emergency = self.emergency.read().unwrap();
328 emergency
329 .as_ref()
330 .map(|e| e.has_acked(peer_id))
331 .unwrap_or(false)
332 }
333
334 pub fn all_peers_acked(&self) -> bool {
336 let emergency = self.emergency.read().unwrap();
337 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
338 }
339
340 pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
346 let mut chat = self.chat.write().unwrap();
347
348 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
349 let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
350
351 if our_chat.add_message(msg) {
352 self.bump_version();
353 true
354 } else {
355 false
356 }
357 }
358
359 pub fn add_chat_reply(
363 &self,
364 sender: &str,
365 text: &str,
366 reply_to_node: u32,
367 reply_to_timestamp: u64,
368 timestamp: u64,
369 ) -> bool {
370 let mut chat = self.chat.write().unwrap();
371
372 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
373 let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
374 msg.set_reply_to(reply_to_node, reply_to_timestamp);
375
376 if our_chat.add_message(msg) {
377 self.bump_version();
378 true
379 } else {
380 false
381 }
382 }
383
384 pub fn chat_count(&self) -> usize {
386 self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
387 }
388
389 pub fn chat_messages_since(
393 &self,
394 since_timestamp: u64,
395 ) -> Vec<(u32, u64, String, String, u32, u64)> {
396 let chat = self.chat.read().unwrap();
397 chat.as_ref()
398 .map(|c| {
399 c.messages_since(since_timestamp)
400 .map(|m| {
401 (
402 m.origin_node,
403 m.timestamp,
404 m.sender().to_string(),
405 m.text().to_string(),
406 m.reply_to_node,
407 m.reply_to_timestamp,
408 )
409 })
410 .collect()
411 })
412 .unwrap_or_default()
413 }
414
415 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
419 self.chat_messages_since(0)
420 }
421
422 pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
424 self.chat.read().unwrap().clone()
425 }
426
427 pub fn counter_entries(&self) -> Vec<(u32, u64)> {
434 self.counter.read().unwrap().entries().collect()
435 }
436
437 pub fn peripheral_snapshot(&self) -> Peripheral {
441 self.peripheral.read().unwrap().clone()
442 }
443
444 pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
448 self.emergency.read().unwrap().clone()
449 }
450
451 pub fn build_document(&self) -> Vec<u8> {
457 let counter = self.counter.read().unwrap().clone();
458 let peripheral = self.peripheral.read().unwrap().clone();
459 let emergency = self.emergency.read().unwrap().clone();
460
461 let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
464
465 let doc = HiveDocument {
466 version: self.version.load(Ordering::Relaxed),
467 node_id: self.node_id,
468 counter,
469 peripheral: Some(peripheral),
470 emergency,
471 chat,
472 };
473
474 doc.encode()
475 }
476
477 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
482 let received = HiveDocument::decode(data)?;
483
484 if received.node_id == self.node_id {
486 return None;
487 }
488
489 let counter_changed = {
491 let mut counter = self.counter.write().unwrap();
492 let old_value = counter.value();
493 counter.merge(&received.counter);
494 counter.value() != old_value
495 };
496
497 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
499 let mut emergency = self.emergency.write().unwrap();
500 match &mut *emergency {
501 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
502 None => {
503 *emergency = Some(received_emergency.clone());
504 true
505 }
506 }
507 } else {
508 false
509 };
510
511 let chat_changed = if let Some(ref received_chat) = received.chat {
513 if !received_chat.is_empty() {
514 let mut chat = self.chat.write().unwrap();
515 match &mut *chat {
516 Some(ref mut our_chat) => our_chat.merge(received_chat),
517 None => {
518 *chat = Some(received_chat.clone());
519 true
520 }
521 }
522 } else {
523 false
524 }
525 } else {
526 false
527 };
528
529 if counter_changed || emergency_changed || chat_changed {
530 self.bump_version();
531 }
532
533 let event = received
535 .peripheral
536 .as_ref()
537 .and_then(|p| p.last_event.clone());
538
539 Some(MergeResult {
540 source_node: received.node_id,
541 event,
542 counter_changed,
543 emergency_changed,
544 chat_changed,
545 total_count: self.total_count(),
546 })
547 }
548
549 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
551 HiveDocument::decode(data)
552 }
553
554 fn increment_counter_internal(&self) {
557 let mut counter = self.counter.write().unwrap();
558 counter.increment(&self.node_id, 1);
559 drop(counter);
560 self.bump_version();
561 }
562
563 fn bump_version(&self) {
564 self.version.fetch_add(1, Ordering::Relaxed);
565 }
566}
567
/// Summary of an encoded document: which node it names and which event, if any,
/// its peripheral carries.
#[derive(Debug, Clone)]
pub struct DocumentCheck {
    /// Node id stored in the decoded document.
    pub node_id: NodeId,
    /// `true` when the document's peripheral has a last event of type
    /// `EventType::Emergency`.
    pub is_emergency: bool,
    /// `true` when the document's peripheral has a last event of type
    /// `EventType::Ack`.
    pub is_ack: bool,
}
578
579impl DocumentCheck {
580 pub fn from_document(data: &[u8]) -> Option<Self> {
582 let doc = HiveDocument::decode(data)?;
583
584 let (is_emergency, is_ack) = doc
585 .peripheral
586 .as_ref()
587 .and_then(|p| p.last_event.as_ref())
588 .map(|e| {
589 (
590 e.event_type == EventType::Emergency,
591 e.event_type == EventType::Ack,
592 )
593 })
594 .unwrap_or((false, false));
595
596 Some(Self {
597 node_id: doc.node_id,
598 is_emergency,
599 is_ack,
600 })
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed timestamp value shared by all tests.
    const TEST_TIMESTAMP: u64 = 1705276800000;

    /// Single-node fixture used by most tests.
    fn solo_node() -> DocumentSync {
        DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1")
    }

    /// Two-node fixture: `(receiver, sender)`.
    fn node_pair() -> (DocumentSync, DocumentSync) {
        let receiver = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
        let sender = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
        (receiver, sender)
    }

    /// A fresh instance starts at version 1 with an empty counter and no event.
    #[test]
    fn test_document_sync_new() {
        let node = solo_node();

        assert_eq!(node.node_id().as_u32(), 0x12345678);
        assert_eq!(node.version(), 1);
        assert_eq!(node.total_count(), 0);
        assert_eq!(node.callsign(), "ALPHA-1");
        assert!(node.current_event().is_none());
    }

    /// Sending an emergency updates local state and yields a decodable document.
    #[test]
    fn test_send_emergency() {
        let node = solo_node();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_emergency_active());
        assert!(!node.is_ack_active());

        let decoded = HiveDocument::decode(&encoded).unwrap();
        assert_eq!(decoded.node_id.as_u32(), 0x12345678);
        assert!(decoded.peripheral.is_some());
        let last_event = decoded.peripheral.unwrap().last_event.unwrap();
        assert_eq!(last_event.event_type, EventType::Emergency);
    }

    /// Sending an ack marks the ack event active and counts once.
    #[test]
    fn test_send_ack() {
        let node = solo_node();

        let encoded = node.send_ack(TEST_TIMESTAMP);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_ack_active());
        assert!(!node.is_emergency_active());
    }

    /// Clearing removes the active event.
    #[test]
    fn test_clear_event() {
        let node = solo_node();

        node.send_emergency(TEST_TIMESTAMP);
        assert!(node.is_emergency_active());

        node.clear_event();
        assert!(node.current_event().is_none());
    }

    /// Merging a peer's emergency document reports the peer and updates the total.
    #[test]
    fn test_merge_document() {
        let (receiver, sender) = node_pair();

        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        let outcome = receiver.merge_document(&encoded);
        assert!(outcome.is_some());

        let outcome = outcome.unwrap();
        assert_eq!(outcome.source_node.as_u32(), 0x22222222);
        assert!(outcome.is_emergency());
        assert!(outcome.counter_changed);
        assert_eq!(outcome.total_count, 1);

        // The receiver's own entry is untouched; only the total grows.
        assert_eq!(receiver.local_count(), 0);
        assert_eq!(receiver.total_count(), 1);
    }

    /// A node ignores documents that carry its own id.
    #[test]
    fn test_merge_own_document_ignored() {
        let node = solo_node();

        let encoded = node.send_emergency(TEST_TIMESTAMP);

        let outcome = node.merge_document(&encoded);
        assert!(outcome.is_none());
    }

    /// Each state-changing call bumps the version exactly once.
    #[test]
    fn test_version_increments() {
        let node = solo_node();

        assert_eq!(node.version(), 1);

        node.increment_counter();
        assert_eq!(node.version(), 2);

        node.send_emergency(TEST_TIMESTAMP);
        assert_eq!(node.version(), 3);

        node.clear_event();
        assert_eq!(node.version(), 4);
    }

    /// `DocumentCheck` reflects the event type carried by the document.
    #[test]
    fn test_document_check() {
        let node = solo_node();

        let emergency_bytes = node.send_emergency(TEST_TIMESTAMP);
        let summary = DocumentCheck::from_document(&emergency_bytes).unwrap();
        assert_eq!(summary.node_id.as_u32(), 0x12345678);
        assert!(summary.is_emergency);
        assert!(!summary.is_ack);

        node.clear_event();
        let ack_bytes = node.send_ack(TEST_TIMESTAMP + 1000);
        let summary = DocumentCheck::from_document(&ack_bytes).unwrap();
        assert!(!summary.is_emergency);
        assert!(summary.is_ack);
    }

    /// Merging the same document twice changes the counter only the first time.
    #[test]
    fn test_counter_merge_idempotent() {
        let (receiver, sender) = node_pair();

        let encoded = sender.send_emergency(TEST_TIMESTAMP);

        let first = receiver.merge_document(&encoded).unwrap();
        assert!(first.counter_changed);
        assert_eq!(receiver.total_count(), 1);

        let second = receiver.merge_document(&encoded).unwrap();
        assert!(!second.counter_changed);
        assert_eq!(receiver.total_count(), 1);
    }
}