#[cfg(not(feature = "std"))]
use alloc::string::ToString;
#[cfg(not(feature = "std"))]
use alloc::{string::String, vec::Vec};
#[cfg(feature = "std")]
use std::sync::RwLock;

#[cfg(not(feature = "std"))]
use spin::RwLock;

use core::sync::atomic::{AtomicU32, Ordering};

use crate::document::{HiveDocument, MergeResult};
use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
use crate::NodeId;
61
62pub struct DocumentSync {
75 node_id: NodeId,
77
78 counter: RwLock<GCounter>,
80
81 peripheral: RwLock<Peripheral>,
83
84 emergency: RwLock<Option<EmergencyEvent>>,
86
87 version: AtomicU32,
89}
90
91impl DocumentSync {
92 pub fn new(node_id: NodeId, callsign: &str) -> Self {
94 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
95 .with_callsign(callsign);
96
97 Self {
98 node_id,
99 counter: RwLock::new(GCounter::new()),
100 peripheral: RwLock::new(peripheral),
101 emergency: RwLock::new(None),
102 version: AtomicU32::new(1),
103 }
104 }
105
106 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
108 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
109
110 Self {
111 node_id,
112 counter: RwLock::new(GCounter::new()),
113 peripheral: RwLock::new(peripheral),
114 emergency: RwLock::new(None),
115 version: AtomicU32::new(1),
116 }
117 }
118
119 pub fn node_id(&self) -> NodeId {
121 self.node_id
122 }
123
124 pub fn version(&self) -> u32 {
126 self.version.load(Ordering::Relaxed)
127 }
128
129 pub fn total_count(&self) -> u64 {
131 self.counter.read().unwrap().value()
132 }
133
134 pub fn local_count(&self) -> u64 {
136 self.counter.read().unwrap().node_count(&self.node_id)
137 }
138
139 pub fn current_event(&self) -> Option<EventType> {
141 self.peripheral
142 .read()
143 .unwrap()
144 .last_event
145 .as_ref()
146 .map(|e| e.event_type)
147 }
148
149 pub fn is_emergency_active(&self) -> bool {
151 self.current_event() == Some(EventType::Emergency)
152 }
153
154 pub fn is_ack_active(&self) -> bool {
156 self.current_event() == Some(EventType::Ack)
157 }
158
159 pub fn callsign(&self) -> String {
161 self.peripheral.read().unwrap().callsign_str().to_string()
162 }
163
164 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
168 {
170 let mut peripheral = self.peripheral.write().unwrap();
171 peripheral.set_event(EventType::Emergency, timestamp);
172 }
173
174 self.increment_counter_internal();
176
177 self.build_document()
179 }
180
181 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
183 {
185 let mut peripheral = self.peripheral.write().unwrap();
186 peripheral.set_event(EventType::Ack, timestamp);
187 }
188
189 self.increment_counter_internal();
191
192 self.build_document()
194 }
195
196 pub fn clear_event(&self) {
198 let mut peripheral = self.peripheral.write().unwrap();
199 peripheral.clear_event();
200 self.bump_version();
201 }
202
203 pub fn increment_counter(&self) {
205 self.increment_counter_internal();
206 }
207
208 pub fn update_health(&self, battery_percent: u8) {
210 let mut peripheral = self.peripheral.write().unwrap();
211 peripheral.health.battery_percent = battery_percent;
212 self.bump_version();
213 }
214
215 pub fn update_activity(&self, activity: u8) {
217 let mut peripheral = self.peripheral.write().unwrap();
218 peripheral.health.activity = activity;
219 self.bump_version();
220 }
221
222 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
224 let mut peripheral = self.peripheral.write().unwrap();
225 peripheral.health.battery_percent = battery_percent;
226 peripheral.health.activity = activity;
227 self.bump_version();
228 }
229
230 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
237 {
239 let mut emergency = self.emergency.write().unwrap();
240 *emergency = Some(EmergencyEvent::new(
241 self.node_id.as_u32(),
242 timestamp,
243 known_peers,
244 ));
245 }
246
247 {
249 let mut peripheral = self.peripheral.write().unwrap();
250 peripheral.set_event(EventType::Emergency, timestamp);
251 }
252
253 self.increment_counter_internal();
254 self.build_document()
255 }
256
257 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
261 let changed = {
262 let mut emergency = self.emergency.write().unwrap();
263 if let Some(ref mut e) = *emergency {
264 e.ack(self.node_id.as_u32())
265 } else {
266 return None;
267 }
268 };
269
270 if changed {
271 {
273 let mut peripheral = self.peripheral.write().unwrap();
274 peripheral.set_event(EventType::Ack, timestamp);
275 }
276
277 self.increment_counter_internal();
278 }
279
280 Some(self.build_document())
281 }
282
283 pub fn clear_emergency(&self) {
285 let mut emergency = self.emergency.write().unwrap();
286 if emergency.is_some() {
287 *emergency = None;
288 drop(emergency);
289
290 let mut peripheral = self.peripheral.write().unwrap();
292 peripheral.clear_event();
293
294 self.bump_version();
295 }
296 }
297
298 pub fn has_active_emergency(&self) -> bool {
300 self.emergency.read().unwrap().is_some()
301 }
302
303 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
307 let emergency = self.emergency.read().unwrap();
308 emergency.as_ref().map(|e| {
309 (
310 e.source_node(),
311 e.timestamp(),
312 e.ack_count(),
313 e.pending_nodes().len(),
314 )
315 })
316 }
317
318 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
320 let emergency = self.emergency.read().unwrap();
321 emergency
322 .as_ref()
323 .map(|e| e.has_acked(peer_id))
324 .unwrap_or(false)
325 }
326
327 pub fn all_peers_acked(&self) -> bool {
329 let emergency = self.emergency.read().unwrap();
330 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
331 }
332
333 pub fn build_document(&self) -> Vec<u8> {
339 let counter = self.counter.read().unwrap().clone();
340 let peripheral = self.peripheral.read().unwrap().clone();
341 let emergency = self.emergency.read().unwrap().clone();
342
343 let doc = HiveDocument {
344 version: self.version.load(Ordering::Relaxed),
345 node_id: self.node_id,
346 counter,
347 peripheral: Some(peripheral),
348 emergency,
349 };
350
351 doc.encode()
352 }
353
354 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
359 let received = HiveDocument::decode(data)?;
360
361 if received.node_id == self.node_id {
363 return None;
364 }
365
366 let counter_changed = {
368 let mut counter = self.counter.write().unwrap();
369 let old_value = counter.value();
370 counter.merge(&received.counter);
371 counter.value() != old_value
372 };
373
374 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
376 let mut emergency = self.emergency.write().unwrap();
377 match &mut *emergency {
378 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
379 None => {
380 *emergency = Some(received_emergency.clone());
381 true
382 }
383 }
384 } else {
385 false
386 };
387
388 if counter_changed || emergency_changed {
389 self.bump_version();
390 }
391
392 let event = received
394 .peripheral
395 .as_ref()
396 .and_then(|p| p.last_event.clone());
397
398 Some(MergeResult {
399 source_node: received.node_id,
400 event,
401 counter_changed,
402 emergency_changed,
403 total_count: self.total_count(),
404 })
405 }
406
407 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
409 HiveDocument::decode(data)
410 }
411
412 fn increment_counter_internal(&self) {
415 let mut counter = self.counter.write().unwrap();
416 counter.increment(&self.node_id, 1);
417 drop(counter);
418 self.bump_version();
419 }
420
421 fn bump_version(&self) {
422 self.version.fetch_add(1, Ordering::Relaxed);
423 }
424}
425
426#[derive(Debug, Clone)]
428pub struct DocumentCheck {
429 pub node_id: NodeId,
431 pub is_emergency: bool,
433 pub is_ack: bool,
435}
436
437impl DocumentCheck {
438 pub fn from_document(data: &[u8]) -> Option<Self> {
440 let doc = HiveDocument::decode(data)?;
441
442 let (is_emergency, is_ack) = doc
443 .peripheral
444 .as_ref()
445 .and_then(|p| p.last_event.as_ref())
446 .map(|e| {
447 (
448 e.event_type == EventType::Emergency,
449 e.event_type == EventType::Ack,
450 )
451 })
452 .unwrap_or((false, false));
453
454 Some(Self {
455 node_id: doc.node_id,
456 is_emergency,
457 is_ack,
458 })
459 }
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    /// Id used by the tests that need a single node.
    const SOLO_ID: u32 = 0x12345678;
    /// Ids used by the tests that exchange documents between two nodes.
    const ALPHA_ID: u32 = 0x11111111;
    const BRAVO_ID: u32 = 0x22222222;

    /// A lone node with callsign "ALPHA-1".
    fn solo() -> DocumentSync {
        DocumentSync::new(NodeId::new(SOLO_ID), "ALPHA-1")
    }

    /// Two distinct nodes: the receiver ("ALPHA-1") and the sender ("BRAVO-1").
    fn pair() -> (DocumentSync, DocumentSync) {
        let receiver = DocumentSync::new(NodeId::new(ALPHA_ID), "ALPHA-1");
        let sender = DocumentSync::new(NodeId::new(BRAVO_ID), "BRAVO-1");
        (receiver, sender)
    }

    #[test]
    fn test_document_sync_new() {
        let node = solo();

        assert_eq!(node.node_id().as_u32(), SOLO_ID);
        assert_eq!(node.version(), 1);
        assert_eq!(node.total_count(), 0);
        assert_eq!(node.callsign(), "ALPHA-1");
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_send_emergency() {
        let node = solo();

        let encoded = node.send_emergency(1234567890);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_emergency_active());
        assert!(!node.is_ack_active());

        // The encoded bytes must round-trip to the same node and event.
        let decoded = HiveDocument::decode(&encoded).unwrap();
        assert_eq!(decoded.node_id.as_u32(), SOLO_ID);
        assert!(decoded.peripheral.is_some());
        let last_event = decoded.peripheral.unwrap().last_event.unwrap();
        assert_eq!(last_event.event_type, EventType::Emergency);
    }

    #[test]
    fn test_send_ack() {
        let node = solo();

        let encoded = node.send_ack(1234567890);

        assert!(!encoded.is_empty());
        assert_eq!(node.total_count(), 1);
        assert!(node.is_ack_active());
        assert!(!node.is_emergency_active());
    }

    #[test]
    fn test_clear_event() {
        let node = solo();

        node.send_emergency(1000);
        assert!(node.is_emergency_active());

        node.clear_event();
        assert!(node.current_event().is_none());
    }

    #[test]
    fn test_merge_document() {
        let (receiver, sender) = pair();

        let encoded = sender.send_emergency(1000);

        let merged = receiver.merge_document(&encoded);
        assert!(merged.is_some());

        let merged = merged.unwrap();
        assert_eq!(merged.source_node.as_u32(), BRAVO_ID);
        assert!(merged.is_emergency());
        assert!(merged.counter_changed);
        assert_eq!(merged.total_count, 1);

        // The receiver contributed nothing itself; the total comes from the sender.
        assert_eq!(receiver.local_count(), 0);
        assert_eq!(receiver.total_count(), 1);
    }

    #[test]
    fn test_merge_own_document_ignored() {
        let node = solo();

        let encoded = node.send_emergency(1000);

        let merged = node.merge_document(&encoded);
        assert!(merged.is_none());
    }

    #[test]
    fn test_version_increments() {
        let node = solo();

        assert_eq!(node.version(), 1);

        node.increment_counter();
        assert_eq!(node.version(), 2);

        node.send_emergency(1000);
        assert_eq!(node.version(), 3);

        node.clear_event();
        assert_eq!(node.version(), 4);
    }

    #[test]
    fn test_document_check() {
        let node = solo();

        let emergency_bytes = node.send_emergency(1000);
        let summary = DocumentCheck::from_document(&emergency_bytes).unwrap();
        assert_eq!(summary.node_id.as_u32(), SOLO_ID);
        assert!(summary.is_emergency);
        assert!(!summary.is_ack);

        node.clear_event();
        let ack_bytes = node.send_ack(2000);
        let summary = DocumentCheck::from_document(&ack_bytes).unwrap();
        assert!(!summary.is_emergency);
        assert!(summary.is_ack);
    }

    #[test]
    fn test_counter_merge_idempotent() {
        let (receiver, sender) = pair();

        let encoded = sender.send_emergency(1000);

        let first = receiver.merge_document(&encoded).unwrap();
        assert!(first.counter_changed);
        assert_eq!(receiver.total_count(), 1);

        // Merging the very same bytes again must not move the counter.
        let second = receiver.merge_document(&encoded).unwrap();
        assert!(!second.counter_changed);
        assert_eq!(receiver.total_count(), 1);
    }
}