#[cfg(not(feature = "std"))]
use alloc::{string::String, vec::Vec};
#[cfg(feature = "std")]
use std::sync::RwLock;
#[cfg(not(feature = "std"))]
use spin::RwLock;
use core::sync::atomic::{AtomicU32, Ordering};
use crate::document::{MergeResult, PeatDocument};
#[cfg(feature = "legacy-chat")]
use crate::sync::crdt::{ChatCRDT, ChatMessage};
use crate::sync::crdt::{EmergencyEvent, EventType, GCounter, Peripheral, PeripheralType};
use crate::NodeId;
/// Lock-protected document state for a single local node.
///
/// Each component sits behind its own `RwLock` so every method can take
/// `&self`; `version` is an atomic that the mutating methods increment.
///
/// NOTE(review): every accessor calls `.unwrap()` on the result of
/// `read()`/`write()`, which matches `std::sync::RwLock`. The `spin::RwLock`
/// imported when `std` is disabled presumably returns the guard directly —
/// confirm the non-`std` build actually compiles.
pub struct DocumentSync {
/// Identity of the local node; stamped into every document built here.
node_id: NodeId,
/// Counter CRDT; the local entry is incremented under `node_id`.
counter: RwLock<GCounter>,
/// The local node's peripheral record (callsign, health, location, event).
peripheral: RwLock<Peripheral>,
/// The emergency currently being tracked, if any.
emergency: RwLock<Option<EmergencyEvent>>,
/// Chat log; stays `None` until a message is added or merged in.
#[cfg(feature = "legacy-chat")]
chat: RwLock<Option<ChatCRDT>>,
/// Change counter; starts at 1 and is incremented by `bump_version`.
version: AtomicU32,
}
impl DocumentSync {
pub fn new(node_id: NodeId, callsign: &str) -> Self {
let peripheral = Peripheral::new(node_id.as_u32(), PeripheralType::SoldierSensor)
.with_callsign(callsign);
Self {
node_id,
counter: RwLock::new(GCounter::new()),
peripheral: RwLock::new(peripheral),
emergency: RwLock::new(None),
#[cfg(feature = "legacy-chat")]
chat: RwLock::new(None),
version: AtomicU32::new(1),
}
}
/// Creates sync state for `node_id` with an explicit peripheral type.
///
/// The peripheral is built from the node's `u32` id and `ptype`, then tagged
/// with `callsign`. Everything else starts empty: a fresh `GCounter`, no
/// emergency, (with `legacy-chat`) no chat log, and version 1.
pub fn with_peripheral_type(node_id: NodeId, callsign: &str, ptype: PeripheralType) -> Self {
    let local_peripheral = Peripheral::new(node_id.as_u32(), ptype).with_callsign(callsign);
    let counter = RwLock::new(GCounter::new());
    let peripheral = RwLock::new(local_peripheral);
    Self {
        node_id,
        counter,
        peripheral,
        emergency: RwLock::new(None),
        #[cfg(feature = "legacy-chat")]
        chat: RwLock::new(None),
        version: AtomicU32::new(1),
    }
}
/// Returns the local node's id (a copy of the stored `NodeId`).
pub fn node_id(&self) -> NodeId {
self.node_id
}
/// Returns the current version counter, loaded with relaxed ordering.
pub fn version(&self) -> u32 {
self.version.load(Ordering::Relaxed)
}
/// Returns the counter's current `value()`, read under the counter's read lock.
pub fn total_count(&self) -> u64 {
    let counter = self.counter.read().unwrap();
    counter.value()
}
/// Returns the counter's `node_count` for the local node id.
pub fn local_count(&self) -> u64 {
    let counter = self.counter.read().unwrap();
    counter.node_count(&self.node_id)
}
/// Returns the type of the local peripheral's `last_event`, or `None` when
/// the peripheral has no event recorded.
pub fn current_event(&self) -> Option<EventType> {
    let state = self.peripheral.read().unwrap();
    let latest = state.last_event.as_ref()?;
    Some(latest.event_type)
}
/// Returns `true` when the local peripheral's current event is
/// `EventType::Emergency`.
pub fn is_emergency_active(&self) -> bool {
    matches!(self.current_event(), Some(EventType::Emergency))
}
/// Returns `true` when the local peripheral's current event is
/// `EventType::Ack`.
pub fn is_ack_active(&self) -> bool {
    matches!(self.current_event(), Some(EventType::Ack))
}
/// Returns an owned copy of the local peripheral's callsign string.
pub fn callsign(&self) -> String {
    let state = self.peripheral.read().unwrap();
    state.callsign_str().to_string()
}
/// Records an `Emergency` event at `timestamp` on the local peripheral,
/// increments the local counter (which also bumps the version), and returns
/// the freshly encoded document.
pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
    // The write guard is a temporary, so it is released before the counter
    // and the document builder take their own locks.
    self.peripheral
        .write()
        .unwrap()
        .set_event(EventType::Emergency, timestamp);
    self.increment_counter_internal();
    self.build_document()
}
/// Records an `Ack` event at `timestamp` on the local peripheral, increments
/// the local counter (which also bumps the version), and returns the freshly
/// encoded document.
pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
    // The write guard is a temporary, so it is released before the counter
    // and the document builder take their own locks.
    self.peripheral
        .write()
        .unwrap()
        .set_event(EventType::Ack, timestamp);
    self.increment_counter_internal();
    self.build_document()
}
/// Clears the local peripheral's event and bumps the version.
pub fn clear_event(&self) {
    let mut state = self.peripheral.write().unwrap();
    state.clear_event();
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Increments the local node's counter entry by one and bumps the version
/// (public wrapper around `increment_counter_internal`).
pub fn increment_counter(&self) {
self.increment_counter_internal();
}
/// Stores `battery_percent` in the local peripheral's health record and
/// bumps the version.
pub fn update_health(&self, battery_percent: u8) {
    let mut state = self.peripheral.write().unwrap();
    let health = &mut state.health;
    health.battery_percent = battery_percent;
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Stores `activity` in the local peripheral's health record and bumps the
/// version.
pub fn update_activity(&self, activity: u8) {
    let mut state = self.peripheral.write().unwrap();
    let health = &mut state.health;
    health.activity = activity;
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Stores both `battery_percent` and `activity` under a single write lock,
/// then bumps the version once.
pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
    let mut state = self.peripheral.write().unwrap();
    let health = &mut state.health;
    health.battery_percent = battery_percent;
    health.activity = activity;
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Stores `Some(heart_rate)` in the local peripheral's health record and
/// bumps the version.
pub fn update_heart_rate(&self, heart_rate: u8) {
    let mut state = self.peripheral.write().unwrap();
    let health = &mut state.health;
    health.heart_rate = Some(heart_rate);
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Stores `alerts` in the local peripheral's health record and bumps the
/// version.
pub fn update_alerts(&self, alerts: u8) {
    let mut state = self.peripheral.write().unwrap();
    let health = &mut state.health;
    health.alerts = alerts;
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Passes `latitude`, `longitude` and the optional `altitude` to the local
/// peripheral's `set_location`, then bumps the version.
pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
    let mut state = self.peripheral.write().unwrap();
    state.set_location(latitude, longitude, altitude);
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Calls the local peripheral's `clear_location` and bumps the version.
pub fn clear_location(&self) {
    let mut state = self.peripheral.write().unwrap();
    state.clear_location();
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Replaces the local peripheral's callsign via `set_callsign` and bumps the
/// version.
pub fn update_callsign(&self, callsign: &str) {
    let mut state = self.peripheral.write().unwrap();
    state.set_callsign(callsign);
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Sets the local peripheral's event to `event_type` at `timestamp` and bumps
/// the version. Unlike `send_emergency`/`send_ack`, the counter is not
/// touched and no document is built.
pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
    let mut state = self.peripheral.write().unwrap();
    state.set_event(event_type, timestamp);
    // The version is bumped before the write guard goes out of scope.
    self.bump_version();
}
/// Clears the local peripheral's event and bumps the version.
///
/// Same operation as `clear_event`; kept as a separate name alongside
/// `set_peripheral_event`.
pub fn clear_peripheral_event(&self) {
    self.clear_event();
}
/// Applies a batch of peripheral fields under one write lock, then bumps the
/// version once.
///
/// `callsign`, `battery_percent` and `timestamp` are always written. The
/// optional inputs only overwrite when present:
/// - `heart_rate`: written only when `Some`.
/// - `latitude`/`longitude`: location is set only when BOTH are `Some`
///   (`altitude` is passed along as-is); otherwise the prior location is kept.
/// - `event_type`: the event is set (stamped with `timestamp`) only when `Some`.
// One parameter per field keeps the call sites flat, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
pub fn update_peripheral_state(
    &self,
    callsign: &str,
    battery_percent: u8,
    heart_rate: Option<u8>,
    latitude: Option<f32>,
    longitude: Option<f32>,
    altitude: Option<f32>,
    event_type: Option<EventType>,
    timestamp: u64,
) {
    {
        let mut state = self.peripheral.write().unwrap();
        state.set_callsign(callsign);
        state.health.battery_percent = battery_percent;
        if heart_rate.is_some() {
            state.health.heart_rate = heart_rate;
        }
        if let Some((lat, lon)) = latitude.zip(longitude) {
            state.set_location(lat, lon, altitude);
        }
        if let Some(kind) = event_type {
            state.set_event(kind, timestamp);
        }
        state.timestamp = timestamp;
        // Write guard released at the end of this scope, before the bump.
    }
    self.bump_version();
}
/// Starts tracking a new emergency originating from this node.
///
/// Replaces any tracked emergency with `EmergencyEvent::new(local id,
/// timestamp, known_peers)`, sets the peripheral event to `Emergency`,
/// increments the local counter (bumping the version), and returns the
/// encoded document.
pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
    let event = EmergencyEvent::new(self.node_id.as_u32(), timestamp, known_peers);
    // Each guard below is a temporary: the locks are taken one at a time and
    // never nested.
    *self.emergency.write().unwrap() = Some(event);
    self.peripheral
        .write()
        .unwrap()
        .set_event(EventType::Emergency, timestamp);
    self.increment_counter_internal();
    self.build_document()
}
pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
let changed = {
let mut emergency = self.emergency.write().unwrap();
if let Some(ref mut e) = *emergency {
e.ack(self.node_id.as_u32())
} else {
return None;
}
};
if changed {
{
let mut peripheral = self.peripheral.write().unwrap();
peripheral.set_event(EventType::Ack, timestamp);
}
self.increment_counter_internal();
}
Some(self.build_document())
}
/// Stops tracking the current emergency, if there is one.
///
/// When an emergency was tracked, the peripheral event is cleared and the
/// version bumped; when none was tracked this is a no-op.
pub fn clear_emergency(&self) {
    // The emergency guard is released at the end of this statement, before
    // the peripheral lock is taken.
    let was_active = self.emergency.write().unwrap().take().is_some();
    if !was_active {
        return;
    }
    let mut state = self.peripheral.write().unwrap();
    state.clear_event();
    self.bump_version();
}
/// Returns `true` while an emergency is being tracked.
pub fn has_active_emergency(&self) -> bool {
    let tracked = self.emergency.read().unwrap();
    tracked.is_some()
}
/// Summarizes the tracked emergency as
/// `(source_node, timestamp, ack_count, pending_nodes().len())`,
/// or returns `None` when no emergency is tracked.
pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
    let tracked = self.emergency.read().unwrap();
    let event = tracked.as_ref()?;
    Some((
        event.source_node(),
        event.timestamp(),
        event.ack_count(),
        event.pending_nodes().len(),
    ))
}
/// Returns whether the tracked emergency reports `peer_id` as having acked;
/// `false` when no emergency is tracked.
pub fn has_peer_acked(&self, peer_id: u32) -> bool {
    let tracked = self.emergency.read().unwrap();
    match tracked.as_ref() {
        Some(event) => event.has_acked(peer_id),
        None => false,
    }
}
/// Returns the tracked emergency's `all_acked()`.
///
/// Note that this is `true` when no emergency is tracked at all.
pub fn all_peers_acked(&self) -> bool {
    let tracked = self.emergency.read().unwrap();
    match tracked.as_ref() {
        Some(event) => event.all_acked(),
        None => true,
    }
}
/// Adds a chat message authored by the local node.
///
/// The chat log is created on first use. Returns what the log's
/// `add_message` returns; the version is bumped only when that is `true`.
#[cfg(feature = "legacy-chat")]
pub fn add_chat_message(&self, sender: &str, text: &str, timestamp: u64) -> bool {
    let mut slot = self.chat.write().unwrap();
    let log = slot.get_or_insert_with(ChatCRDT::new);
    let accepted = log.add_message(ChatMessage::new(
        self.node_id.as_u32(),
        timestamp,
        sender,
        text,
    ));
    if accepted {
        self.bump_version();
    }
    accepted
}
/// Adds a chat message authored by the local node that replies to the message
/// identified by `(reply_to_node, reply_to_timestamp)`.
///
/// The chat log is created on first use. Returns what the log's
/// `add_message` returns; the version is bumped only when that is `true`.
#[cfg(feature = "legacy-chat")]
pub fn add_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> bool {
    let mut slot = self.chat.write().unwrap();
    let log = slot.get_or_insert_with(ChatCRDT::new);
    let mut reply = ChatMessage::new(self.node_id.as_u32(), timestamp, sender, text);
    reply.set_reply_to(reply_to_node, reply_to_timestamp);
    let accepted = log.add_message(reply);
    if accepted {
        self.bump_version();
    }
    accepted
}
/// Returns the chat log's `len()`, or 0 when no chat log exists yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_count(&self) -> usize {
    let slot = self.chat.read().unwrap();
    match slot.as_ref() {
        Some(log) => log.len(),
        None => 0,
    }
}
/// Collects the messages yielded by the chat log's
/// `messages_since(since_timestamp)` as owned tuples:
/// `(origin_node, timestamp, sender, text, reply_to_node, reply_to_timestamp)`.
///
/// Returns an empty vector when no chat log exists yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    let slot = self.chat.read().unwrap();
    match slot.as_ref() {
        Some(log) => log
            .messages_since(since_timestamp)
            .map(|msg| {
                (
                    msg.origin_node,
                    msg.timestamp,
                    msg.sender().to_string(),
                    msg.text().to_string(),
                    msg.reply_to_node,
                    msg.reply_to_timestamp,
                )
            })
            .collect(),
        None => Vec::new(),
    }
}
/// Returns every chat message as owned tuples; shorthand for
/// `chat_messages_since(0)`.
#[cfg(feature = "legacy-chat")]
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
self.chat_messages_since(0)
}
/// Returns a clone of the chat log, or `None` when no chat log exists yet.
#[cfg(feature = "legacy-chat")]
pub fn chat_snapshot(&self) -> Option<ChatCRDT> {
    let slot = self.chat.read().unwrap();
    slot.clone()
}
/// Collects the counter's `entries()` into a vector of `(u32, u64)` pairs.
pub fn counter_entries(&self) -> Vec<(u32, u64)> {
    let counter = self.counter.read().unwrap();
    counter.entries().collect()
}
/// Returns a clone of the local peripheral record.
pub fn peripheral_snapshot(&self) -> Peripheral {
    let state = self.peripheral.read().unwrap();
    state.clone()
}
/// Returns a clone of the tracked emergency, or `None` when none is tracked.
pub fn emergency_snapshot(&self) -> Option<EmergencyEvent> {
    let tracked = self.emergency.read().unwrap();
    tracked.clone()
}
/// Encodes the current state as a `PeatDocument` and returns the bytes.
///
/// Each component is cloned under its own read lock, one lock at a time, and
/// the version is loaded last. The components are therefore not captured
/// under a single lock; a concurrent writer may land between the reads.
/// With `legacy-chat`, the chat field carries the log's `for_sync()` form.
pub fn build_document(&self) -> Vec<u8> {
    let counter_snapshot = self.counter.read().unwrap().clone();
    let peripheral_snapshot = self.peripheral.read().unwrap().clone();
    let emergency_snapshot = self.emergency.read().unwrap().clone();
    #[cfg(feature = "legacy-chat")]
    let chat_snapshot = self
        .chat
        .read()
        .unwrap()
        .as_ref()
        .map(|log| log.for_sync());
    let version = self.version.load(Ordering::Relaxed);
    PeatDocument {
        version,
        node_id: self.node_id,
        counter: counter_snapshot,
        peripheral: Some(peripheral_snapshot),
        emergency: emergency_snapshot,
        #[cfg(feature = "legacy-chat")]
        chat: chat_snapshot,
    }
    .encode()
}
pub fn merge_document(&self, data: &[u8]) -> Option<MergeResult> {
let received = PeatDocument::decode(data)?;
if received.node_id == self.node_id {
return None;
}
let counter_changed = {
let mut counter = self.counter.write().unwrap();
let old_value = counter.value();
counter.merge(&received.counter);
counter.value() != old_value
};
let emergency_changed = if let Some(ref received_emergency) = received.emergency {
let mut emergency = self.emergency.write().unwrap();
match &mut *emergency {
Some(ref mut our_emergency) => our_emergency.merge(received_emergency),
None => {
*emergency = Some(received_emergency.clone());
true
}
}
} else {
false
};
#[cfg(feature = "legacy-chat")]
let chat_changed = if let Some(ref received_chat) = received.chat {
if !received_chat.is_empty() {
let mut chat = self.chat.write().unwrap();
match &mut *chat {
Some(ref mut our_chat) => our_chat.merge(received_chat),
None => {
*chat = Some(received_chat.clone());
true
}
}
} else {
false
}
} else {
false
};
#[cfg(not(feature = "legacy-chat"))]
let chat_changed = false;
if counter_changed || emergency_changed || chat_changed {
self.bump_version();
}
let event = received
.peripheral
.as_ref()
.and_then(|p| p.last_event.clone());
Some(MergeResult {
source_node: received.node_id,
event,
peer_peripheral: received.peripheral,
counter_changed,
emergency_changed,
chat_changed,
total_count: self.total_count(),
})
}
/// Decodes `data` into a `PeatDocument` without touching any sync state;
/// returns whatever `PeatDocument::decode` returns.
pub fn decode_document(data: &[u8]) -> Option<PeatDocument> {
PeatDocument::decode(data)
}
/// Adds 1 to the local node's counter entry, releases the counter lock, then
/// bumps the version.
fn increment_counter_internal(&self) {
    // The write guard is a temporary and is gone before the version bump.
    self.counter.write().unwrap().increment(&self.node_id, 1);
    self.bump_version();
}
/// Atomically adds 1 to the version counter (relaxed ordering). `fetch_add`
/// wraps around on overflow.
fn bump_version(&self) {
self.version.fetch_add(1, Ordering::Relaxed);
}
}
/// Summary of an encoded document: who sent it and whether its peripheral's
/// last event is an emergency or an ack.
#[derive(Debug, Clone)]
pub struct DocumentCheck {
/// Node id taken from the decoded document.
pub node_id: NodeId,
/// `true` when the peripheral's last event is `EventType::Emergency`.
pub is_emergency: bool,
/// `true` when the peripheral's last event is `EventType::Ack`.
pub is_ack: bool,
}
impl DocumentCheck {
    /// Decodes `data` and classifies the peripheral's last event.
    ///
    /// Returns `None` when the bytes do not decode. Both flags are `false`
    /// when the document has no peripheral or the peripheral has no event.
    pub fn from_document(data: &[u8]) -> Option<Self> {
        let doc = PeatDocument::decode(data)?;
        let last_kind = doc
            .peripheral
            .as_ref()
            .and_then(|peripheral| peripheral.last_event.as_ref())
            .map(|event| event.event_type);
        Some(Self {
            node_id: doc.node_id,
            is_emergency: last_kind == Some(EventType::Emergency),
            is_ack: last_kind == Some(EventType::Ack),
        })
    }
}
// Unit tests for `DocumentSync` and `DocumentCheck`.
#[cfg(test)]
mod tests {
use super::*;
// Fixed timestamp shared by the tests; the exact value is not significant
// to any assertion.
const TEST_TIMESTAMP: u64 = 1705276800000;
// A freshly constructed sync starts at version 1 with a zero counter, the
// given callsign, and no event.
#[test]
fn test_document_sync_new() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
assert_eq!(sync.node_id().as_u32(), 0x12345678);
assert_eq!(sync.version(), 1);
assert_eq!(sync.total_count(), 0);
assert_eq!(sync.callsign(), "ALPHA-1");
assert!(sync.current_event().is_none());
}
// `send_emergency` increments the counter, marks the emergency event, and
// the returned bytes decode to a document carrying that event.
#[test]
fn test_send_emergency() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
assert!(!doc_bytes.is_empty());
assert_eq!(sync.total_count(), 1);
assert!(sync.is_emergency_active());
assert!(!sync.is_ack_active());
let doc = PeatDocument::decode(&doc_bytes).unwrap();
assert_eq!(doc.node_id.as_u32(), 0x12345678);
assert!(doc.peripheral.is_some());
let event = doc.peripheral.unwrap().last_event.unwrap();
assert_eq!(event.event_type, EventType::Emergency);
}
// `send_ack` increments the counter and marks the ack event.
#[test]
fn test_send_ack() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
let doc_bytes = sync.send_ack(TEST_TIMESTAMP);
assert!(!doc_bytes.is_empty());
assert_eq!(sync.total_count(), 1);
assert!(sync.is_ack_active());
assert!(!sync.is_emergency_active());
}
// `clear_event` removes a previously sent event.
#[test]
fn test_clear_event() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
sync.send_emergency(TEST_TIMESTAMP);
assert!(sync.is_emergency_active());
sync.clear_event();
assert!(sync.current_event().is_none());
}
// Merging a peer's document raises the total count but leaves the local
// node's own count untouched.
#[test]
fn test_merge_document() {
let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
let result = sync1.merge_document(&doc_bytes);
assert!(result.is_some());
let result = result.unwrap();
assert_eq!(result.source_node.as_u32(), 0x22222222);
assert!(result.is_emergency());
assert!(result.counter_changed);
assert_eq!(result.total_count, 1);
assert_eq!(sync1.local_count(), 0);
assert_eq!(sync1.total_count(), 1);
}
// A document carrying our own node id is rejected by `merge_document`.
#[test]
fn test_merge_own_document_ignored() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
let doc_bytes = sync.send_emergency(TEST_TIMESTAMP);
let result = sync.merge_document(&doc_bytes);
assert!(result.is_none());
}
// Each of these operations bumps the version exactly once; in particular
// `send_emergency` bumps only through its counter increment.
#[test]
fn test_version_increments() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
assert_eq!(sync.version(), 1);
sync.increment_counter();
assert_eq!(sync.version(), 2);
sync.send_emergency(TEST_TIMESTAMP);
assert_eq!(sync.version(), 3);
sync.clear_event();
assert_eq!(sync.version(), 4);
}
// `DocumentCheck::from_document` reports the flag matching the last event.
#[test]
fn test_document_check() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
let emergency_doc = sync.send_emergency(TEST_TIMESTAMP);
let check = DocumentCheck::from_document(&emergency_doc).unwrap();
assert_eq!(check.node_id.as_u32(), 0x12345678);
assert!(check.is_emergency);
assert!(!check.is_ack);
sync.clear_event();
let ack_doc = sync.send_ack(TEST_TIMESTAMP + 1000);
let check = DocumentCheck::from_document(&ack_doc).unwrap();
assert!(!check.is_emergency);
assert!(check.is_ack);
}
// Merging the same document twice changes the counter only the first time.
#[test]
fn test_counter_merge_idempotent() {
let sync1 = DocumentSync::new(NodeId::new(0x11111111), "ALPHA-1");
let sync2 = DocumentSync::new(NodeId::new(0x22222222), "BRAVO-1");
let doc_bytes = sync2.send_emergency(TEST_TIMESTAMP);
let result1 = sync1.merge_document(&doc_bytes).unwrap();
assert!(result1.counter_changed);
assert_eq!(sync1.total_count(), 1);
let result2 = sync1.merge_document(&doc_bytes).unwrap();
assert!(!result2.counter_changed); assert_eq!(sync1.total_count(), 1);
}
// A later `update_peripheral_state` call without latitude/longitude must
// keep the previously stored location while still applying other fields.
#[test]
fn test_update_peripheral_state_preserves_location_on_none() {
let sync = DocumentSync::new(NodeId::new(0x12345678), "ALPHA-1");
sync.update_peripheral_state(
"ALPHA-1",
87,
Some(72),
Some(33.715794),
Some(-84.41128),
Some(285.5),
None,
TEST_TIMESTAMP,
);
let after_set = sync.peripheral_snapshot();
let loc = after_set.location.expect("location should be set");
assert_eq!(loc.latitude, 33.715794);
assert_eq!(loc.longitude, -84.41128);
sync.update_peripheral_state(
"ALPHA-1",
85,
Some(73),
None,
None,
None,
None,
TEST_TIMESTAMP + 1000,
);
let after_partial = sync.peripheral_snapshot();
let loc = after_partial
.location
.expect("partial update without lat/lon must preserve prior location");
assert_eq!(loc.latitude, 33.715794);
assert_eq!(loc.longitude, -84.41128);
assert_eq!(after_partial.health.battery_percent, 85);
}
}