1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66 TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73 PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78 RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
/// Configuration for a mesh node.
///
/// Created with [`PeatMeshConfig::new`] and refined through the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct PeatMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node; handed to `DocumentSync` when a mesh is built.
    pub callsign: String,

    /// Mesh identifier; passed together with the secret to
    /// `MeshEncryptionKey::from_shared_secret` when encryption is configured.
    pub mesh_id: String,

    /// Peripheral type handed to `DocumentSync` (`new` picks `SoldierSensor`).
    pub peripheral_type: PeripheralType,

    /// Settings forwarded to the `PeerManager` and the connection-state graph.
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (`new` picks 5000).
    pub sync_interval_ms: u64,

    /// NOTE(review): presumably enables automatic broadcasting of events; the
    /// consumer is not visible in this part of the file. `new` picks `true`.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret; when set, a mesh encryption key is derived from it.
    pub encryption_secret: Option<[u8; 32]>,

    /// When `true` and an encryption key is present, unencrypted inbound
    /// documents are rejected.
    pub strict_encryption: bool,

    /// Whether received relay envelopes are forwarded on (`new` picks `false`).
    pub enable_relay: bool,

    /// Hop limit applied to relay envelopes created by this node.
    pub max_relay_hops: u8,

    /// Fanout handed to the `RandomFanout` gossip strategy (`new` picks 2).
    pub relay_fanout: usize,

    /// TTL in milliseconds handed to the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
