1#[cfg(not(feature = "std"))]
34use alloc::{string::String, vec::Vec};
35#[cfg(feature = "std")]
36use std::sync::RwLock;
37
38#[cfg(not(feature = "std"))]
39use spin::RwLock;
40
41use core::sync::atomic::{AtomicU32, Ordering};
42
43use crate::document::{HiveDocument, MergeResult};
44use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
45use crate::NodeId;
46
47pub struct DocumentSync {
60 node_id: NodeId,
62
63 counter: RwLock<GCounter>,
65
66 peripheral: RwLock<Peripheral>,
68
69 emergency: RwLock<Option<EmergencyEvent>>,
71
72 version: AtomicU32,
74}
75
76impl DocumentSync {
77 pub fn new(node_id: NodeId, callsign: &str) -> Self {
79 let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
80 .with_callsign(callsign);
81
82 Self {
83 node_id,
84 counter: RwLock::new(GCounter::new()),
85 peripheral: RwLock::new(peripheral),
86 emergency: RwLock::new(None),
87 version: AtomicU32::new(1),
88 }
89 }
90
91 pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
93 let peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
94
95 Self {
96 node_id,
97 counter: RwLock::new(GCounter::new()),
98 peripheral: RwLock::new(peripheral),
99 emergency: RwLock::new(None),
100 version: AtomicU32::new(1),
101 }
102 }
103
104 pub fn node_id(&self) -> NodeId {
106 self.node_id
107 }
108
109 pub fn version(&self) -> u32 {
111 self.version.load(Ordering::Relaxed)
112 }
113
114 pub fn total_count(&self) -> u64 {
116 self.counter.read().unwrap().value()
117 }
118
119 pub fn local_count(&self) -> u64 {
121 self.counter.read().unwrap().node_count(&self.node_id)
122 }
123
124 pub fn current_event(&self) -> Option<EventType> {
126 self.peripheral
127 .read()
128 .unwrap()
129 .last_event
130 .as_ref()
131 .map(|e| e.event_type)
132 }
133
134 pub fn is_emergency_active(&self) -> bool {
136 self.current_event() == Some(EventType::Emergency)
137 }
138
139 pub fn is_ack_active(&self) -> bool {
141 self.current_event() == Some(EventType::Ack)
142 }
143
144 pub fn callsign(&self) -> String {
146 self.peripheral.read().unwrap().callsign_str().to_string()
147 }
148
149 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
153 {
155 let mut peripheral = self.peripheral.write().unwrap();
156 peripheral.set_event(EventType::Emergency, timestamp);
157 }
158
159 self.increment_counter_internal();
161
162 self.build_document()
164 }
165
166 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
168 {
170 let mut peripheral = self.peripheral.write().unwrap();
171 peripheral.set_event(EventType::Ack, timestamp);
172 }
173
174 self.increment_counter_internal();
176
177 self.build_document()
179 }
180
181 pub fn clear_event(&self) {
183 let mut peripheral = self.peripheral.write().unwrap();
184 peripheral.clear_event();
185 self.bump_version();
186 }
187
188 pub fn increment_counter(&self) {
190 self.increment_counter_internal();
191 }
192
193 pub fn update_health(&self, battery_percent: u8) {
195 let mut peripheral = self.peripheral.write().unwrap();
196 peripheral.health.battery_percent = battery_percent;
197 self.bump_version();
198 }
199
200 pub fn update_activity(&self, activity: u8) {
202 let mut peripheral = self.peripheral.write().unwrap();
203 peripheral.health.activity = activity;
204 self.bump_version();
205 }
206
207 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
209 let mut peripheral = self.peripheral.write().unwrap();
210 peripheral.health.battery_percent = battery_percent;
211 peripheral.health.activity = activity;
212 self.bump_version();
213 }
214
215 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
222 {
224 let mut emergency = self.emergency.write().unwrap();
225 *emergency = Some(EmergencyEvent::new(
226 self.node_id.as_u32(),
227 timestamp,
228 known_peers,
229 ));
230 }
231
232 {
234 let mut peripheral = self.peripheral.write().unwrap();
235 peripheral.set_event(EventType::Emergency, timestamp);
236 }
237
238 self.increment_counter_internal();
239 self.build_document()
240 }
241
242 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
246 let changed = {
247 let mut emergency = self.emergency.write().unwrap();
248 if let Some(ref mut e) = *emergency {
249 e.ack(self.node_id.as_u32())
250 } else {
251 return None;
252 }
253 };
254
255 if changed {
256 {
258 let mut peripheral = self.peripheral.write().unwrap();
259 peripheral.set_event(EventType::Ack, timestamp);
260 }
261
262 self.increment_counter_internal();
263 }
264
265 Some(self.build_document())
266 }
267
268 pub fn clear_emergency(&self) {
270 let mut emergency = self.emergency.write().unwrap();
271 if emergency.is_some() {
272 *emergency = None;
273 drop(emergency);
274
275 let mut peripheral = self.peripheral.write().unwrap();
277 peripheral.clear_event();
278
279 self.bump_version();
280 }
281 }
282
283 pub fn has_active_emergency(&self) -> bool {
285 self.emergency.read().unwrap().is_some()
286 }
287
288 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
292 let emergency = self.emergency.read().unwrap();
293 emergency.as_ref().map(|e| {
294 (
295 e.source_node(),
296 e.timestamp(),
297 e.ack_count(),
298 e.pending_nodes().len(),
299 )
300 })
301 }
302
303 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
305 let emergency = self.emergency.read().unwrap();
306 emergency
307 .as_ref()
308 .map(|e| e.has_acked(peer_id))
309 .unwrap_or(false)
310 }
311
312 pub fn all_peers_acked(&self) -> bool {
314 let emergency = self.emergency.read().unwrap();
315 emergency.as_ref().map(|e| e.all_acked()).unwrap_or(true)
316 }
317
318 pub fn build_document(&self) -> Vec<u8> {
324 let counter = self.counter.read().unwrap().clone();
325 let peripheral = self.peripheral.read().unwrap().clone();
326 let emergency = self.emergency.read().unwrap().clone();
327
328 let doc = HiveDocument {
329 version: self.version.load(Ordering::Relaxed),
330 node_id: self.node_id,
331 counter,
332 peripheral: Some(peripheral),
333 emergency,
334 };
335
336 doc.encode()
337 }
338
339 pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
344 let received = HiveDocument::decode(data)?;
345
346 if received.node_id == self.node_id {
348 return None;
349 }
350
351 let counter_changed = {
353 let mut counter = self.counter.write().unwrap();
354 let old_value = counter.value();
355 counter.merge(&received.counter);
356 counter.value() != old_value
357 };
358
359 let emergency_changed = if let Some(ref received_emergency) = received.emergency {
361 let mut emergency = self.emergency.write().unwrap();
362 match &mut *emergency {
363 Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
364 None => {
365 *emergency = Some(received_emergency.clone());
366 true
367 }
368 }
369 } else {
370 false
371 };
372
373 if counter_changed || emergency_changed {
374 self.bump_version();
375 }
376
377 let event = received
379 .peripheral
380 .as_ref()
381 .and_then(|p| p.last_event.clone());
382
383 Some(MergeResult {
384 source_node: received.node_id,
385 event,
386 counter_changed,
387 emergency_changed,
388 total_count: self.total_count(),
389 })
390 }
391
392 pub fn decode_document(data: &[u8]) -> Option<HiveDocument> {
394 HiveDocument::decode(data)
395 }
396
397 fn increment_counter_internal(&self) {
400 let mut counter = self.counter.write().unwrap();
401 counter.increment(&self.node_id, 1);
402 drop(counter);
403 self.bump_version();
404 }
405
406 fn bump_version(&self) {
407 self.version.fetch_add(1, Ordering::Relaxed);
408 }
409}
410
411#[derive(Debug, Clone)]
413pub struct DocumentCheck {
414 pub node_id: NodeId,
416 pub is_emergency: bool,
418 pub is_ack: bool,
420}
421
422impl DocumentCheck {
423 pub fn from_document(data: &[u8]) -> Option<Self> {
425 let doc = HiveDocument::decode(data)?;
426
427 let (is_emergency, is_ack) = doc
428 .peripheral
429 .as_ref()
430 .and_then(|p| p.last_event.as_ref())
431 .map(|e| {
432 (
433 e.event_type == EventType::Emergency,
434 e.event_type == EventType::Ack,
435 )
436 })
437 .unwrap_or((false, false));
438
439 Some(Self {
440 node_id: doc.node_id,
441 is_emergency,
442 is_ack,
443 })
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450
451 #[test]
452 fn test_document_sync_new() {
453 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
454
455 assert_eq!(sync.node_id().as_u32(), 0x12345678);
456 assert_eq!(sync.version(), 1);
457 assert_eq!(sync.total_count(), 0);
458 assert_eq!(sync.callsign(), "ALPHA-1");
459 assert!(sync.current_event().is_none());
460 }
461
462 #[test]
463 fn test_send_emergency() {
464 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
465
466 let doc_bytes = sync.send_emergency(1234567890);
467
468 assert!(!doc_bytes.is_empty());
469 assert_eq!(sync.total_count(), 1);
470 assert!(sync.is_emergency_active());
471 assert!(!sync.is_ack_active());
472
473 let doc = HiveDocument::decode(&doc_bytes).unwrap();
475 assert_eq!(doc.node_id.as_u32(), 0x12345678);
476 assert!(doc.peripheral.is_some());
477 let event = doc.peripheral.unwrap().last_event.unwrap();
478 assert_eq!(event.event_type, EventType::Emergency);
479 }
480
481 #[test]
482 fn test_send_ack() {
483 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
484
485 let doc_bytes = sync.send_ack(1234567890);
486
487 assert!(!doc_bytes.is_empty());
488 assert_eq!(sync.total_count(), 1);
489 assert!(sync.is_ack_active());
490 assert!(!sync.is_emergency_active());
491 }
492
493 #[test]
494 fn test_clear_event() {
495 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
496
497 sync.send_emergency(1000);
498 assert!(sync.is_emergency_active());
499
500 sync.clear_event();
501 assert!(sync.current_event().is_none());
502 }
503
504 #[test]
505 fn test_merge_document() {
506 let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
507 let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
508
509 let doc_bytes = sync2.send_emergency(1000);
511
512 let result = sync1.merge_document(&doc_bytes);
514 assert!(result.is_some());
515
516 let result = result.unwrap();
517 assert_eq!(result.source_node.as_u32(), 0x22222222);
518 assert!(result.is_emergency());
519 assert!(result.counter_changed);
520 assert_eq!(result.total_count, 1);
521
522 assert_eq!(sync1.local_count(), 0);
524 assert_eq!(sync1.total_count(), 1);
525 }
526
527 #[test]
528 fn test_merge_own_document_ignored() {
529 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
530
531 let doc_bytes = sync.send_emergency(1000);
532
533 let result = sync.merge_document(&doc_bytes);
535 assert!(result.is_none());
536 }
537
538 #[test]
539 fn test_version_increments() {
540 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
541
542 assert_eq!(sync.version(), 1);
543
544 sync.increment_counter();
545 assert_eq!(sync.version(), 2);
546
547 sync.send_emergency(1000);
548 assert_eq!(sync.version(), 3);
549
550 sync.clear_event();
551 assert_eq!(sync.version(), 4);
552 }
553
554 #[test]
555 fn test_document_check() {
556 let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
557
558 let emergency_doc = sync.send_emergency(1000);
559 let check = DocumentCheck::from_document(&emergency_doc).unwrap();
560 assert_eq!(check.node_id.as_u32(), 0x12345678);
561 assert!(check.is_emergency);
562 assert!(!check.is_ack);
563
564 sync.clear_event();
565 let ack_doc = sync.send_ack(2000);
566 let check = DocumentCheck::from_document(&ack_doc).unwrap();
567 assert!(!check.is_emergency);
568 assert!(check.is_ack);
569 }
570
571 #[test]
572 fn test_counter_merge_idempotent() {
573 let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
574 let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
575
576 let doc_bytes = sync2.send_emergency(1000);
578
579 let result1 = sync1.merge_document(&doc_bytes).unwrap();
581 assert!(result1.counter_changed);
582 assert_eq!(sync1.total_count(), 1);
583
584 let result2 = sync1.merge_document(&doc_bytes).unwrap();
585 assert!(!result2.counter_changed); assert_eq!(sync1.total_count(), 1);
587 }
588}