1#[cfg(not(feature = "std"))]
49use alloc::{string::String, vec::Vec};
50#[cfg(feature = "std")]
51use std::sync::RwLock;
52
53#[cfg(not(feature = "std"))]
54use spin::RwLock;
55
56use core::sync::atomic::{AtomicU32, Ordering};
57
58use crate::document::{HiveDocument, MergeResult};
59use crate::sync::crdt::{
60 ChatCRDT, ChatMessage, EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType,
61};
62use crate::NodeId;
63
64pub struct DocumentSync {
77 node_id: NodeId,
79
80 counter: RwLock<GCounter>,
82
83 peripheral: RwLock<Peripheral>,
85
86 emergency: RwLock<Option<EmergencyEvent>>,
88
89 chat: RwLock<Option<ChatCRDT>>,
91
92 version: AtomicU32,
94}
95
96impl DocumentSync {
97 pub fn new(node_id: NodeId, callsign: &str) -> Self {
99 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
100 .with_callsign(callsign);
101
102 Self {
103 node_id,
104 counter: RwLock::new(GCounter::new()),
105 peripheral: RwLock::new(peripheral),
106 emergency: RwLock::new(None),
107 chat: RwLock::new(None),
108 version: AtomicU32::new(1),
109 }
110 }
111
112 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
114 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
115
116 Self {
117 node_id,
118 counter: RwLock::new(GCounter::new()),
119 peripheral: RwLock::new(peripheral),
120 emergency: RwLock::new(None),
121 chat: RwLock::new(None),
122 version: AtomicU32::new(1),
123 }
124 }
125
126 pub fn node_id(&self) -> NodeId {
128 self.node_id
129 }
130
131 pub fn version(&self) -> u32 {
133 self.version.load(Ordering::Relaxed)
134 }
135
136 pub fn total_count(&self) -> u64 {
138 self.counter.read().unwrap().value()
139 }
140
141 pub fn local_count(&self) -> u64 {
143 self.counter.read().unwrap().node_count(&self.node_id)
144 }
145
146 pub fn current_event(&self) -> Option<EventType> {
148 self.peripheral
149 .read()
150 .unwrap()
151 .last_event
152 .as_ref()
153 .map(|e| e.event_type)
154 }
155
156 pub fn is_emergency_active(&self) -> bool {
158 self.current_event() == Some(EventType::Emergency)
159 }
160
161 pub fn is_ack_active(&self) -> bool {
163 self.current_event() == Some(EventType::Ack)
164 }
165
166 pub fn callsign(&self) -> String {
168 self.peripheral.read().unwrap().callsign_str().to_string()
169 }
170
171 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
175 {
177 let mut peripheral = self.peripheral.write().unwrap();
178 peripheral.set_event(EventType::Emergency, timestamp);
179 }
180
181 self.increment_counter_internal();
183
184 self.build_document()
186 }
187
188 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
190 {
192 let mut peripheral = self.peripheral.write().unwrap();
193 peripheral.set_event(EventType::Ack, timestamp);
194 }
195
196 self.increment_counter_internal();
198
199 self.build_document()
201 }
202
203 pub fn clear_event(&self) {
205 let mut peripheral = self.peripheral.write().unwrap();
206 peripheral.clear_event();
207 self.bump_version();
208 }
209
210 pub fn increment_counter(&self) {
212 self.increment_counter_internal();
213 }
214
215 pub fn update_health(&self, battery_percent: u8) {
217 let mut peripheral = self.peripheral.write().unwrap();
218 peripheral.health.battery_percent = battery_percent;
219 self.bump_version();
220 }
221
222 pub fn update_activity(&self, activity: u8) {
224 let mut peripheral = self.peripheral.write().unwrap();
225 peripheral.health.activity = activity;
226 self.bump_version();
227 }
228
229 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
231 let mut peripheral = self.peripheral.write().unwrap();
232 peripheral.health.battery_percent = battery_percent;
233 peripheral.health.activity = activity;
234 self.bump_version();
235 }
236
237 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
244 {
246 let mut emergency = self.emergency.write().unwrap();
247 *emergency = Some(EmergencyEvent::new(
248 self.node_id.as_u32(),
249 timestamp,
250 known_peers,
251 ));
252 }
253
254 {
256 let mut peripheral = self.peripheral.write().unwrap();
257 peripheral.set_event(EventType::Emergency, timestamp);
258 }
259
260 self.increment_counter_internal();
261 self.build_document()
262 }
263
264 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
268 let changed = {
269 let mut emergency = self.emergency.write().unwrap();
270 if let Some(ref mut e) = *emergency {
271 e.ack(self.node_id.as_u32())
272 } else {
273 return None;
274 }
275 };
276
277 if changed {
278 {
280 let mut peripheral = self.peripheral.write().unwrap();
281 peripheral.set_event(EventType::Ack, timestamp);
282 }
283
284 self.increment_counter_internal();
285 }
286
287 Some(self.build_document())
288 }
289
290 pub fn clear_emergency(&self) {
292 let mut emergency = self.emergency.write().unwrap();
293 if emergency.is_some() {
294 *emergency = None;
295 drop(emergency);
296
297 let mut peripheral = self.peripheral.write().unwrap();
299 peripheral.clear_event();
300
301 self.bump_version();
302 }
303 }
304
305 pub fn has_active_emergency(&self) -> bool {
307 self.emergency.read().unwrap().is_some()
308 }
309
310 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
314 let emergency = self.emergency.read().unwrap();
315 emergency.as_ref().map(|e| {
316 (
317 e.source_node(),
318 e.timestamp(),
319 e.ack_count(),
320 e.pending_nodes().len(),
321 )
322 })
323 }
324
325 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
327 let emergency = self.emergency.read().unwrap();
328 emergency
329 .as_ref()
330 .map(|e| e.has_acked(peer_id))
331 .unwrap_or(false)
332 }
333
334 pub fn all_peers_acked(&self) -> bool {
336 let emergency = self.emergency.read().unwrap();
337 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
338 }
339
340 pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
346 let mut chat = self.chat.write().unwrap();
347
348 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
349 let msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
350
351 if our_chat.add_message(msg) {
352 self.bump_version();
353 true
354 } else {
355 false
356 }
357 }
358
359 pub fn add_chat_reply(
363 &self,
364 sender: &str,
365 text: &str,
366 reply_to_node: u32,
367 reply_to_timestamp: u64,
368 timestamp: u64,
369 ) -> bool {
370 let mut chat = self.chat.write().unwrap();
371
372 let our_chat = chat.get_or_insert_with(ChatCRDT::new);
373 let mut msg = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
374 msg.set_reply_to(reply_to_node, reply_to_timestamp);
375
376 if our_chat.add_message(msg) {
377 self.bump_version();
378 true
379 } else {
380 false
381 }
382 }
383
384 pub fn chat_count(&self) -> usize {
386 self.chat.read().unwrap().as_ref().map_or(0, |c| c.len())
387 }
388
389 pub fn chat_messages_since(
393 &self,
394 since_timestamp: u64,
395 ) -> Vec<(u32, u64, String, String, u32, u64)> {
396 let chat = self.chat.read().unwrap();
397 chat.as_ref()
398 .map(|c| {
399 c.messages_since(since_timestamp)
400 .map(|m| {
401 (
402 m.origin_node,
403 m.timestamp,
404 m.sender().to_string(),
405 m.text().to_string(),
406 m.reply_to_node,
407 m.reply_to_timestamp,
408 )
409 })
410 .collect()
411 })
412 .unwrap_or_default()
413 }
414
415 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
419 self.chat_messages_since(0)
420 }
421
422 pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
424 self.chat.read().unwrap().clone()
425 }
426
427 pub fn counter_entries(&self) -> Vec<(u32, u64)> {
434 self.counter.read().unwrap().entries().collect()
435 }
436
437 pub fn peripheral_snapshot(&self) -> Peripheral {
441 self.peripheral.read().unwrap().clone()
442 }
443
444 pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
448 self.emergency.read().unwrap().clone()
449 }
450
451 pub fn build_document(&self) -> Vec<u8> {
457 let counter = self.counter.read().unwrap().clone();
458 let peripheral = self.peripheral.read().unwrap().clone();
459 let emergency = self.emergency.read().unwrap().clone();
460
461 let chat = self.chat.read().unwrap().as_ref().map(|c| c.for_sync());
464
465 let doc = HiveDocument {
466 version: self.version.load(Ordering::Relaxed),
467 node_id: self.node_id,
468 counter,
469 peripheral: Some(peripheral),
470 emergency,
471 chat,
472 };
473
474 doc.encode()
475 }
476
477 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
482 let received = HiveDocument::decode(data)?;
483
484 if received.node_id == self.node_id {
486 return None;
487 }
488
489 let counter_changed = {
491 let mut counter = self.counter.write().unwrap();
492 let old_value = counter.value();
493 counter.merge(&received.counter);
494 counter.value() != old_value
495 };
496
497 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
499 let mut emergency = self.emergency.write().unwrap();
500 match &mut *emergency {
501 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
502 None => {
503 *emergency = Some(received_emergency.clone());
504 true
505 }
506 }
507 } else {
508 false
509 };
510
511 let chat_changed = if let Some(ref received_chat) = received.chat {
513 if !received_chat.is_empty() {
514 let mut chat = self.chat.write().unwrap();
515 match &mut *chat {
516 Some(ref mut our_chat) => our_chat.merge(received_chat),
517 None => {
518 *chat = Some(received_chat.clone());
519 true
520 }
521 }
522 } else {
523 false
524 }
525 } else {
526 false
527 };
528
529 if counter_changed || emergency_changed || chat_changed {
530 self.bump_version();
531 }
532
533 let event = received
535 .peripheral
536 .as_ref()
537 .and_then(|p| p.last_event.clone());
538
539 Some(MergeResult {
540 source_node: received.node_id,
541 event,
542 counter_changed,
543 emergency_changed,
544 chat_changed,
545 total_count: self.total_count(),
546 })
547 }
548
549 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
551 HiveDocument::decode(data)
552 }
553
554 fn increment_counter_internal(&self) {
557 let mut counter = self.counter.write().unwrap();
558 counter.increment(&self.node_id, 1);
559 drop(counter);
560 self.bump_version();
561 }
562
563 fn bump_version(&self) {
564 self.version.fetch_add(1, Ordering::Relaxed);
565 }
566}
567
568#[derive(Debug, Clone)]
570pub struct DocumentCheck {
571 pub node_id: NodeId,
573 pub is_emergency: bool,
575 pub is_ack: bool,
577}
578
579impl DocumentCheck {
580 pub fn from_document(data: &[u8]) -> Option<Self> {
582 let doc = HiveDocument::decode(data)?;
583
584 let (is_emergency, is_ack) = doc
585 .peripheral
586 .as_ref()
587 .and_then(|p| p.last_event.as_ref())
588 .map(|e| {
589 (
590 e.event_type == EventType::Emergency,
591 e.event_type == EventType::Ack,
592 )
593 })
594 .unwrap_or((false, false));
595
596 Some(Self {
597 node_id: doc.node_id,
598 is_emergency,
599 is_ack,
600 })
601 }
602}
603
604#[cfg(test)]
605mod tests {
606 use super::*;
607
608 #[test]
609 fn test_document_sync_new() {
610 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
611
612 assert_eq!(sync.node_id().as_u32(), 0x12345678);
613 assert_eq!(sync.version(), 1);
614 assert_eq!(sync.total_count(), 0);
615 assert_eq!(sync.callsign(), "ALPHA-1");
616 assert!(sync.current_event().is_none());
617 }
618
619 #[test]
620 fn test_send_emergency() {
621 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
622
623 let doc_bytes = sync.send_emergency(1234567890);
624
625 assert!(!doc_bytes.is_empty());
626 assert_eq!(sync.total_count(), 1);
627 assert!(sync.is_emergency_active());
628 assert!(!sync.is_ack_active());
629
630 let doc = HiveDocument::decode(&doc_bytes).unwrap();
632 assert_eq!(doc.node_id.as_u32(), 0x12345678);
633 assert!(doc.peripheral.is_some());
634 let event = doc.peripheral.unwrap().last_event.unwrap();
635 assert_eq!(event.event_type, EventType::Emergency);
636 }
637
638 #[test]
639 fn test_send_ack() {
640 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
641
642 let doc_bytes = sync.send_ack(1234567890);
643
644 assert!(!doc_bytes.is_empty());
645 assert_eq!(sync.total_count(), 1);
646 assert!(sync.is_ack_active());
647 assert!(!sync.is_emergency_active());
648 }
649
650 #[test]
651 fn test_clear_event() {
652 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
653
654 sync.send_emergency(1000);
655 assert!(sync.is_emergency_active());
656
657 sync.clear_event();
658 assert!(sync.current_event().is_none());
659 }
660
661 #[test]
662 fn test_merge_document() {
663 let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
664 let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
665
666 let doc_bytes = sync2.send_emergency(1000);
668
669 let result = sync1.merge_document(&doc_bytes);
671 assert!(result.is_some());
672
673 let result = result.unwrap();
674 assert_eq!(result.source_node.as_u32(), 0x22222222);
675 assert!(result.is_emergency());
676 assert!(result.counter_changed);
677 assert_eq!(result.total_count, 1);
678
679 assert_eq!(sync1.local_count(), 0);
681 assert_eq!(sync1.total_count(), 1);
682 }
683
684 #[test]
685 fn test_merge_own_document_ignored() {
686 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
687
688 let doc_bytes = sync.send_emergency(1000);
689
690 let result = sync.merge_document(&doc_bytes);
692 assert!(result.is_none());
693 }
694
695 #[test]
696 fn test_version_increments() {
697 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
698
699 assert_eq!(sync.version(), 1);
700
701 sync.increment_counter();
702 assert_eq!(sync.version(), 2);
703
704 sync.send_emergency(1000);
705 assert_eq!(sync.version(), 3);
706
707 sync.clear_event();
708 assert_eq!(sync.version(), 4);
709 }
710
711 #[test]
712 fn test_document_check() {
713 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
714
715 let emergency_doc = sync.send_emergency(1000);
716 let check = DocumentCheck::from_document(&emergency_doc).unwrap();
717 assert_eq!(check.node_id.as_u32(), 0x12345678);
718 assert!(check.is_emergency);
719 assert!(!check.is_ack);
720
721 sync.clear_event();
722 let ack_doc = sync.send_ack(2000);
723 let check = DocumentCheck::from_document(&ack_doc).unwrap();
724 assert!(!check.is_emergency);
725 assert!(check.is_ack);
726 }
727
728 #[test]
729 fn test_counter_merge_idempotent() {
730 let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
731 let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
732
733 let doc_bytes = sync2.send_emergency(1000);
735
736 let result1 = sync1.merge_document(&doc_bytes).unwrap();
738 assert!(result1.counter_changed);
739 assert_eq!(sync1.total_count(), 1);
740
741 let result2 = sync1.merge_document(&doc_bytes).unwrap();
742 assert!(!result2.counter_changed); assert_eq!(sync1.total_count(), 1);
744 }
745}