161
162impl PeatMeshConfig {
163 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165 Self {
166 node_id,
167 callsign: callsign.into(),
168 mesh_id: mesh_id.into(),
169 peripheral_type: PeripheralType::SoldierSensor,
170 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171 sync_interval_ms: 5000,
172 auto_broadcast_events: true,
173 encryption_secret: None,
174 strict_encryption: false,
175 enable_relay: false,
176 max_relay_hops: DEFAULT_MAX_HOPS,
177 relay_fanout: 2,
178 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179 }
180 }
181
182 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187 self.encryption_secret = Some(secret);
188 self
189 }
190
191 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193 self.peripheral_type = ptype;
194 self
195 }
196
197 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199 self.sync_interval_ms = interval_ms;
200 self
201 }
202
203 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205 self.peer_config.peer_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_max_peers(mut self, max: usize) -> Self {
211 self.peer_config.max_peers = max;
212 self
213 }
214
215 pub fn with_strict_encryption(mut self) -> Self {
223 self.strict_encryption = true;
224 self
225 }
226
227 pub fn with_relay(mut self) -> Self {
232 self.enable_relay = true;
233 self
234 }
235
236 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240 self.max_relay_hops = max_hops;
241 self
242 }
243
244 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248 self.relay_fanout = fanout.max(1);
249 self
250 }
251
252 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256 self.seen_cache_ttl_ms = ttl_ms;
257 self
258 }
259}
260
/// Lock-protected store of decoded application documents.
///
/// Keys are `(type_id, source_node, timestamp)` tuples, as built when an
/// `Operation::App` is processed; values are type-erased documents.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
/// A mesh node: owns the peer manager, document sync state, optional
/// encryption/identity material and the relay/delta bookkeeping.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration this mesh was built from.
    config: PeatMeshConfig,

    /// Peer tracking, created from the node ID and `config.peer_config`.
    peer_manager: PeerManager,

    /// Local document state (counters, peripheral, emergency).
    document_sync: DocumentSync,

    /// Registered observers that receive `PeatEvent`s.
    observers: ObserverManager,

    /// Starts at 0.
    /// NOTE(review): stored as `u32` while timestamps elsewhere are `u64`
    /// milliseconds — confirm how truncation/wrap is handled by the users.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Starts at 0; same `u32` caveat as `last_sync_ms`.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide key; `Some` only when an encryption secret is configured.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer E2EE session manager; `None` until `enable_peer_e2ee` is called.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection-state graph; also records indirect peers seen via relays.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Deduplication cache for relayed message IDs.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks relay targets (a `RandomFanout` by construction).
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta-sync tracking and statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity; `Some` only when built through `with_identity`.
    identity: Option<DeviceIdentity>,

    /// Registry of known peer identities.
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Most recent peripheral received from each origin node.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode app documents and apply app delta operations.
    document_registry: DocumentRegistry,

    /// Decoded application documents keyed by `(type_id, source_node, timestamp)`.
    app_documents: AppDocumentStore,

    /// Translator used to decode inbound translator frames.
    #[cfg(feature = "mesh-translator")]
    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,

    /// Optional callback invoked with each successfully decoded translator document.
    #[cfg(feature = "mesh-translator")]
    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,
}
364
365#[cfg(feature = "std")]
366impl PeatMesh {
367 pub fn new(config: PeatMeshConfig) -> Self {
369 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
370 let document_sync = DocumentSync::with_peripheral_type(
371 config.node_id,
372 &config.callsign,
373 config.peripheral_type,
374 );
375
376 let encryption_key = config
378 .encryption_secret
379 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
380
381 let connection_graph = ConnectionStateGraph::with_config(
383 config.peer_config.rssi_degraded_threshold,
384 config.peer_config.lost_timeout_ms,
385 );
386
387 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
389
390 let gossip_strategy: Box<dyn GossipStrategy> =
392 Box::new(RandomFanout::new(config.relay_fanout));
393
394 let delta_encoder = DeltaEncoder::new(config.node_id);
396
397 let document_registry = DocumentRegistry::new();
398
399 Self {
400 config,
401 peer_manager,
402 document_sync,
403 observers: ObserverManager::new(),
404 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
405 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
406 encryption_key,
407 peer_sessions: std::sync::Mutex::new(None),
408 connection_graph: std::sync::Mutex::new(connection_graph),
409 seen_cache: std::sync::Mutex::new(seen_cache),
410 gossip_strategy,
411 delta_encoder: std::sync::Mutex::new(delta_encoder),
412 identity: None,
413 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
414 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
415 document_registry,
416 app_documents: std::sync::RwLock::new(HashMap::new()),
417 #[cfg(feature = "mesh-translator")]
418 ble_translator: std::sync::RwLock::new(
419 crate::translator::BleTranslator::with_defaults(),
420 ),
421 #[cfg(feature = "mesh-translator")]
422 decoded_document_callback: std::sync::RwLock::new(None),
423 }
424 }
425
426 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
432 let mut config = config;
434 config.node_id = identity.node_id();
435
436 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
437 let document_sync = DocumentSync::with_peripheral_type(
438 config.node_id,
439 &config.callsign,
440 config.peripheral_type,
441 );
442
443 let encryption_key = config
444 .encryption_secret
445 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
446
447 let connection_graph = ConnectionStateGraph::with_config(
448 config.peer_config.rssi_degraded_threshold,
449 config.peer_config.lost_timeout_ms,
450 );
451
452 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
453 let gossip_strategy: Box<dyn GossipStrategy> =
454 Box::new(RandomFanout::new(config.relay_fanout));
455 let delta_encoder = DeltaEncoder::new(config.node_id);
456
457 let document_registry = DocumentRegistry::new();
458
459 Self {
460 config,
461 peer_manager,
462 document_sync,
463 observers: ObserverManager::new(),
464 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
465 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
466 encryption_key,
467 peer_sessions: std::sync::Mutex::new(None),
468 connection_graph: std::sync::Mutex::new(connection_graph),
469 seen_cache: std::sync::Mutex::new(seen_cache),
470 gossip_strategy,
471 delta_encoder: std::sync::Mutex::new(delta_encoder),
472 identity: Some(identity),
473 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
474 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
475 document_registry,
476 app_documents: std::sync::RwLock::new(HashMap::new()),
477 #[cfg(feature = "mesh-translator")]
478 ble_translator: std::sync::RwLock::new(
479 crate::translator::BleTranslator::with_defaults(),
480 ),
481 #[cfg(feature = "mesh-translator")]
482 decoded_document_callback: std::sync::RwLock::new(None),
483 }
484 }
485
486 pub fn from_genesis(
494 genesis: &crate::security::MeshGenesis,
495 identity: DeviceIdentity,
496 callsign: &str,
497 ) -> Self {
498 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
499 .with_encryption(genesis.encryption_secret());
500
501 Self::with_identity(config, identity)
502 }
503
/// Rebuilds a mesh from persisted state.
///
/// The identity must restore successfully; otherwise its error is returned.
/// When the state holds a genesis record the mesh is created from it,
/// otherwise a plain configuration with the mesh ID `"RESTORED"` is used.
/// The persisted identity registry then replaces the fresh one.
#[cfg(feature = "std")]
pub fn from_persisted(
    state: crate::security::PersistedState,
    callsign: &str,
) -> Result<Self, crate::security::PersistenceError> {
    let identity = state.restore_identity()?;

    let mesh = match state.restore_genesis() {
        Some(genesis) => Self::from_genesis(&genesis, identity, callsign),
        None => {
            let fallback = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
            Self::with_identity(fallback, identity)
        }
    };

    let known_identities = state.restore_registry();
    if let Ok(mut guard) = mesh.identity_registry.lock() {
        *guard = known_identities;
    }

    log::info!(
        "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
        mesh.config.node_id.as_u32(),
        mesh.known_identity_count()
    );

    Ok(mesh)
}
562
/// Snapshots the identity and identity registry (plus an optional genesis
/// record) into a `PersistedState`.
///
/// Returns `None` when the mesh has no identity or when the registry lock
/// is poisoned.
#[cfg(feature = "std")]
pub fn to_persisted_state(
    &self,
    genesis: Option<&crate::security::MeshGenesis>,
) -> Option<crate::security::PersistedState> {
    let identity = self.identity.as_ref()?;
    let registry = self.identity_registry.lock().ok()?;

    // The registry is passed by reference; the guard derefs to the registry.
    Some(crate::security::PersistedState::with_registry(
        identity, genesis, &registry,
    ))
}
587
/// Installs the callback that receives decoded translator documents.
/// Does nothing if the callback lock is poisoned.
#[cfg(feature = "mesh-translator")]
pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
    let Ok(mut installed) = self.decoded_document_callback.write() else {
        return;
    };
    *installed = Some(cb);
}
607
/// Replaces the translator with one built from `config`.
/// Does nothing if the translator lock is poisoned.
#[cfg(feature = "mesh-translator")]
pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
    let Ok(mut translator) = self.ble_translator.write() else {
        return;
    };
    *translator = crate::translator::BleTranslator::new(config);
}
617
/// Inspects a decrypted frame and, if its first byte is a translator marker,
/// consumes it.
///
/// Returns `false` when the frame is empty or its marker is neither in the
/// reserved translator range nor `TRANSLATOR_FRAME_MARKER` (the frame is
/// not handled here). Returns `true` in every other case, including all the
/// paths where the frame is dropped with a log message.
///
/// * `decrypted` - frame bytes: marker, collection code, then payload.
/// * `peer` - optional peer label, used in the translation context and
///   passed to the callback; `"unknown"` is substituted when absent.
/// * `source_node` - optional node ID, formatted as 8 hex digits into the
///   translation context.
#[cfg(feature = "mesh-translator")]
fn try_handle_translator_marker(
    &self,
    decrypted: &[u8],
    peer: Option<&str>,
    source_node: Option<NodeId>,
) -> bool {
    if decrypted.is_empty() {
        return false;
    }
    let marker = decrypted[0];

    // Reserved markers are checked first and swallowed so they never reach
    // any other handler.
    if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
        log::warn!(
            "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
            decrypted.len()
        );
        return true;
    }

    if marker != TRANSLATOR_FRAME_MARKER {
        return false;
    }

    // A translator frame needs at least the marker and a collection code.
    if decrypted.len() < 2 {
        log::warn!(
            "ble: dropping truncated translator frame (len={}, missing collection code)",
            decrypted.len()
        );
        return true;
    }

    let code = decrypted[1];
    let payload = &decrypted[2..];

    // The translator read guard lives only inside this block, so it is
    // released before the callback or observers run below.
    let (collection, decode_result) = {
        let translator = match self.ble_translator.read() {
            Ok(g) => g,
            Err(_) => {
                log::warn!("ble: translator RwLock poisoned; dropping frame");
                return true;
            }
        };
        let collection = match translator.code_to_collection(code) {
            Some(c) => c.to_string(),
            None => {
                log::warn!(
                    "ble: dropping translator frame with unknown collection code 0x{code:02X}"
                );
                return true;
            }
        };

        let mut ctx =
            peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
                .with_collection(collection.clone());
        if let Some(node) = source_node {
            ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
        }

        let result = translator.decode_inbound_sync(payload, &ctx);
        (collection, result)
    };

    match decode_result {
        Ok(Some(doc)) => {
            // Clone the callback handle out of the lock so the callback is
            // invoked without holding the read guard.
            let cb = self
                .decoded_document_callback
                .read()
                .ok()
                .and_then(|g| g.as_ref().cloned());
            match cb {
                Some(cb) => cb.on_document(&collection, doc, peer),
                None => {
                    log::debug!(
                        "ble: decoded {} frame but no DecodedDocumentCallback installed",
                        collection
                    );
                    // Observers are told a document was decoded but had nowhere to go.
                    self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
                        collection: collection.clone(),
                        peer: peer.map(str::to_string),
                    });
                }
            }
        }
        Ok(None) => {
            log::debug!(
                "ble: codec declined translator frame for collection {}",
                collection
            );
        }
        Err(e) => {
            log::warn!(
                "ble: translator frame decode error (collection={}): {:#}",
                collection,
                e
            );
        }
    }

    // The frame carried the translator marker, so it counts as handled
    // regardless of the decode outcome.
    true
}
751
752 pub fn is_encryption_enabled(&self) -> bool {
756 self.encryption_key.is_some()
757 }
758
759 pub fn is_strict_encryption_enabled(&self) -> bool {
763 self.config.strict_encryption && self.encryption_key.is_some()
764 }
765
766 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
771 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
772 &self.config.mesh_id,
773 secret,
774 ));
775 }
776
777 pub fn disable_encryption(&mut self) {
779 self.encryption_key = None;
780 }
781
782 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
787 match &self.encryption_key {
788 Some(key) => {
789 match key.encrypt_to_bytes(plaintext) {
791 Ok(ciphertext) => {
792 let mut buf = Vec::with_capacity(2 + ciphertext.len());
793 buf.push(ENCRYPTED_MARKER);
794 buf.push(0x00); buf.extend_from_slice(&ciphertext);
796 buf
797 }
798 Err(e) => {
799 log::error!("Encryption failed: {}", e);
800 plaintext.to_vec()
802 }
803 }
804 }
805 None => plaintext.to_vec(),
806 }
807 }
808
809 fn decrypt_document<'a>(
817 &self,
818 data: &'a [u8],
819 source_hint: Option<&str>,
820 ) -> Option<std::borrow::Cow<'a, [u8]>> {
821 log::debug!(
822 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
823 data.len(),
824 data.first().copied().unwrap_or(0),
825 source_hint
826 );
827
828 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
830 let _reserved = data[1];
832 let encrypted_payload = &data[2..];
833
834 log::debug!(
835 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
836 encrypted_payload.len()
837 );
838
839 match &self.encryption_key {
840 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
841 Ok(plaintext) => {
842 log::debug!(
843 "decrypt_document: SUCCESS, plaintext len={}",
844 plaintext.len()
845 );
846 Some(std::borrow::Cow::Owned(plaintext))
847 }
848 Err(e) => {
849 log::warn!(
850 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
851 e,
852 encrypted_payload.len(),
853 source_hint
854 );
855 self.notify(PeatEvent::SecurityViolation {
856 kind: SecurityViolationKind::DecryptionFailed,
857 source: source_hint.map(String::from),
858 });
859 None
860 }
861 },
862 None => {
863 log::warn!(
864 "decrypt_document: encryption not enabled but received encrypted doc"
865 );
866 None
867 }
868 }
869 } else {
870 if self.config.strict_encryption && self.encryption_key.is_some() {
873 log::warn!(
874 "Rejected unencrypted document in strict encryption mode (source: {:?})",
875 source_hint
876 );
877 self.notify(PeatEvent::SecurityViolation {
878 kind: SecurityViolationKind::UnencryptedInStrictMode,
879 source: source_hint.map(String::from),
880 });
881 None
882 } else {
883 Some(std::borrow::Cow::Borrowed(data))
885 }
886 }
887 }
888
889 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
903 self.decrypt_document(data, None)
904 .map(|cow| cow.into_owned())
905 }
906
907 pub fn has_identity(&self) -> bool {
911 self.identity.is_some()
912 }
913
914 pub fn public_key(&self) -> Option<[u8; 32]> {
916 self.identity.as_ref().map(|id| id.public_key())
917 }
918
919 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
923 self.identity
924 .as_ref()
925 .map(|id| id.create_attestation(now_ms))
926 }
927
928 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
937 self.identity_registry
938 .lock()
939 .unwrap()
940 .verify_or_register(attestation)
941 }
942
943 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
945 self.identity_registry.lock().unwrap().is_known(node_id)
946 }
947
948 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
950 self.identity_registry
951 .lock()
952 .unwrap()
953 .get_public_key(node_id)
954 .copied()
955 }
956
957 pub fn known_identity_count(&self) -> usize {
959 self.identity_registry.lock().unwrap().len()
960 }
961
962 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
967 self.identity_registry
968 .lock()
969 .unwrap()
970 .pre_register(node_id, public_key, now_ms);
971 }
972
973 pub fn forget_peer_identity(&self, node_id: NodeId) {
977 self.identity_registry.lock().unwrap().remove(node_id);
978 }
979
980 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
984 self.identity.as_ref().map(|id| id.sign(data))
985 }
986
987 pub fn verify_peer_signature(
992 &self,
993 node_id: NodeId,
994 data: &[u8],
995 signature: &[u8; 64],
996 ) -> bool {
997 if let Some(public_key) = self.peer_public_key(node_id) {
998 crate::security::verify_signature(&public_key, data, signature)
999 } else {
1000 false
1001 }
1002 }
1003
1004 pub fn is_relay_enabled(&self) -> bool {
1008 self.config.enable_relay
1009 }
1010
1011 pub fn enable_relay(&mut self) {
1013 self.config.enable_relay = true;
1014 }
1015
1016 pub fn disable_relay(&mut self) {
1018 self.config.enable_relay = false;
1019 }
1020
1021 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1025 self.seen_cache.lock().unwrap().has_seen(message_id)
1026 }
1027
1028 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1032 self.seen_cache
1033 .lock()
1034 .unwrap()
1035 .check_and_mark(message_id, origin, now_ms)
1036 }
1037
1038 pub fn seen_cache_size(&self) -> usize {
1040 self.seen_cache.lock().unwrap().len()
1041 }
1042
1043 pub fn clear_seen_cache(&self) {
1045 self.seen_cache.lock().unwrap().clear();
1046 }
1047
1048 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1053 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1054 .with_max_hops(self.config.max_relay_hops);
1055 envelope.encode()
1056 }
1057
1058 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1063 let connected = self.peer_manager.get_connected_peers();
1064 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1065 connected
1066 .into_iter()
1067 .filter(|p| p.node_id != exclude)
1068 .collect()
1069 } else {
1070 connected
1071 };
1072
1073 self.gossip_strategy
1074 .select_peers(&filtered)
1075 .into_iter()
1076 .cloned()
1077 .collect()
1078 }
1079
/// Decodes a relay envelope and decides what to do with it.
///
/// Returns `None` when `data` is not a valid envelope or the message was
/// already seen (a `DuplicateMessageDropped` event is emitted in that
/// case). Otherwise returns a `RelayDecision` carrying the payload; its
/// `relay_envelope` is populated only when the hop budget allows relaying
/// and relaying is enabled in the configuration.
///
/// * `data` - encoded envelope bytes.
/// * `source_peer` - the peer the bytes were received from.
/// * `now_ms` - current time in milliseconds.
///
/// # Panics
/// Panics if the connection-graph or seen-cache mutex is poisoned.
pub fn process_relay_envelope(
    &self,
    data: &[u8],
    source_peer: NodeId,
    now_ms: u64,
) -> Option<RelayDecision> {
    let envelope = RelayEnvelope::decode(data)?;

    // The origin differs from both the sender and this node, so the origin
    // is reachable only through `source_peer`: record it in the graph.
    // This happens before deduplication, so duplicates also update the graph.
    if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
        let is_new = self.connection_graph.lock().unwrap().on_relay_received(
            source_peer,
            envelope.origin_node,
            envelope.hop_count,
            now_ms,
        );

        if is_new {
            log::debug!(
                "Discovered indirect peer {:08X} via {:08X} ({} hops)",
                envelope.origin_node.as_u32(),
                source_peer.as_u32(),
                envelope.hop_count
            );
        }
    }

    // Deduplication: a `false` result means the message was already seen.
    if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
        let stats = self
            .seen_cache
            .lock()
            .unwrap()
            .get_stats(&envelope.message_id);
        // Falls back to 1 when the cache has no stats for this message.
        let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);

        self.notify(PeatEvent::DuplicateMessageDropped {
            origin_node: envelope.origin_node,
            seen_count,
        });

        log::debug!(
            "Dropped duplicate message {} from {:08X} (seen {} times)",
            envelope.message_id,
            envelope.origin_node.as_u32(),
            seen_count
        );
        return None;
    }

    // Hop budget exhausted: still deliver the payload locally, but never relay.
    if !envelope.can_relay() {
        self.notify(PeatEvent::MessageTtlExpired {
            origin_node: envelope.origin_node,
            hop_count: envelope.hop_count,
        });

        log::debug!(
            "Message {} from {:08X} TTL expired at hop {}",
            envelope.message_id,
            envelope.origin_node.as_u32(),
            envelope.hop_count
        );

        return Some(RelayDecision {
            payload: envelope.payload,
            origin_node: envelope.origin_node,
            hop_count: envelope.hop_count,
            should_relay: false,
            relay_envelope: None,
        });
    }

    let should_relay = self.config.enable_relay;
    // The forwarded envelope is produced before `envelope.payload` is moved
    // into the decision below.
    let relay_envelope = if should_relay {
        envelope.relay()
    } else {
        None
    };

    Some(RelayDecision {
        payload: envelope.payload,
        origin_node: envelope.origin_node,
        hop_count: envelope.hop_count,
        should_relay,
        relay_envelope,
    })
}
1182
1183 pub fn build_relay_document(&self) -> Vec<u8> {
1188 let doc = self.build_document(); self.wrap_for_relay(doc)
1190 }
1191
1192 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1199 let mut encoder = self.delta_encoder.lock().unwrap();
1200 encoder.add_peer(peer_id);
1201 log::debug!(
1202 "Registered peer {:08X} for delta sync tracking",
1203 peer_id.as_u32()
1204 );
1205 }
1206
1207 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1211 let mut encoder = self.delta_encoder.lock().unwrap();
1212 encoder.remove_peer(peer_id);
1213 log::debug!(
1214 "Unregistered peer {:08X} from delta sync tracking",
1215 peer_id.as_u32()
1216 );
1217 }
1218
1219 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1224 let mut encoder = self.delta_encoder.lock().unwrap();
1225 encoder.reset_peer(peer_id);
1226 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1227 }
1228
1229 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1231 let mut encoder = self.delta_encoder.lock().unwrap();
1232 encoder.record_sent(peer_id, bytes);
1233 }
1234
1235 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1237 let mut encoder = self.delta_encoder.lock().unwrap();
1238 encoder.record_received(peer_id, bytes, timestamp);
1239 }
1240
1241 pub fn delta_stats(&self) -> DeltaStats {
1246 self.delta_encoder.lock().unwrap().stats()
1247 }
1248
1249 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1253 let encoder = self.delta_encoder.lock().unwrap();
1254 encoder
1255 .get_peer_state(peer_id)
1256 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1257 }
1258
1259 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1267 let mut all_operations: Vec<Operation> = Vec::new();
1269
1270 for (node_id_u32, count) in self.document_sync.counter_entries() {
1273 all_operations.push(Operation::IncrementCounter {
1274 counter_id: 0, node_id: NodeId::new(node_id_u32),
1276 amount: count,
1277 timestamp: count, });
1279 }
1280
1281 let peripheral = self.document_sync.peripheral_snapshot();
1284 let peripheral_timestamp = peripheral
1285 .last_event
1286 .as_ref()
1287 .map(|e| e.timestamp)
1288 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1290 peripheral,
1291 timestamp: peripheral_timestamp,
1292 });
1293
1294 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1296 let source_node = NodeId::new(emergency.source_node());
1297 let timestamp = emergency.timestamp();
1298
1299 all_operations.push(Operation::SetEmergency {
1301 source_node,
1302 timestamp,
1303 known_peers: emergency.all_nodes(),
1304 });
1305
1306 for acked_node in emergency.acked_nodes() {
1308 all_operations.push(Operation::AckEmergency {
1309 node_id: NodeId::new(acked_node),
1310 emergency_timestamp: timestamp,
1311 });
1312 }
1313 }
1314
1315 for app_op in self.app_document_delta_ops() {
1317 all_operations.push(Operation::App(app_op));
1318 }
1319
1320 let filtered_operations: Vec<Operation> = {
1322 let encoder = self.delta_encoder.lock().unwrap();
1323 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1324 all_operations
1325 .into_iter()
1326 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1327 .collect()
1328 } else {
1329 all_operations
1331 }
1332 };
1333
1334 if filtered_operations.is_empty() {
1336 return None;
1337 }
1338
1339 {
1341 let mut encoder = self.delta_encoder.lock().unwrap();
1342 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1343 for op in &filtered_operations {
1344 peer_state.mark_sent(&op.key(), op.timestamp());
1345 }
1346 }
1347 }
1348
1349 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1351 for op in filtered_operations {
1352 delta.add_operation(op);
1353 }
1354
1355 let encoded = delta.encode();
1357 let result = self.encrypt_document(&encoded);
1358
1359 {
1361 let mut encoder = self.delta_encoder.lock().unwrap();
1362 encoder.record_sent(peer_id, result.len());
1363 }
1364
1365 Some(result)
1366 }
1367
1368 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1373 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1374
1375 for (node_id_u32, count) in self.document_sync.counter_entries() {
1377 delta.add_operation(Operation::IncrementCounter {
1378 counter_id: 0,
1379 node_id: NodeId::new(node_id_u32),
1380 amount: count,
1381 timestamp: now_ms,
1382 });
1383 }
1384
1385 let peripheral = self.document_sync.peripheral_snapshot();
1387 let peripheral_timestamp = peripheral
1388 .last_event
1389 .as_ref()
1390 .map(|e| e.timestamp)
1391 .unwrap_or(now_ms);
1392 delta.add_operation(Operation::UpdatePeripheral {
1393 peripheral,
1394 timestamp: peripheral_timestamp,
1395 });
1396
1397 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1399 let source_node = NodeId::new(emergency.source_node());
1400 let timestamp = emergency.timestamp();
1401
1402 delta.add_operation(Operation::SetEmergency {
1403 source_node,
1404 timestamp,
1405 known_peers: emergency.all_nodes(),
1406 });
1407
1408 for acked_node in emergency.acked_nodes() {
1409 delta.add_operation(Operation::AckEmergency {
1410 node_id: NodeId::new(acked_node),
1411 emergency_timestamp: timestamp,
1412 });
1413 }
1414 }
1415
1416 for app_op in self.app_document_delta_ops() {
1418 delta.add_operation(Operation::App(app_op));
1419 }
1420
1421 let encoded = delta.encode();
1422 self.encrypt_document(&encoded)
1423 }
1424
/// Processes a decoded-on-the-wire delta document and reports what it
/// contained.
///
/// Returns `None` when `data` cannot be decoded as a delta document or when
/// the delta originated from this node.
///
/// * `source_node` - peer the data was received from (used for sync and
///   byte accounting, and excluded from relay-target selection).
/// * `data` - encoded delta document.
/// * `now_ms` - current time in milliseconds.
/// * `relay_data` - when `Some`, a `MessageRelayed` event is emitted and the
///   value is handed back in the result.
/// * `origin_node` / `hop_count` - relay metadata copied into the result.
///
/// # Panics
/// Panics if the app-document lock or the delta-encoder mutex is poisoned.
fn process_delta_document_internal(
    &self,
    source_node: NodeId,
    data: &[u8],
    now_ms: u64,
    relay_data: Option<Vec<u8>>,
    origin_node: Option<NodeId>,
    hop_count: u8,
) -> Option<DataReceivedResult> {
    let delta = DeltaDocument::decode(data)?;

    // Ignore our own deltas coming back to us.
    if delta.origin_node == self.config.node_id {
        return None;
    }

    let mut counter_changed = false;
    let mut emergency_changed = false;
    let mut is_emergency = false;
    let mut is_ack = false;
    let mut event_timestamp = 0u64;
    let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;

    log::info!(
        "Delta document from {:08X}: {} operations, data_len={}",
        delta.origin_node.as_u32(),
        delta.operations.len(),
        data.len()
    );
    for op in &delta.operations {
        log::info!("  Operation: {}", op.key());
        match op {
            Operation::IncrementCounter {
                node_id, amount, ..
            } => {
                let current = self.document_sync.counter_entries();
                let current_value = current
                    .iter()
                    .find(|(id, _)| *id == node_id.as_u32())
                    .map(|(_, v)| *v)
                    .unwrap_or(0);

                // Only flags that the remote count is ahead of ours; no
                // counter state is modified in this function.
                if *amount > current_value {
                    counter_changed = true;
                }
            }
            Operation::UpdatePeripheral {
                peripheral,
                timestamp,
            } => {
                // Remember the latest peripheral per origin node; a poisoned
                // lock is silently skipped.
                if let Ok(mut peripherals) = self.peer_peripherals.write() {
                    peripherals.insert(delta.origin_node, peripheral.clone());
                }
                peer_peripheral = Some(peripheral.clone());
                if *timestamp > event_timestamp {
                    event_timestamp = *timestamp;
                }
            }
            Operation::SetEmergency { timestamp, .. } => {
                is_emergency = true;
                emergency_changed = true;
                // Unlike the other arms, this overwrites unconditionally.
                event_timestamp = *timestamp;
            }
            Operation::AckEmergency {
                emergency_timestamp,
                ..
            } => {
                is_ack = true;
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::ClearEmergency {
                emergency_timestamp,
            } => {
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::App(app_op) => {
                // Only the low 48 bits of the timestamp identify the document.
                let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;

                log::info!(
                    "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
                    app_op.type_id,
                    app_op.op_code,
                    app_op.source_node,
                    doc_timestamp,
                    app_op.payload.len()
                );

                let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
                // The write guard is confined to this block so it is
                // released before observers are notified.
                let changed = {
                    let mut docs = self.app_documents.write().unwrap();

                    if let Some(existing) = docs.get_mut(&doc_key) {
                        // Known document: apply the operation as a delta.
                        self.document_registry.apply_delta_op(
                            app_op.type_id,
                            existing.as_mut(),
                            app_op,
                        )
                    } else {
                        // Unknown document: try to decode the payload as a
                        // complete document and store it.
                        if let Some(decoded) = self
                            .document_registry
                            .decode(app_op.type_id, &app_op.payload)
                        {
                            docs.insert(doc_key, decoded);
                            true
                        } else {
                            log::debug!(
                                "Received delta for unknown doc {:?}, waiting for full state",
                                doc_key
                            );
                            false
                        }
                    }
                };

                // Observers are notified even when nothing changed.
                self.observers.notify(PeatEvent::app_document_received(
                    app_op.type_id,
                    NodeId::new(app_op.source_node),
                    doc_timestamp,
                    changed,
                ));
            }
        }
    }

    // Sync bookkeeping is keyed by the transport-level sender, not the origin.
    self.peer_manager.record_sync(source_node, now_ms);

    {
        let mut encoder = self.delta_encoder.lock().unwrap();
        encoder.record_received(&source_node, data.len(), now_ms);
    }

    // An emergency takes precedence over an ack when both are present.
    if is_emergency {
        self.notify(PeatEvent::EmergencyReceived {
            from_node: delta.origin_node,
        });
    } else if is_ack {
        self.notify(PeatEvent::AckReceived {
            from_node: delta.origin_node,
        });
    }

    if counter_changed {
        let total_count = self.document_sync.total_count();
        self.notify(PeatEvent::DocumentSynced {
            from_node: delta.origin_node,
            total_count,
        });
    }

    if relay_data.is_some() {
        // NOTE(review): the targets are selected here only to report a count;
        // the actual relay presumably selects again elsewhere — confirm the
        // reported `relay_count` matches what is really sent.
        let relay_targets = self.get_relay_targets(Some(source_node));
        self.notify(PeatEvent::MessageRelayed {
            origin_node: origin_node.unwrap_or(delta.origin_node),
            relay_count: relay_targets.len(),
            hop_count,
        });
    }

    let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
        DataReceivedResult::peripheral_fields(&peer_peripheral);

    Some(DataReceivedResult {
        source_node: delta.origin_node,
        is_emergency,
        is_ack,
        counter_changed,
        emergency_changed,
        total_count: self.document_sync.total_count(),
        event_timestamp,
        relay_data,
        origin_node,
        hop_count,
        callsign,
        battery_percent,
        heart_rate,
        event_type,
        latitude,
        longitude,
        altitude,
    })
}
1638
1639 pub fn enable_peer_e2ee(&self) {
1647 let mut sessions = self.peer_sessions.lock().unwrap();
1648 if sessions.is_none() {
1649 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1650 log::info!(
1651 "Per-peer E2EE enabled for node {:08X}",
1652 self.config.node_id.as_u32()
1653 );
1654 }
1655 }
1656
1657 pub fn disable_peer_e2ee(&self) {
1661 let mut sessions = self.peer_sessions.lock().unwrap();
1662 *sessions = None;
1663 log::info!("Per-peer E2EE disabled");
1664 }
1665
1666 pub fn is_peer_e2ee_enabled(&self) -> bool {
1668 self.peer_sessions.lock().unwrap().is_some()
1669 }
1670
1671 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1675 self.peer_sessions
1676 .lock()
1677 .unwrap()
1678 .as_ref()
1679 .map(|s| s.our_public_key())
1680 }
1681
1682 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1688 let mut sessions = self.peer_sessions.lock().unwrap();
1689 let session_mgr = sessions.as_mut()?;
1690
1691 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1692 let mut buf = Vec::with_capacity(2 + 37);
1693 buf.push(KEY_EXCHANGE_MARKER);
1694 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1696
1697 log::info!(
1698 "Initiated E2EE session with peer {:08X}",
1699 peer_node_id.as_u32()
1700 );
1701 Some(buf)
1702 }
1703
1704 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1706 self.peer_sessions
1707 .lock()
1708 .unwrap()
1709 .as_ref()
1710 .is_some_and(|s| s.has_session(peer_node_id))
1711 }
1712
1713 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1715 self.peer_sessions
1716 .lock()
1717 .unwrap()
1718 .as_ref()
1719 .and_then(|s| s.session_state(peer_node_id))
1720 }
1721
1722 pub fn send_peer_e2ee(
1727 &self,
1728 peer_node_id: NodeId,
1729 plaintext: &[u8],
1730 now_ms: u64,
1731 ) -> Option<Vec<u8>> {
1732 let mut sessions = self.peer_sessions.lock().unwrap();
1733 let session_mgr = sessions.as_mut()?;
1734
1735 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1736 Ok(encrypted) => {
1737 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1738 buf.push(PEER_E2EE_MARKER);
1739 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1741 Some(buf)
1742 }
1743 Err(e) => {
1744 log::warn!(
1745 "Failed to encrypt for peer {:08X}: {:?}",
1746 peer_node_id.as_u32(),
1747 e
1748 );
1749 None
1750 }
1751 }
1752 }
1753
1754 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1756 let mut sessions = self.peer_sessions.lock().unwrap();
1757 if let Some(session_mgr) = sessions.as_mut() {
1758 session_mgr.close_session(peer_node_id);
1759 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1760 log::info!(
1761 "Closed E2EE session with peer {:08X}",
1762 peer_node_id.as_u32()
1763 );
1764 }
1765 }
1766
1767 pub fn peer_e2ee_session_count(&self) -> usize {
1769 self.peer_sessions
1770 .lock()
1771 .unwrap()
1772 .as_ref()
1773 .map(|s| s.session_count())
1774 .unwrap_or(0)
1775 }
1776
1777 pub fn peer_e2ee_established_count(&self) -> usize {
1779 self.peer_sessions
1780 .lock()
1781 .unwrap()
1782 .as_ref()
1783 .map(|s| s.established_count())
1784 .unwrap_or(0)
1785 }
1786
1787 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1792 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1793 return None;
1794 }
1795
1796 let payload = &data[2..];
1797 let msg = KeyExchangeMessage::decode(payload)?;
1798
1799 let mut sessions = self.peer_sessions.lock().unwrap();
1800 let session_mgr = sessions.as_mut()?;
1801
1802 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1803
1804 if established {
1805 self.notify(PeatEvent::PeerE2eeEstablished {
1806 peer_node_id: msg.sender_node_id,
1807 });
1808 log::info!(
1809 "E2EE session established with peer {:08X}",
1810 msg.sender_node_id.as_u32()
1811 );
1812 }
1813
1814 let mut buf = Vec::with_capacity(2 + 37);
1816 buf.push(KEY_EXCHANGE_MARKER);
1817 buf.push(0x00);
1818 buf.extend_from_slice(&response.encode());
1819 Some(buf)
1820 }
1821
1822 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1827 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1828 return None;
1829 }
1830
1831 let payload = &data[2..];
1832 let msg = PeerEncryptedMessage::decode(payload)?;
1833
1834 let mut sessions = self.peer_sessions.lock().unwrap();
1835 let session_mgr = sessions.as_mut()?;
1836
1837 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1838 Ok(plaintext) => {
1839 self.notify(PeatEvent::PeerE2eeMessageReceived {
1841 from_node: msg.sender_node_id,
1842 data: plaintext.clone(),
1843 });
1844 Some(plaintext)
1845 }
1846 Err(e) => {
1847 log::warn!(
1848 "Failed to decrypt E2EE message from {:08X}: {:?}",
1849 msg.sender_node_id.as_u32(),
1850 e
1851 );
1852 None
1853 }
1854 }
1855 }
1856
1857 pub fn node_id(&self) -> NodeId {
1861 self.config.node_id
1862 }
1863
1864 pub fn callsign(&self) -> &str {
1866 &self.config.callsign
1867 }
1868
1869 pub fn mesh_id(&self) -> &str {
1871 &self.config.mesh_id
1872 }
1873
1874 pub fn device_name(&self) -> String {
1876 format!(
1877 "PEAT_{}-{:08X}",
1878 self.config.mesh_id,
1879 self.config.node_id.as_u32()
1880 )
1881 }
1882
1883 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1888 self.peer_peripherals.read().ok().and_then(|peripherals| {
1889 peripherals
1890 .get(&node_id)
1891 .map(|p| p.callsign_str().to_string())
1892 })
1893 }
1894
1895 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1900 self.peer_peripherals
1901 .read()
1902 .ok()
1903 .and_then(|peripherals| peripherals.get(&node_id).cloned())
1904 }
1905
1906 pub fn document_registry(&self) -> &DocumentRegistry {
1921 &self.document_registry
1922 }
1923
1924 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1931 let type_id = T::TYPE_ID;
1932 let (source_node, timestamp) = doc.identity();
1933 let key = (type_id, source_node, timestamp);
1934
1935 let mut docs = self.app_documents.write().unwrap();
1936
1937 if let Some(existing) = docs.get_mut(&key) {
1938 self.document_registry
1940 .merge(type_id, existing.as_mut(), &doc)
1941 } else {
1942 docs.insert(key, Box::new(doc));
1944 true
1945 }
1946 }
1947
1948 pub fn store_app_document_boxed(
1955 &self,
1956 type_id: u8,
1957 source_node: u32,
1958 timestamp: u64,
1959 doc: Box<dyn core::any::Any + Send + Sync>,
1960 ) -> bool {
1961 let key = (type_id, source_node, timestamp);
1962
1963 let mut docs = self.app_documents.write().unwrap();
1964
1965 if let Some(existing) = docs.get_mut(&key) {
1966 self.document_registry
1968 .merge(type_id, existing.as_mut(), doc.as_ref())
1969 } else {
1970 docs.insert(key, doc);
1972 true
1973 }
1974 }
1975
1976 pub fn get_app_document<T: crate::registry::DocumentType>(
1980 &self,
1981 source_node: u32,
1982 timestamp: u64,
1983 ) -> Option<T> {
1984 let key = (T::TYPE_ID, source_node, timestamp);
1985
1986 let docs = self.app_documents.read().unwrap();
1987 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1988 }
1989
1990 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1994 let docs = self.app_documents.read().unwrap();
1995 docs.iter()
1996 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1997 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1998 .collect()
1999 }
2000
2001 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2005 let docs = self.app_documents.read().unwrap();
2006 let mut ops = Vec::new();
2007
2008 for ((type_id, _source, _ts), doc) in docs.iter() {
2009 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2010 ops.push(op);
2011 }
2012 }
2013
2014 ops
2015 }
2016
2017 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2021 let docs = self.app_documents.read().unwrap();
2022 docs.keys()
2023 .filter(|(tid, _, _)| *tid == type_id)
2024 .map(|(_, source, ts)| (*source, *ts))
2025 .collect()
2026 }
2027
2028 pub fn app_document_count(&self) -> usize {
2030 self.app_documents.read().unwrap().len()
2031 }
2032
2033 pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2037 self.observers.add(observer);
2038 }
2039
2040 pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2042 self.observers.remove(observer);
2043 }
2044
2045 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2052 let data = self.document_sync.send_emergency(timestamp);
2053 self.notify(PeatEvent::MeshStateChanged {
2054 peer_count: self.peer_manager.peer_count(),
2055 connected_count: self.peer_manager.connected_count(),
2056 });
2057 self.encrypt_document(&data)
2058 }
2059
2060 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2065 let data = self.document_sync.send_ack(timestamp);
2066 self.notify(PeatEvent::MeshStateChanged {
2067 peer_count: self.peer_manager.peer_count(),
2068 connected_count: self.peer_manager.connected_count(),
2069 });
2070 self.encrypt_document(&data)
2071 }
2072
2073 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2080 self.encrypt_document(payload)
2081 }
2082
2083 pub fn clear_event(&self) {
2085 self.document_sync.clear_event();
2086 }
2087
2088 pub fn is_emergency_active(&self) -> bool {
2090 self.document_sync.is_emergency_active()
2091 }
2092
2093 pub fn is_ack_active(&self) -> bool {
2095 self.document_sync.is_ack_active()
2096 }
2097
2098 pub fn current_event(&self) -> Option<EventType> {
2100 self.document_sync.current_event()
2101 }
2102
2103 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2112 let data = self.document_sync.start_emergency(timestamp, known_peers);
2113 self.notify(PeatEvent::MeshStateChanged {
2114 peer_count: self.peer_manager.peer_count(),
2115 connected_count: self.peer_manager.connected_count(),
2116 });
2117 self.encrypt_document(&data)
2118 }
2119
2120 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2124 let peers: Vec<u32> = self
2125 .peer_manager
2126 .get_peers()
2127 .iter()
2128 .map(|p| p.node_id.as_u32())
2129 .collect();
2130 self.start_emergency(timestamp, &peers)
2131 }
2132
2133 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2138 let result = self.document_sync.ack_emergency(timestamp);
2139 if result.is_some() {
2140 self.notify(PeatEvent::MeshStateChanged {
2141 peer_count: self.peer_manager.peer_count(),
2142 connected_count: self.peer_manager.connected_count(),
2143 });
2144 }
2145 result.map(|data| self.encrypt_document(&data))
2146 }
2147
2148 pub fn clear_emergency(&self) {
2150 self.document_sync.clear_emergency();
2151 }
2152
2153 pub fn has_active_emergency(&self) -> bool {
2155 self.document_sync.has_active_emergency()
2156 }
2157
2158 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2162 self.document_sync.get_emergency_status()
2163 }
2164
2165 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2167 self.document_sync.has_peer_acked(peer_id)
2168 }
2169
2170 pub fn all_peers_acked(&self) -> bool {
2172 self.document_sync.all_peers_acked()
2173 }
2174
/// Adds a chat message to the document and returns the rebuilt document after
/// it has gone through `encrypt_document`.
///
/// Returns `None` when `add_chat_message` reports that nothing was added.
#[cfg(feature = "legacy-chat")]
pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
    if !self.document_sync.add_chat_message(sender, text, timestamp) {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2192
/// Adds a chat reply to the document and returns the rebuilt document after it
/// has gone through `encrypt_document`.
///
/// `reply_to_node` and `reply_to_timestamp` are forwarded unchanged to
/// `add_chat_reply`. Returns `None` when `add_chat_reply` reports that nothing
/// was added.
#[cfg(feature = "legacy-chat")]
pub fn send_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> Option<Vec<u8>> {
    let added = self.document_sync.add_chat_reply(
        sender,
        text,
        reply_to_node,
        reply_to_timestamp,
        timestamp,
    );
    if !added {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2221
/// Number of chat messages reported by the document sync layer.
#[cfg(feature = "legacy-chat")]
pub fn chat_count(&self) -> usize {
    let sync = &self.document_sync;
    sync.chat_count()
}
2227
/// Returns the chat message tuples the document sync layer reports for
/// `since_timestamp`.
///
/// NOTE(review): the tuple layout is defined by
/// `DocumentSync::chat_messages_since` and is not visible here — confirm field
/// meanings against that method.
#[cfg(feature = "legacy-chat")]
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.chat_messages_since(since_timestamp)
}
2238
/// Returns every chat message tuple held by the document sync layer.
///
/// NOTE(review): the tuple layout is defined by
/// `DocumentSync::all_chat_messages` and is not visible here — confirm field
/// meanings against that method.
#[cfg(feature = "legacy-chat")]
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.all_chat_messages()
}
2246
2247 pub fn on_ble_discovered(
2253 &self,
2254 identifier: &str,
2255 name: Option<&str>,
2256 rssi: i8,
2257 mesh_id: Option<&str>,
2258 now_ms: u64,
2259 ) -> Option<PeatPeer> {
2260 let (node_id, is_new) = self
2261 .peer_manager
2262 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2263
2264 let peer = self.peer_manager.get_peer(node_id)?;
2265
2266 {
2268 let mut graph = self.connection_graph.lock().unwrap();
2269 graph.on_discovered(
2270 node_id,
2271 identifier.to_string(),
2272 name.map(|s| s.to_string()),
2273 mesh_id.map(|s| s.to_string()),
2274 rssi,
2275 now_ms,
2276 );
2277 }
2278
2279 if is_new {
2280 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2281 self.notify_mesh_state_changed();
2282 }
2283
2284 Some(peer)
2285 }
2286
2287 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2291 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2292 Some(id) => id,
2293 None => {
2294 log::warn!(
2295 "on_ble_connected: identifier {:?} not in peer map — \
2296 use on_incoming_connection() for peripheral connections",
2297 identifier
2298 );
2299 return None;
2300 }
2301 };
2302
2303 {
2305 let mut graph = self.connection_graph.lock().unwrap();
2306 graph.on_connected(node_id, now_ms);
2307 }
2308
2309 self.register_peer_for_delta(&node_id);
2311
2312 self.notify(PeatEvent::PeerConnected { node_id });
2313 self.notify_mesh_state_changed();
2314 Some(node_id)
2315 }
2316
2317 pub fn on_ble_disconnected(
2319 &self,
2320 identifier: &str,
2321 reason: DisconnectReason,
2322 ) -> Option<NodeId> {
2323 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2324
2325 {
2327 let mut graph = self.connection_graph.lock().unwrap();
2328 let platform_reason = match observer_reason {
2329 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2330 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2331 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2332 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2333 DisconnectReason::ConnectionFailed => {
2334 crate::platform::DisconnectReason::ConnectionFailed
2335 }
2336 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2337 };
2338 let now_ms = std::time::SystemTime::now()
2339 .duration_since(std::time::UNIX_EPOCH)
2340 .map(|d| d.as_millis() as u64)
2341 .unwrap_or(0);
2342 graph.on_disconnected(node_id, platform_reason, now_ms);
2343
2344 graph.remove_via_peer(node_id);
2347 }
2348
2349 self.unregister_peer_for_delta(&node_id);
2351
2352 self.notify(PeatEvent::PeerDisconnected {
2353 node_id,
2354 reason: observer_reason,
2355 });
2356 self.notify_mesh_state_changed();
2357 Some(node_id)
2358 }
2359
2360 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2364 if self
2365 .peer_manager
2366 .on_disconnected_by_node_id(node_id, reason)
2367 {
2368 {
2370 let mut graph = self.connection_graph.lock().unwrap();
2371 let platform_reason = match reason {
2372 DisconnectReason::LocalRequest => {
2373 crate::platform::DisconnectReason::LocalRequest
2374 }
2375 DisconnectReason::RemoteRequest => {
2376 crate::platform::DisconnectReason::RemoteRequest
2377 }
2378 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2379 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2380 DisconnectReason::ConnectionFailed => {
2381 crate::platform::DisconnectReason::ConnectionFailed
2382 }
2383 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2384 };
2385 let now_ms = std::time::SystemTime::now()
2386 .duration_since(std::time::UNIX_EPOCH)
2387 .map(|d| d.as_millis() as u64)
2388 .unwrap_or(0);
2389 graph.on_disconnected(node_id, platform_reason, now_ms);
2390
2391 graph.remove_via_peer(node_id);
2393 }
2394
2395 self.unregister_peer_for_delta(&node_id);
2397
2398 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2399 self.notify_mesh_state_changed();
2400 }
2401 }
2402
2403 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2407 let is_new = self
2408 .peer_manager
2409 .on_incoming_connection(identifier, node_id, now_ms);
2410
2411 {
2413 let mut graph = self.connection_graph.lock().unwrap();
2414 if is_new {
2415 graph.on_discovered(
2416 node_id,
2417 identifier.to_string(),
2418 None,
2419 Some(self.config.mesh_id.clone()),
2420 -50, now_ms,
2422 );
2423 }
2424 graph.on_connected(node_id, now_ms);
2425 }
2426
2427 self.register_peer_for_delta(&node_id);
2429
2430 if is_new {
2431 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2432 self.notify(PeatEvent::PeerDiscovered { peer });
2433 }
2434 }
2435
2436 self.notify(PeatEvent::PeerConnected { node_id });
2437 self.notify_mesh_state_changed();
2438
2439 is_new
2440 }
2441
2442 pub fn on_ble_data_received(
2449 &self,
2450 identifier: &str,
2451 data: &[u8],
2452 now_ms: u64,
2453 ) -> Option<DataReceivedResult> {
2454 let node_id = self.peer_manager.get_node_id(identifier)?;
2456
2457 if data.len() >= 2 {
2459 match data[0] {
2460 KEY_EXCHANGE_MARKER => {
2461 let _response = self.handle_key_exchange(data, now_ms);
2463 return None;
2465 }
2466 PEER_E2EE_MARKER => {
2467 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2469 return None;
2471 }
2472 RELAY_ENVELOPE_MARKER => {
2473 return self
2475 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2476 }
2477 _ => {}
2478 }
2479 }
2480
2481 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2483 }
2484
2485 #[allow(clippy::too_many_arguments)]
2487 fn process_document_data_with_identifier(
2488 &self,
2489 source_node: NodeId,
2490 identifier: &str,
2491 data: &[u8],
2492 now_ms: u64,
2493 relay_data: Option<Vec<u8>>,
2494 origin_node: Option<NodeId>,
2495 hop_count: u8,
2496 ) -> Option<DataReceivedResult> {
2497 let decrypted = self.decrypt_document(data, Some(identifier))?;
2499
2500 #[cfg(feature = "mesh-translator")]
2503 if self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2504 return None;
2505 }
2506
2507 if DeltaDocument::is_delta_document(&decrypted) {
2509 return self.process_delta_document_internal(
2510 source_node,
2511 &decrypted,
2512 now_ms,
2513 relay_data,
2514 origin_node,
2515 hop_count,
2516 );
2517 }
2518
2519 let result = self.document_sync.merge_document(&decrypted)?;
2521
2522 if let Some(ref peripheral) = result.peer_peripheral {
2524 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2525 peripherals.insert(result.source_node, peripheral.clone());
2526 }
2527 }
2528
2529 self.peer_manager.record_sync(source_node, now_ms);
2531
2532 if result.is_emergency() {
2534 self.notify(PeatEvent::EmergencyReceived {
2535 from_node: result.source_node,
2536 });
2537 } else if result.is_ack() {
2538 self.notify(PeatEvent::AckReceived {
2539 from_node: result.source_node,
2540 });
2541 }
2542
2543 if result.counter_changed {
2544 self.notify(PeatEvent::DocumentSynced {
2545 from_node: result.source_node,
2546 total_count: result.total_count,
2547 });
2548 }
2549
2550 if relay_data.is_some() {
2552 let relay_targets = self.get_relay_targets(Some(source_node));
2553 self.notify(PeatEvent::MessageRelayed {
2554 origin_node: origin_node.unwrap_or(result.source_node),
2555 relay_count: relay_targets.len(),
2556 hop_count,
2557 });
2558 }
2559
2560 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2561 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2562
2563 Some(DataReceivedResult {
2564 source_node: result.source_node,
2565 is_emergency: result.is_emergency(),
2566 is_ack: result.is_ack(),
2567 counter_changed: result.counter_changed,
2568 emergency_changed: result.emergency_changed,
2569 total_count: result.total_count,
2570 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2571 relay_data,
2572 origin_node,
2573 hop_count,
2574 callsign,
2575 battery_percent,
2576 heart_rate,
2577 event_type,
2578 latitude,
2579 longitude,
2580 altitude,
2581 })
2582 }
2583
2584 fn handle_relay_envelope_with_identifier(
2586 &self,
2587 source_node: NodeId,
2588 identifier: &str,
2589 data: &[u8],
2590 now_ms: u64,
2591 ) -> Option<DataReceivedResult> {
2592 let envelope = RelayEnvelope::decode(data)?;
2594
2595 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2597 let stats = self
2598 .seen_cache
2599 .lock()
2600 .unwrap()
2601 .get_stats(&envelope.message_id);
2602 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2603
2604 self.notify(PeatEvent::DuplicateMessageDropped {
2605 origin_node: envelope.origin_node,
2606 seen_count,
2607 });
2608 return None;
2609 }
2610
2611 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2613 envelope.relay().map(|e| e.encode())
2614 } else {
2615 if !envelope.can_relay() {
2616 self.notify(PeatEvent::MessageTtlExpired {
2617 origin_node: envelope.origin_node,
2618 hop_count: envelope.hop_count,
2619 });
2620 }
2621 None
2622 };
2623
2624 self.process_document_data_with_identifier(
2626 source_node,
2627 identifier,
2628 &envelope.payload,
2629 now_ms,
2630 relay_data,
2631 Some(envelope.origin_node),
2632 envelope.hop_count,
2633 )
2634 }
2635
2636 pub fn on_ble_data_received_from_node(
2643 &self,
2644 node_id: NodeId,
2645 data: &[u8],
2646 now_ms: u64,
2647 ) -> Option<DataReceivedResult> {
2648 if data.len() >= 2 {
2650 match data[0] {
2651 KEY_EXCHANGE_MARKER => {
2652 let _response = self.handle_key_exchange(data, now_ms);
2653 return None;
2654 }
2655 PEER_E2EE_MARKER => {
2656 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2657 return None;
2658 }
2659 RELAY_ENVELOPE_MARKER => {
2660 return self.handle_relay_envelope(node_id, data, now_ms);
2662 }
2663 _ => {}
2664 }
2665 }
2666
2667 self.process_document_data(node_id, data, now_ms, None, None, 0)
2669 }
2670
2671 pub fn on_ble_data_received_anonymous(
2681 &self,
2682 identifier: &str,
2683 data: &[u8],
2684 now_ms: u64,
2685 ) -> Option<DataReceivedResult> {
2686 log::debug!(
2687 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2688 identifier,
2689 data.len(),
2690 data.first().copied().unwrap_or(0)
2691 );
2692
2693 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2695 Some(d) => d,
2696 None => {
2697 log::warn!(
2698 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2699 data.len(),
2700 identifier
2701 );
2702 return None;
2703 }
2704 };
2705
2706 #[cfg(feature = "mesh-translator")]
2717 if self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2718 return None;
2719 }
2720
2721 if decrypted.len() < 8 {
2724 log::warn!("Decrypted document too short to extract source_node");
2725 return None;
2726 }
2727
2728 let source_node_u32 =
2729 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2730 let source_node = NodeId::new(source_node_u32);
2731
2732 log::info!(
2733 "Anonymous document from {}: source_node={:08X}, len={}",
2734 identifier,
2735 source_node_u32,
2736 decrypted.len()
2737 );
2738
2739 self.peer_manager
2742 .register_identifier(identifier, source_node);
2743
2744 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2746 log::info!(
2747 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2748 is_delta,
2749 decrypted.first().copied().unwrap_or(0),
2750 decrypted.len()
2751 );
2752
2753 if is_delta {
2754 return self.process_delta_document_internal(
2755 source_node,
2756 &decrypted,
2757 now_ms,
2758 None,
2759 None,
2760 0,
2761 );
2762 }
2763
2764 const APP_LAYER_MARKER: u8 = 0xAF;
2768 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2769 log::debug!(
2770 "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2771 source_node.as_u32(),
2772 decrypted.len()
2773 );
2774 return Some(DataReceivedResult {
2775 source_node,
2776 is_emergency: false,
2777 is_ack: false,
2778 counter_changed: false,
2779 emergency_changed: false,
2780 total_count: 0,
2781 event_timestamp: now_ms,
2782 relay_data: Some(decrypted.to_vec()),
2783 origin_node: None,
2784 hop_count: 0,
2785 callsign: None,
2786 battery_percent: None,
2787 heart_rate: None,
2788 event_type: None,
2789 latitude: None,
2790 longitude: None,
2791 altitude: None,
2792 });
2793 }
2794
2795 log::info!(
2797 "Processing legacy document from {:08X}",
2798 source_node.as_u32()
2799 );
2800 let result = self.document_sync.merge_document(&decrypted)?;
2801
2802 log::info!(
2804 "Merge result: peer_peripheral={}, counter_changed={}",
2805 result.peer_peripheral.is_some(),
2806 result.counter_changed
2807 );
2808 if let Some(ref p) = result.peer_peripheral {
2809 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2810 }
2811
2812 self.peer_manager.record_sync(source_node, now_ms);
2814
2815 if result.is_emergency() {
2817 self.notify(PeatEvent::EmergencyReceived {
2818 from_node: result.source_node,
2819 });
2820 } else if result.is_ack() {
2821 self.notify(PeatEvent::AckReceived {
2822 from_node: result.source_node,
2823 });
2824 }
2825
2826 if result.counter_changed {
2827 self.notify(PeatEvent::DocumentSynced {
2828 from_node: result.source_node,
2829 total_count: result.total_count,
2830 });
2831 }
2832
2833 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2834 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2835
2836 Some(DataReceivedResult {
2837 source_node: result.source_node,
2838 is_emergency: result.is_emergency(),
2839 is_ack: result.is_ack(),
2840 counter_changed: result.counter_changed,
2841 emergency_changed: result.emergency_changed,
2842 total_count: result.total_count,
2843 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2844 relay_data: None,
2845 origin_node: None,
2846 hop_count: 0,
2847 callsign,
2848 battery_percent,
2849 heart_rate,
2850 event_type,
2851 latitude,
2852 longitude,
2853 altitude,
2854 })
2855 }
2856
2857 fn process_document_data(
2859 &self,
2860 source_node: NodeId,
2861 data: &[u8],
2862 now_ms: u64,
2863 relay_data: Option<Vec<u8>>,
2864 origin_node: Option<NodeId>,
2865 hop_count: u8,
2866 ) -> Option<DataReceivedResult> {
2867 let source_hint = format!("node:{:08X}", source_node.as_u32());
2869 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2870
2871 #[cfg(feature = "mesh-translator")]
2874 if self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
2875 return None;
2876 }
2877
2878 if DeltaDocument::is_delta_document(&decrypted) {
2880 return self.process_delta_document_internal(
2881 source_node,
2882 &decrypted,
2883 now_ms,
2884 relay_data,
2885 origin_node,
2886 hop_count,
2887 );
2888 }
2889
2890 let result = self.document_sync.merge_document(&decrypted)?;
2892
2893 if let Some(ref peripheral) = result.peer_peripheral {
2895 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2896 peripherals.insert(result.source_node, peripheral.clone());
2897 }
2898 }
2899
2900 self.peer_manager.record_sync(source_node, now_ms);
2902
2903 if result.is_emergency() {
2905 self.notify(PeatEvent::EmergencyReceived {
2906 from_node: result.source_node,
2907 });
2908 } else if result.is_ack() {
2909 self.notify(PeatEvent::AckReceived {
2910 from_node: result.source_node,
2911 });
2912 }
2913
2914 if result.counter_changed {
2915 self.notify(PeatEvent::DocumentSynced {
2916 from_node: result.source_node,
2917 total_count: result.total_count,
2918 });
2919 }
2920
2921 if relay_data.is_some() {
2923 let relay_targets = self.get_relay_targets(Some(source_node));
2924 self.notify(PeatEvent::MessageRelayed {
2925 origin_node: origin_node.unwrap_or(result.source_node),
2926 relay_count: relay_targets.len(),
2927 hop_count,
2928 });
2929 }
2930
2931 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2932 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2933
2934 Some(DataReceivedResult {
2935 source_node: result.source_node,
2936 is_emergency: result.is_emergency(),
2937 is_ack: result.is_ack(),
2938 counter_changed: result.counter_changed,
2939 emergency_changed: result.emergency_changed,
2940 total_count: result.total_count,
2941 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2942 relay_data,
2943 origin_node,
2944 hop_count,
2945 callsign,
2946 battery_percent,
2947 heart_rate,
2948 event_type,
2949 latitude,
2950 longitude,
2951 altitude,
2952 })
2953 }
2954
2955 fn handle_relay_envelope(
2957 &self,
2958 source_node: NodeId,
2959 data: &[u8],
2960 now_ms: u64,
2961 ) -> Option<DataReceivedResult> {
2962 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2964
2965 let relay_data = if decision.should_relay {
2967 decision.relay_data()
2968 } else {
2969 None
2970 };
2971
2972 self.process_document_data(
2974 source_node,
2975 &decision.payload,
2976 now_ms,
2977 relay_data,
2978 Some(decision.origin_node),
2979 decision.hop_count,
2980 )
2981 }
2982
2983 pub fn on_ble_data(
2992 &self,
2993 identifier: &str,
2994 data: &[u8],
2995 now_ms: u64,
2996 ) -> Option<DataReceivedResult> {
2997 if data.len() >= 2 {
2999 match data[0] {
3000 KEY_EXCHANGE_MARKER => {
3001 let _response = self.handle_key_exchange(data, now_ms);
3002 return None;
3003 }
3004 PEER_E2EE_MARKER => {
3005 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3006 return None;
3007 }
3008 RELAY_ENVELOPE_MARKER => {
3009 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3011 }
3012 _ => {}
3013 }
3014 }
3015
3016 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3018 }
3019
3020 fn process_incoming_document(
3022 &self,
3023 identifier: &str,
3024 data: &[u8],
3025 now_ms: u64,
3026 relay_data: Option<Vec<u8>>,
3027 origin_node: Option<NodeId>,
3028 hop_count: u8,
3029 ) -> Option<DataReceivedResult> {
3030 let decrypted = self.decrypt_document(data, Some(identifier))?;
3032
3033 let result = self.document_sync.merge_document(&decrypted)?;
3035
3036 self.peer_manager.record_sync(result.source_node, now_ms);
3038
3039 if origin_node.is_none() {
3044 let is_new =
3046 self.peer_manager
3047 .on_incoming_connection(identifier, result.source_node, now_ms);
3048
3049 {
3051 let mut graph = self.connection_graph.lock().unwrap();
3052 if is_new {
3053 graph.on_discovered(
3054 result.source_node,
3055 identifier.to_string(),
3056 None,
3057 Some(self.config.mesh_id.clone()),
3058 -50, now_ms,
3060 );
3061 }
3062 graph.on_connected(result.source_node, now_ms);
3063 }
3064 }
3065
3066 if result.is_emergency() {
3068 self.notify(PeatEvent::EmergencyReceived {
3069 from_node: result.source_node,
3070 });
3071 } else if result.is_ack() {
3072 self.notify(PeatEvent::AckReceived {
3073 from_node: result.source_node,
3074 });
3075 }
3076
3077 if result.counter_changed {
3078 self.notify(PeatEvent::DocumentSynced {
3079 from_node: result.source_node,
3080 total_count: result.total_count,
3081 });
3082 }
3083
3084 if relay_data.is_some() {
3086 let relay_targets = self.get_relay_targets(Some(result.source_node));
3087 self.notify(PeatEvent::MessageRelayed {
3088 origin_node: origin_node.unwrap_or(result.source_node),
3089 relay_count: relay_targets.len(),
3090 hop_count,
3091 });
3092 }
3093
3094 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3095 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3096
3097 Some(DataReceivedResult {
3098 source_node: result.source_node,
3099 is_emergency: result.is_emergency(),
3100 is_ack: result.is_ack(),
3101 counter_changed: result.counter_changed,
3102 emergency_changed: result.emergency_changed,
3103 total_count: result.total_count,
3104 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3105 relay_data,
3106 origin_node,
3107 hop_count,
3108 callsign,
3109 battery_percent,
3110 heart_rate,
3111 event_type,
3112 latitude,
3113 longitude,
3114 altitude,
3115 })
3116 }
3117
3118 fn handle_relay_envelope_with_incoming(
3120 &self,
3121 identifier: &str,
3122 data: &[u8],
3123 now_ms: u64,
3124 ) -> Option<DataReceivedResult> {
3125 let envelope = RelayEnvelope::decode(data)?;
3127
3128 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3131 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3132 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3133 source_peer,
3134 envelope.origin_node,
3135 envelope.hop_count,
3136 now_ms,
3137 );
3138
3139 if is_new {
3140 log::debug!(
3141 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3142 envelope.origin_node.as_u32(),
3143 source_peer.as_u32(),
3144 envelope.hop_count
3145 );
3146 }
3147 }
3148 }
3149
3150 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3152 let stats = self
3154 .seen_cache
3155 .lock()
3156 .unwrap()
3157 .get_stats(&envelope.message_id);
3158 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3159
3160 self.notify(PeatEvent::DuplicateMessageDropped {
3161 origin_node: envelope.origin_node,
3162 seen_count,
3163 });
3164 return None;
3165 }
3166
3167 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3169 let relay_env = envelope.relay();
3170 (true, relay_env.map(|e| e.encode()))
3171 } else {
3172 if !envelope.can_relay() {
3173 self.notify(PeatEvent::MessageTtlExpired {
3174 origin_node: envelope.origin_node,
3175 hop_count: envelope.hop_count,
3176 });
3177 }
3178 (false, None)
3179 };
3180
3181 self.process_incoming_document(
3183 identifier,
3184 &envelope.payload,
3185 now_ms,
3186 if should_relay { relay_data } else { None },
3187 Some(envelope.origin_node),
3188 envelope.hop_count,
3189 )
3190 }
3191
3192 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3202 use std::sync::atomic::Ordering;
3203
3204 let now_ms_32 = now_ms as u32;
3206
3207 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3209 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3210 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3211 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3212 let removed = self.peer_manager.cleanup_stale(now_ms);
3213 for node_id in &removed {
3214 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3215 }
3216 if !removed.is_empty() {
3217 self.notify_mesh_state_changed();
3218 }
3219
3220 {
3222 let mut graph = self.connection_graph.lock().unwrap();
3223 let newly_lost = graph.tick(now_ms);
3224 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3226 drop(graph);
3227
3228 for node_id in newly_lost {
3231 if !removed.contains(&node_id) {
3233 self.notify(PeatEvent::PeerLost { node_id });
3234 }
3235 }
3236 }
3237 }
3238
3239 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3241 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3242 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3243 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3244 if self.peer_manager.connected_count() > 0 {
3246 let doc = self.document_sync.build_document();
3247 return Some(self.encrypt_document(&doc));
3248 }
3249 }
3250
3251 None
3252 }
3253
3254 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3263 use std::sync::atomic::Ordering;
3264 let now_ms_32 = now_ms as u32;
3265
3266 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3268 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3269 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3270 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3271 let removed = self.peer_manager.cleanup_stale(now_ms);
3272 for node_id in &removed {
3273 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3274 }
3275 if !removed.is_empty() {
3276 self.notify_mesh_state_changed();
3277 }
3278
3279 {
3281 let mut graph = self.connection_graph.lock().unwrap();
3282 let newly_lost = graph.tick(now_ms);
3283 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3284 drop(graph);
3285
3286 for node_id in newly_lost {
3287 if !removed.contains(&node_id) {
3288 self.notify(PeatEvent::PeerLost { node_id });
3289 }
3290 }
3291 }
3292 }
3293
3294 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3296 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3297 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3298 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3299
3300 let doc = self.document_sync.build_document();
3302 let encrypted = self.encrypt_document(&doc);
3303 let mut results = Vec::new();
3304 for peer in self.get_connected_peers() {
3305 results.push((peer.node_id, encrypted.clone()));
3306 }
3307 return results;
3308 }
3309
3310 Vec::new()
3311 }
3312
3313 pub fn get_peers(&self) -> Vec<PeatPeer> {
3317 self.peer_manager.get_peers()
3318 }
3319
3320 pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3322 self.peer_manager.get_connected_peers()
3323 }
3324
3325 pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3327 self.peer_manager.get_peer(node_id)
3328 }
3329
3330 pub fn peer_count(&self) -> usize {
3332 self.peer_manager.peer_count()
3333 }
3334
3335 pub fn connected_count(&self) -> usize {
3337 self.peer_manager.connected_count()
3338 }
3339
3340 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3342 self.peer_manager.matches_mesh(device_mesh_id)
3343 }
3344
3345 pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3369 self.connection_graph.lock().unwrap().get_all_owned()
3370 }
3371
3372 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3374 self.connection_graph
3375 .lock()
3376 .unwrap()
3377 .get_peer(node_id)
3378 .cloned()
3379 }
3380
3381 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3383 self.connection_graph
3384 .lock()
3385 .unwrap()
3386 .get_connected()
3387 .into_iter()
3388 .cloned()
3389 .collect()
3390 }
3391
3392 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3394 self.connection_graph
3395 .lock()
3396 .unwrap()
3397 .get_degraded()
3398 .into_iter()
3399 .cloned()
3400 .collect()
3401 }
3402
3403 pub fn get_recently_disconnected(
3407 &self,
3408 within_ms: u64,
3409 now_ms: u64,
3410 ) -> Vec<PeerConnectionState> {
3411 self.connection_graph
3412 .lock()
3413 .unwrap()
3414 .get_recently_disconnected(within_ms, now_ms)
3415 .into_iter()
3416 .cloned()
3417 .collect()
3418 }
3419
3420 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3422 self.connection_graph
3423 .lock()
3424 .unwrap()
3425 .get_lost()
3426 .into_iter()
3427 .cloned()
3428 .collect()
3429 }
3430
3431 pub fn get_connection_state_counts(&self) -> StateCountSummary {
3433 self.connection_graph.lock().unwrap().state_counts()
3434 }
3435
3436 pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3444 self.connection_graph
3445 .lock()
3446 .unwrap()
3447 .get_indirect_peers_owned()
3448 }
3449
3450 pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3457 self.connection_graph.lock().unwrap().peer_degree(node_id)
3458 }
3459
3460 pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3465 self.connection_graph.lock().unwrap().full_state_counts()
3466 }
3467
3468 pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3473 self.connection_graph.lock().unwrap().get_paths_to(node_id)
3474 }
3475
3476 pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3478 self.connection_graph.lock().unwrap().is_known(node_id)
3479 }
3480
3481 pub fn indirect_peer_count(&self) -> usize {
3483 self.connection_graph.lock().unwrap().indirect_peer_count()
3484 }
3485
3486 pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3491 self.connection_graph
3492 .lock()
3493 .unwrap()
3494 .cleanup_indirect(now_ms)
3495 }
3496
3497 pub fn total_count(&self) -> u64 {
3499 self.document_sync.total_count()
3500 }
3501
3502 pub fn document_version(&self) -> u32 {
3504 self.document_sync.version()
3505 }
3506
3507 pub fn version(&self) -> u32 {
3509 self.document_sync.version()
3510 }
3511
3512 pub fn update_health(&self, battery_percent: u8) {
3514 self.document_sync.update_health(battery_percent);
3515 }
3516
3517 pub fn update_activity(&self, activity: u8) {
3519 self.document_sync.update_activity(activity);
3520 }
3521
3522 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3524 self.document_sync
3525 .update_health_full(battery_percent, activity);
3526 }
3527
3528 pub fn update_heart_rate(&self, heart_rate: u8) {
3530 self.document_sync.update_heart_rate(heart_rate);
3531 }
3532
3533 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3535 self.document_sync
3536 .update_location(latitude, longitude, altitude);
3537 }
3538
3539 pub fn clear_location(&self) {
3541 self.document_sync.clear_location();
3542 }
3543
3544 pub fn update_callsign(&self, callsign: &str) {
3546 self.document_sync.update_callsign(callsign);
3547 }
3548
3549 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3551 self.document_sync
3552 .set_peripheral_event(event_type, timestamp);
3553 }
3554
3555 pub fn clear_peripheral_event(&self) {
3557 self.document_sync.clear_peripheral_event();
3558 }
3559
3560 #[allow(clippy::too_many_arguments)]
3565 pub fn update_peripheral_state(
3566 &self,
3567 callsign: &str,
3568 battery_percent: u8,
3569 heart_rate: Option<u8>,
3570 latitude: Option<f32>,
3571 longitude: Option<f32>,
3572 altitude: Option<f32>,
3573 event_type: Option<EventType>,
3574 timestamp: u64,
3575 ) {
3576 self.document_sync.update_peripheral_state(
3577 callsign,
3578 battery_percent,
3579 heart_rate,
3580 latitude,
3581 longitude,
3582 altitude,
3583 event_type,
3584 timestamp,
3585 );
3586 }
3587
3588 pub fn build_document(&self) -> Vec<u8> {
3592 let doc = self.document_sync.build_document();
3593 self.encrypt_document(&doc)
3594 }
3595
3596 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3598 self.peer_manager.peers_needing_sync(now_ms)
3599 }
3600
3601 fn notify(&self, event: PeatEvent) {
3604 self.observers.notify(event);
3605 }
3606
3607 fn notify_mesh_state_changed(&self) {
3608 self.notify(PeatEvent::MeshStateChanged {
3609 peer_count: self.peer_manager.peer_count(),
3610 connected_count: self.peer_manager.connected_count(),
3611 });
3612 }
3613
3614 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3634 let mut id_bytes = [0u8; 16];
3637 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3638 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3639 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3640
3641 let seen = self.seen_cache.lock().unwrap();
3643 !seen.has_seen(&message_id)
3644 }
3645
3646 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3651 let now = std::time::SystemTime::now()
3652 .duration_since(std::time::UNIX_EPOCH)
3653 .map(|d| d.as_millis() as u64)
3654 .unwrap_or(0);
3655
3656 let mut id_bytes = [0u8; 16];
3658 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3659 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3660 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3661 let origin = NodeId::new(source_node);
3662
3663 let mut seen = self.seen_cache.lock().unwrap();
3664 seen.mark_seen(message_id, origin, now);
3665 }
3666
3667 pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3672 self.peer_manager.get_connected_identifiers()
3673 }
3674}
3675
3676#[derive(Debug, Clone)]
3678pub struct DataReceivedResult {
3679 pub source_node: NodeId,
3681
3682 pub is_emergency: bool,
3684
3685 pub is_ack: bool,
3687
3688 pub counter_changed: bool,
3690
3691 pub emergency_changed: bool,
3693
3694 pub total_count: u64,
3696
3697 pub event_timestamp: u64,
3699
3700 pub relay_data: Option<Vec<u8>>,
3705
3706 pub origin_node: Option<NodeId>,
3708
3709 pub hop_count: u8,
3711
3712 pub callsign: Option<String>,
3715
3716 pub battery_percent: Option<u8>,
3718
3719 pub heart_rate: Option<u8>,
3721
3722 pub event_type: Option<u8>,
3724
3725 pub latitude: Option<f32>,
3727
3728 pub longitude: Option<f32>,
3730
3731 pub altitude: Option<f32>,
3733}
3734
3735impl DataReceivedResult {
3736 #[allow(clippy::type_complexity)]
3738 fn peripheral_fields(
3739 peripheral: &Option<crate::sync::crdt::Peripheral>,
3740 ) -> (
3741 Option<String>,
3742 Option<u8>,
3743 Option<u8>,
3744 Option<u8>,
3745 Option<f32>,
3746 Option<f32>,
3747 Option<f32>,
3748 ) {
3749 match peripheral {
3750 Some(p) => {
3751 let callsign = {
3752 let s = p.callsign_str();
3753 if s.is_empty() {
3754 None
3755 } else {
3756 Some(s.to_string())
3757 }
3758 };
3759 let battery = if p.health.battery_percent > 0 {
3760 Some(p.health.battery_percent)
3761 } else {
3762 None
3763 };
3764 let heart_rate = p.health.heart_rate;
3765 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3766 let (lat, lon, alt) = match &p.location {
3767 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3768 None => (None, None, None),
3769 };
3770 (callsign, battery, heart_rate, event_type, lat, lon, alt)
3771 }
3772 None => (None, None, None, None, None, None, None),
3773 }
3774 }
3775}
3776
3777#[derive(Debug, Clone)]
3779pub struct RelayDecision {
3780 pub payload: Vec<u8>,
3782
3783 pub origin_node: NodeId,
3785
3786 pub hop_count: u8,
3788
3789 pub should_relay: bool,
3791
3792 pub relay_envelope: Option<RelayEnvelope>,
3796}
3797
3798impl RelayDecision {
3799 pub fn relay_data(&self) -> Option<Vec<u8>> {
3803 self.relay_envelope.as_ref().map(|e| e.encode())
3804 }
3805}
3806
3807#[cfg(all(test, feature = "std"))]
3808mod tests {
3809 use super::*;
3810 use crate::observer::CollectingObserver;
3811
3812 const TEST_TIMESTAMP: u64 = 1705276800000;
3814
3815 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3816 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3817 PeatMesh::new(config)
3818 }
3819
// A freshly created mesh exposes the identity it was configured with.
#[test]
fn test_mesh_creation() {
    let mesh = create_mesh(0x12345678, "ALPHA-1");

    let node = mesh.node_id();
    assert_eq!(node.as_u32(), 0x12345678);
    assert_eq!(mesh.callsign(), "ALPHA-1");
    assert_eq!(mesh.mesh_id(), "TEST");
    assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
}
3829
// Discovering a same-mesh device yields a peer and emits discovery + state events.
#[test]
fn test_peer_discovery() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let discovered = mesh.on_ble_discovered(
        "device-uuid",
        Some("PEAT_TEST-22222222"),
        -65,
        Some("TEST"),
        1000,
    );

    assert!(discovered.is_some());
    let discovered = discovered.unwrap();
    assert_eq!(discovered.node_id.as_u32(), 0x22222222);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerDiscovered { .. })));
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::MeshStateChanged { .. })));
}
3858
// Discover -> connect -> disconnect updates the connected count and emits both events.
#[test]
fn test_connection_lifecycle() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    mesh.on_ble_discovered(
        "device-uuid",
        Some("PEAT_TEST-22222222"),
        -65,
        Some("TEST"),
        1000,
    );

    let connected = mesh.on_ble_connected("device-uuid", 2000);
    assert_eq!(connected, Some(NodeId::new(0x22222222)));
    assert_eq!(mesh.connected_count(), 1);

    let disconnected = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
    assert_eq!(disconnected, Some(NodeId::new(0x22222222)));
    assert_eq!(mesh.connected_count(), 0);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerConnected { .. })));
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerDisconnected { .. })));
}
3892
// An emergency document sent by one mesh is recognised and reported by the receiver.
#[test]
fn test_emergency_flow() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let emergency_doc = sender.send_emergency(TEST_TIMESTAMP);
    assert!(sender.is_emergency_active());

    let outcome = receiver.on_ble_data_received_from_node(
        NodeId::new(0x11111111),
        &emergency_doc,
        TEST_TIMESTAMP,
    );

    assert!(outcome.is_some());
    let outcome = outcome.unwrap();
    assert!(outcome.is_emergency);
    assert_eq!(outcome.source_node.as_u32(), 0x11111111);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::EmergencyReceived { .. })));
}
3920
// An ACK document sent by one mesh is recognised and reported by the receiver.
#[test]
fn test_ack_flow() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let ack_doc = sender.send_ack(TEST_TIMESTAMP);
    assert!(sender.is_ack_active());

    let outcome = receiver.on_ble_data_received_from_node(
        NodeId::new(0x11111111),
        &ack_doc,
        TEST_TIMESTAMP,
    );

    assert!(outcome.is_some());
    let outcome = outcome.unwrap();
    assert!(outcome.is_ack);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::AckReceived { .. })));
}
3947
// A discovered peer survives a tick inside the timeout and is removed (with a
// PeerLost event) by a tick after it.
#[test]
fn test_tick_cleanup() {
    let mesh = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST").with_peer_timeout(10_000),
    );

    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    mesh.on_ble_discovered(
        "device-uuid",
        Some("PEAT_TEST-22222222"),
        -65,
        Some("TEST"),
        1000,
    );
    assert_eq!(mesh.peer_count(), 1);

    mesh.tick(5000);
    assert_eq!(mesh.peer_count(), 1);

    mesh.tick(20000);
    assert_eq!(mesh.peer_count(), 0);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerLost { .. })));
}
3981
// With a connected peer, tick only returns a document once the sync interval elapsed.
#[test]
fn test_tick_sync_broadcast() {
    let mesh = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST").with_sync_interval(5000),
    );

    mesh.on_ble_discovered(
        "device-uuid",
        Some("PEAT_TEST-22222222"),
        -65,
        Some("TEST"),
        1000,
    );
    mesh.on_ble_connected("device-uuid", 1000);

    // First call at t=0; its result is deliberately ignored.
    let _first = mesh.tick(0);

    let before_interval = mesh.tick(3000);
    assert!(before_interval.is_none());

    let after_interval = mesh.tick(6000);
    assert!(after_interval.is_some());

    let right_after_sync = mesh.tick(6100);
    assert!(right_after_sync.is_none());

    let next_interval = mesh.tick(12000);
    assert!(next_interval.is_some());
}
4018
// An incoming connection from an unknown node registers it as discovered and connected.
#[test]
fn test_incoming_connection() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let first_contact =
        mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);

    assert!(first_contact);
    assert_eq!(mesh.peer_count(), 1);
    assert_eq!(mesh.connected_count(), 1);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerDiscovered { .. })));
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerConnected { .. })));
}
4041
// Devices advertising another mesh id are ignored; same-mesh devices are accepted.
#[test]
fn test_mesh_filtering() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");

    let foreign = mesh.on_ble_discovered(
        "device-uuid-1",
        Some("PEAT_OTHER-22222222"),
        -65,
        Some("OTHER"),
        1000,
    );
    assert!(foreign.is_none());
    assert_eq!(mesh.peer_count(), 0);

    let same_mesh = mesh.on_ble_discovered(
        "device-uuid-2",
        Some("PEAT_TEST-33333333"),
        -65,
        Some("TEST"),
        1000,
    );
    assert!(same_mesh.is_some());
    assert_eq!(mesh.peer_count(), 1);
}
4068
4069 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4072 let config =
4073 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4074 PeatMesh::new(config)
4075 }
4076
// Configuring a secret turns mesh encryption on.
#[test]
fn test_encryption_enabled() {
    let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    assert!(mesh.is_encryption_enabled());
}
4084
// Without a secret, mesh encryption stays off.
#[test]
fn test_encryption_disabled_by_default() {
    let plain_mesh = create_mesh(0x11111111, "ALPHA-1");

    assert!(!plain_mesh.is_encryption_enabled());
}
4091
// Two meshes sharing a secret can exchange an encrypted document.
#[test]
fn test_encrypted_document_exchange() {
    let shared_secret = [0x42u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", shared_secret);

    let wire = sender.build_document();

    assert!(wire.len() >= 2);
    assert_eq!(wire[0], crate::document::ENCRYPTED_MARKER);

    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);

    assert!(outcome.is_some());
    let outcome = outcome.unwrap();
    assert_eq!(outcome.source_node.as_u32(), 0x11111111);
}
4112
// An emergency survives the encrypt/decrypt round trip between two meshes.
#[test]
fn test_encrypted_emergency_exchange() {
    let shared_secret = [0x42u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", shared_secret);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let wire = sender.send_emergency(TEST_TIMESTAMP);

    let outcome =
        receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, TEST_TIMESTAMP);

    assert!(outcome.is_some());
    let outcome = outcome.unwrap();
    assert!(outcome.is_emergency);

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::EmergencyReceived { .. })));
}
4139
// A receiver holding a different secret cannot process the sender's document.
#[test]
fn test_wrong_key_fails_decrypt() {
    let sender_secret = [0x42u8; 32];
    let receiver_secret = [0x43u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_secret);

    let wire = sender.build_document();

    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);

    assert!(outcome.is_none());
}
4155
// Plain meshes exchange plain documents.
#[test]
fn test_unencrypted_mesh_can_read_unencrypted() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    let wire = sender.build_document();

    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);

    assert!(outcome.is_some());
}
4169
// Without strict mode, an encrypted mesh still accepts a plain document.
#[test]
fn test_encrypted_mesh_can_receive_unencrypted() {
    let secret = [0x42u8; 32];
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let encrypted_receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
    let wire = plain_sender.build_document();

    let outcome =
        encrypted_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);

    assert!(outcome.is_some());
}
4185
// A plain mesh has no key, so an encrypted document is rejected.
#[test]
fn test_unencrypted_mesh_cannot_receive_encrypted() {
    let secret = [0x42u8; 32];
    let encrypted_sender = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
    let plain_receiver = create_mesh(0x22222222, "BRAVO-1");
    let wire = encrypted_sender.build_document();

    let outcome =
        plain_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire, 1000);

    assert!(outcome.is_none());
}
4200
// Encryption can be toggled at runtime and the document marker follows the setting.
#[test]
fn test_enable_disable_encryption() {
    let mut mesh = create_mesh(0x11111111, "ALPHA-1");

    assert!(!mesh.is_encryption_enabled());

    let secret = [0x42u8; 32];
    mesh.enable_encryption(&secret);
    assert!(mesh.is_encryption_enabled());

    let encrypted_doc = mesh.build_document();
    assert_eq!(encrypted_doc[0], crate::document::ENCRYPTED_MARKER);

    mesh.disable_encryption();
    assert!(!mesh.is_encryption_enabled());

    let plain_doc = mesh.build_document();
    assert_ne!(plain_doc[0], crate::document::ENCRYPTED_MARKER);
}
4224
// Pins the size difference between an encrypted and a plain document at 30 bytes.
#[test]
fn test_encryption_overhead() {
    let secret = [0x42u8; 32];
    let encrypted_mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
    let plain_mesh = create_mesh(0x22222222, "BRAVO-1");

    let encrypted_len = encrypted_mesh.build_document().len();
    let plain_len = plain_mesh.build_document().len();

    let overhead = encrypted_len - plain_len;
    assert_eq!(overhead, 30);
}
4242
// Peer E2EE starts disabled, exposes a public key once enabled, and can be disabled.
#[test]
fn test_peer_e2ee_enable_disable() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");

    assert!(!mesh.is_peer_e2ee_enabled());
    assert!(mesh.peer_e2ee_public_key().is_none());

    mesh.enable_peer_e2ee();
    assert!(mesh.is_peer_e2ee_enabled());
    assert!(mesh.peer_e2ee_public_key().is_some());

    mesh.disable_peer_e2ee();
    assert!(!mesh.is_peer_e2ee_enabled());
}
4259
// Initiating E2EE produces a key-exchange message and a pending (not established) session.
#[test]
fn test_peer_e2ee_initiate_session() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");
    mesh.enable_peer_e2ee();

    let offer = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
    assert!(offer.is_some());

    let offer = offer.unwrap();
    assert_eq!(offer[0], crate::document::KEY_EXCHANGE_MARKER);

    assert_eq!(mesh.peer_e2ee_session_count(), 1);
    assert_eq!(mesh.peer_e2ee_established_count(), 0);
}
4276
// A two-message key exchange establishes sessions on both sides and notifies both.
#[test]
fn test_peer_e2ee_full_handshake() {
    let initiator = create_mesh(0x11111111, "ALPHA-1");
    let responder = create_mesh(0x22222222, "BRAVO-1");

    initiator.enable_peer_e2ee();
    responder.enable_peer_e2ee();

    let initiator_events = Arc::new(CollectingObserver::new());
    let responder_events = Arc::new(CollectingObserver::new());
    initiator.add_observer(initiator_events.clone());
    responder.add_observer(responder_events.clone());

    let offer = initiator
        .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
        .unwrap();

    let reply = responder.handle_key_exchange(&offer, 1000);
    assert!(reply.is_some());

    assert!(responder.has_peer_e2ee_session(NodeId::new(0x11111111)));

    let reply = reply.unwrap();
    let _ = initiator.handle_key_exchange(&reply, 1000);

    assert!(initiator.has_peer_e2ee_session(NodeId::new(0x22222222)));

    let seen_by_initiator = initiator_events.events();
    assert!(seen_by_initiator
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeEstablished { .. })));

    let seen_by_responder = responder_events.events();
    assert!(seen_by_responder
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeEstablished { .. })));
}
4320
// After the handshake, a message encrypted by one side decrypts on the other and is
// reported to observers.
#[test]
fn test_peer_e2ee_encrypt_decrypt() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    let offer = sender
        .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
        .unwrap();
    let reply = receiver.handle_key_exchange(&offer, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    let plaintext = b"Secret message from mesh1";
    let ciphertext = sender.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
    assert!(ciphertext.is_some());

    let ciphertext = ciphertext.unwrap();
    assert_eq!(ciphertext[0], crate::document::PEER_E2EE_MARKER);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let recovered = receiver.handle_peer_e2ee_message(&ciphertext, 2000);
    assert!(recovered.is_some());
    assert_eq!(recovered.unwrap(), plaintext);

    let events = collector.events();
    assert!(events.iter().any(|e| matches!(
        e,
        PeatEvent::PeerE2eeMessageReceived { from_node, data }
        if from_node.as_u32() == 0x11111111 && data == plaintext
    )));
}
4361
// An established session carries messages in both directions.
#[test]
fn test_peer_e2ee_bidirectional() {
    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");

    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    let offer = alpha
        .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
        .unwrap();
    let reply = bravo.handle_key_exchange(&offer, 1000).unwrap();
    alpha.handle_key_exchange(&reply, 1000);

    let to_bravo = alpha
        .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
        .unwrap();
    let at_bravo = bravo.handle_peer_e2ee_message(&to_bravo, 2000).unwrap();
    assert_eq!(at_bravo, b"Hello from mesh1");

    let to_alpha = bravo
        .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
        .unwrap();
    let at_alpha = alpha.handle_peer_e2ee_message(&to_alpha, 2000).unwrap();
    assert_eq!(at_alpha, b"Hello from mesh2");
}
4391
// Closing a pending session emits PeerE2eeClosed.
#[test]
fn test_peer_e2ee_close_session() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");
    mesh.enable_peer_e2ee();

    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let remote = NodeId::new(0x22222222);
    mesh.initiate_peer_e2ee(remote, 1000);
    assert_eq!(mesh.peer_e2ee_session_count(), 1);

    mesh.close_peer_e2ee(NodeId::new(0x22222222));

    let recorded = collector.events();
    assert!(recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeClosed { .. })));
}
4413
// With E2EE disabled, initiating and sending both return None and no session exists.
#[test]
fn test_peer_e2ee_without_enabling() {
    let mesh = create_mesh(0x11111111, "ALPHA-1");

    let offer = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
    assert!(offer.is_none());

    let ciphertext = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
    assert!(ciphertext.is_none());

    assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
}
4427
// Pins the per-message E2EE framing overhead at 46 bytes.
#[test]
fn test_peer_e2ee_overhead() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    let offer = sender
        .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
        .unwrap();
    let reply = receiver.handle_key_exchange(&offer, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    let plaintext = b"Test message";
    let ciphertext = sender
        .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
        .unwrap();

    let overhead = ciphertext.len() - plaintext.len();
    assert_eq!(overhead, 46);
}
4460
4461 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4464 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4465 .with_encryption(secret)
4466 .with_strict_encryption();
4467 PeatMesh::new(config)
4468 }
4469
/// A mesh built with both encryption and strict mode reports both as enabled.
#[test]
fn test_strict_encryption_enabled() {
    let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    let encryption_on = mesh.is_encryption_enabled();
    assert!(encryption_on);

    let strict_on = mesh.is_strict_encryption_enabled();
    assert!(strict_on);
}
4478
/// Enabling encryption alone must not turn on strict mode.
#[test]
fn test_strict_encryption_disabled_by_default() {
    let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    let encryption_on = mesh.is_encryption_enabled();
    assert!(encryption_on);

    let strict_on = mesh.is_strict_encryption_enabled();
    assert!(!strict_on);
}
4487
/// Requesting strict mode without an encryption secret leaves both
/// encryption and strict mode reported as disabled.
#[test]
fn test_strict_encryption_requires_encryption_enabled() {
    let mesh = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_strict_encryption(),
    );

    let encryption_on = mesh.is_encryption_enabled();
    assert!(!encryption_on);

    let strict_on = mesh.is_strict_encryption_enabled();
    assert!(!strict_on);
}
4498
/// A strict-mode receiver sharing the sender's secret accepts the sender's
/// encrypted document.
#[test]
fn test_strict_mode_accepts_encrypted_documents() {
    let shared_secret = [0x42u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_secret);
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", shared_secret);

    // The sender's document must start with the encrypted marker byte.
    let doc = sender.build_document();
    assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

    let outcome =
        strict_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
    assert!(outcome.is_some());
}
4513
/// A strict-mode receiver must reject a plaintext document and emit an
/// `UnencryptedInStrictMode` security violation.
#[test]
fn test_strict_mode_rejects_unencrypted_documents() {
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let collector = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(collector.clone());

    // The sender has no encryption, so its document must not carry the marker.
    let doc = plain_sender.build_document();
    assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);

    let outcome =
        strict_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
    assert!(outcome.is_none());

    let recorded = collector.events();
    let violation_reported = recorded.iter().any(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(violation_reported);
}
4541
/// Without strict mode, an encrypted mesh still accepts a plaintext document.
#[test]
fn test_non_strict_mode_accepts_unencrypted_documents() {
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let lenient_receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let doc = plain_sender.build_document();

    let outcome =
        lenient_receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
    assert!(outcome.is_some());
}
4555
/// When plaintext arrives over a known BLE connection, the resulting
/// `UnencryptedInStrictMode` violation must carry the device identifier as
/// its `source`.
#[test]
fn test_strict_mode_security_violation_event_includes_source() {
    let device = "test-device-uuid";

    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let collector = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(collector.clone());

    let doc = plain_sender.build_document();

    // Discover and connect the device first so the data arrives via an
    // identifier the receiver already knows.
    strict_receiver.on_ble_discovered(
        device,
        Some("PEAT_TEST-11111111"),
        -65,
        Some("TEST"),
        500,
    );
    strict_receiver.on_ble_connected(device, 600);

    let _outcome = strict_receiver.on_ble_data_received(device, &doc, 1000);

    let recorded = collector.events();
    let violation = recorded.iter().find(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(violation.is_some());

    if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
        assert!(source.is_some());
        assert_eq!(source.as_ref().unwrap(), device);
    }
}
4597
/// A document encrypted under a different secret must be rejected and
/// reported as a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    // The two meshes deliberately use different secrets.
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", [0x43u8; 32]);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let doc = sender.build_document();

    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
    assert!(outcome.is_none());

    let recorded = collector.events();
    let failure_reported = recorded.iter().any(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(failure_reported);
}
4625
/// Strict encryption stays enabled when further builder calls are chained
/// after it.
#[test]
fn test_strict_mode_builder_chain() {
    let mesh = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption([0x42u8; 32])
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    let encryption_on = mesh.is_encryption_enabled();
    assert!(encryption_on);

    let strict_on = mesh.is_strict_encryption_enabled();
    assert!(strict_on);
}
4640
4641 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4644 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4645 PeatMesh::new(config)
4646 }
4647
/// A mesh created without `with_relay()` reports relay as disabled.
#[test]
fn test_relay_disabled_by_default() {
    let relay_on = create_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(!relay_on);
}
4653
/// A mesh created through the relay helper reports relay as enabled.
#[test]
fn test_relay_enabled() {
    let relay_on = create_relay_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(relay_on);
}
4659
/// Each relay builder method stores its argument in the matching config field.
#[test]
fn test_relay_config_builder() {
    let PeatMeshConfig {
        enable_relay,
        max_relay_hops,
        relay_fanout,
        seen_cache_ttl_ms,
        ..
    } = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(enable_relay);
    assert_eq!(max_relay_hops, 5);
    assert_eq!(relay_fanout, 3);
    assert_eq!(seen_cache_ttl_ms, 60_000);
}
4673
/// Marking the same message id twice returns `true` then `false`, and the
/// seen cache holds a single entry.
#[test]
fn test_seen_message_deduplication() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);
    let id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);

    let first_sighting = mesh.mark_message_seen(id, origin, 1000);
    assert!(first_sighting);

    let repeat_sighting = mesh.mark_message_seen(id, origin, 2000);
    assert!(!repeat_sighting);

    let cached = mesh.seen_cache_size();
    assert_eq!(cached, 1);
}
4688
/// Wrapping a payload yields a relay envelope that starts with the relay
/// marker, originates from this node, has zero hops, and round-trips the
/// payload.
#[test]
fn test_wrap_for_relay() {
    let payload = vec![1, 2, 3, 4, 5];

    let wrapped = create_relay_mesh(0x11111111, "ALPHA-1").wrap_for_relay(payload.clone());
    assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);

    let decoded = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
    assert_eq!(decoded.payload, payload);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4705
/// A first-time envelope is delivered with its original metadata and is
/// scheduled for relay with the hop count incremented to 1.
#[test]
fn test_process_relay_envelope_new_message() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let payload = vec![1, 2, 3, 4, 5];
    let data = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
        .with_max_hops(7)
        .encode();

    let outcome = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    let outcome = outcome.unwrap();
    assert_eq!(outcome.payload, payload);
    assert_eq!(outcome.origin_node.as_u32(), 0x22222222);
    assert_eq!(outcome.hop_count, 0);
    assert!(outcome.should_relay);
    assert!(outcome.relay_envelope.is_some());

    // The envelope to forward must already account for this hop.
    let forwarded = outcome.relay_envelope.unwrap();
    assert_eq!(forwarded.hop_count, 1);
}
4734
/// Processing the same envelope twice yields a decision the first time,
/// `None` the second time, and a `DuplicateMessageDropped` event.
#[test]
fn test_process_relay_envelope_duplicate() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let data =
        crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), vec![1, 2, 3, 4, 5])
            .encode();
    let via = NodeId::new(0x33333333);

    let first_pass = mesh.process_relay_envelope(&data, via, 1000);
    assert!(first_pass.is_some());

    let second_pass = mesh.process_relay_envelope(&data, via, 2000);
    assert!(second_pass.is_none());

    let recorded = collector.events();
    let drop_reported = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::DuplicateMessageDropped { .. }));
    assert!(drop_reported);
}
4759
/// An envelope that has already been relayed up to its hop limit is still
/// delivered locally but not forwarded, and a `MessageTtlExpired` event is
/// emitted.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let payload = vec![1, 2, 3, 4, 5];
    let mut envelope =
        crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
            .with_max_hops(3);

    // Relay three times so the envelope arrives having used all three hops.
    for _ in 0..3 {
        envelope = envelope.relay().unwrap();
    }
    let data = envelope.encode();

    let outcome = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    let outcome = outcome.unwrap();
    assert_eq!(outcome.payload, payload);
    assert!(!outcome.should_relay);
    assert!(outcome.relay_envelope.is_none());

    let recorded = collector.events();
    let expiry_reported = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::MessageTtlExpired { .. }));
    assert!(expiry_reported);
}
4794
/// The relay document is a relay envelope originating from this node whose
/// payload decodes as a `PeatDocument`.
#[test]
fn test_build_relay_document() {
    let relay_doc = create_relay_mesh(0x11111111, "ALPHA-1").build_relay_document();
    assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);

    let decoded = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    let inner = crate::document::PeatDocument::decode(&decoded.payload);
    assert!(inner.is_some());
}
4812
/// Relay targets must never include the node the message was received from.
#[test]
fn test_relay_targets_excludes_source() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    // (device identifier, advertised name, RSSI) for each peer to connect.
    let peers = [
        ("peer-1", "PEAT_TEST-22222222", -60),
        ("peer-2", "PEAT_TEST-33333333", -65),
        ("peer-3", "PEAT_TEST-44444444", -70),
    ];
    for &(device, advertised_name, rssi) in peers.iter() {
        mesh.on_ble_discovered(device, Some(advertised_name), rssi, Some("TEST"), 1000);
        mesh.on_ble_connected(device, 1000);
    }

    let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));

    let source_included = targets
        .iter()
        .any(|peer| peer.node_id.as_u32() == 0x33333333);
    assert!(!source_included);
}
4851
/// Clearing the seen cache drops every recorded message id.
#[test]
fn test_clear_seen_cache() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);

    let first_id = crate::relay::MessageId::from_content(origin, 1000, 0x11111111);
    mesh.mark_message_seen(first_id, origin, 1000);

    let second_id = crate::relay::MessageId::from_content(origin, 2000, 0x22222222);
    mesh.mark_message_seen(second_id, origin, 2000);

    let before = mesh.seen_cache_size();
    assert_eq!(before, 2);

    mesh.clear_seen_cache();

    let after = mesh.seen_cache_size();
    assert_eq!(after, 0);
}
4875}