1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66 TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73 PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78 RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
94#[derive(Debug, Clone)]
96pub struct PeatMeshConfig {
97 pub node_id: NodeId,
99
100 pub callsign: String,
102
103 pub mesh_id: String,
105
106 pub peripheral_type: PeripheralType,
108
109 pub peer_config: PeerManagerConfig,
111
112 pub sync_interval_ms: u64,
114
115 pub auto_broadcast_events: bool,
117
118 pub encryption_secret: Option<[u8; 32]>,
124
125 pub strict_encryption: bool,
133
134 pub enable_relay: bool,
141
142 pub max_relay_hops: u8,
147
148 pub relay_fanout: usize,
154
155 pub seen_cache_ttl_ms: u64,
160}
161
162impl PeatMeshConfig {
163 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165 Self {
166 node_id,
167 callsign: callsign.into(),
168 mesh_id: mesh_id.into(),
169 peripheral_type: PeripheralType::SoldierSensor,
170 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171 sync_interval_ms: 5000,
172 auto_broadcast_events: true,
173 encryption_secret: None,
174 strict_encryption: false,
175 enable_relay: false,
176 max_relay_hops: DEFAULT_MAX_HOPS,
177 relay_fanout: 2,
178 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179 }
180 }
181
182 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187 self.encryption_secret = Some(secret);
188 self
189 }
190
191 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193 self.peripheral_type = ptype;
194 self
195 }
196
197 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199 self.sync_interval_ms = interval_ms;
200 self
201 }
202
203 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205 self.peer_config.peer_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_max_peers(mut self, max: usize) -> Self {
211 self.peer_config.max_peers = max;
212 self
213 }
214
215 pub fn with_strict_encryption(mut self) -> Self {
223 self.strict_encryption = true;
224 self
225 }
226
227 pub fn with_relay(mut self) -> Self {
232 self.enable_relay = true;
233 self
234 }
235
236 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240 self.max_relay_hops = max_hops;
241 self
242 }
243
244 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248 self.relay_fanout = fanout.max(1);
249 self
250 }
251
252 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256 self.seen_cache_ttl_ms = ttl_ms;
257 self
258 }
259}
260
/// Lock-protected store of decoded application documents.
///
/// Entries are keyed by `(type_id, source_node, timestamp)` and hold the
/// type-erased document produced by the document registry.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn std::any::Any + Send + Sync>>>;
265
/// A mesh node: owns the peer manager, document sync state, relay and delta
/// bookkeeping, optional encryption key and optional device identity.
///
/// Mutable state is wrapped in `Mutex` / `RwLock` / atomics so most
/// operations take `&self`.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration the mesh was constructed from.
    config: PeatMeshConfig,

    /// Peer manager built from `config.node_id` and `config.peer_config`.
    peer_manager: PeerManager,

    /// Document sync state for the local node.
    document_sync: DocumentSync,

    /// Observers notified about mesh events.
    observers: ObserverManager,

    /// Initialised to 0 by the constructors.
    // NOTE(review): readers/writers are outside the visible code; presumably
    // the time of the last sync in ms (truncated to 32 bits) - confirm.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0 by the constructors.
    // NOTE(review): presumably the time of the last cleanup pass - confirm.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` means encryption is disabled.
    encryption_key: Option<MeshEncryptionKey>,

    /// Optional peer session manager; starts out as `None`.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph, updated when relayed messages reveal
    /// indirect peers.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of relay message ids already seen, used to drop duplicates.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks which connected peers receive relayed messages.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta sync tracking and statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity; `Some` only when built via `with_identity`.
    identity: Option<DeviceIdentity>,

    /// Registry of known peer identities.
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Most recent peripheral state received per origin node.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode and update application documents.
    document_registry: DocumentRegistry,

    /// Decoded application documents received from the mesh.
    app_documents: AppDocumentStore,

    /// Translator used to decode inbound translator frames.
    #[cfg(feature = "mesh-translator")]
    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,

    /// Optional callback invoked with each decoded translator document.
    #[cfg(feature = "mesh-translator")]
    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,

    /// Optional callback invoked with the JSON form of each decoded
    /// translator document.
    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
    decoded_document_json_callback:
        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,

    /// Set by `acknowledge_polled_translator_consumer`; when `true`, decoded
    /// translator frames count as having a consumer even without callbacks.
    polled_translator_consumer_attested: std::sync::atomic::AtomicBool,
}
396
397#[cfg(feature = "std")]
398impl PeatMesh {
399 pub fn new(config: PeatMeshConfig) -> Self {
401 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
402 let document_sync = DocumentSync::with_peripheral_type(
403 config.node_id,
404 &config.callsign,
405 config.peripheral_type,
406 );
407
408 let encryption_key = config
410 .encryption_secret
411 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
412
413 let connection_graph = ConnectionStateGraph::with_config(
415 config.peer_config.rssi_degraded_threshold,
416 config.peer_config.lost_timeout_ms,
417 );
418
419 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
421
422 let gossip_strategy: Box<dyn GossipStrategy> =
424 Box::new(RandomFanout::new(config.relay_fanout));
425
426 let delta_encoder = DeltaEncoder::new(config.node_id);
428
429 let document_registry = DocumentRegistry::new();
430
431 Self {
432 config,
433 peer_manager,
434 document_sync,
435 observers: ObserverManager::new(),
436 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
437 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
438 encryption_key,
439 peer_sessions: std::sync::Mutex::new(None),
440 connection_graph: std::sync::Mutex::new(connection_graph),
441 seen_cache: std::sync::Mutex::new(seen_cache),
442 gossip_strategy,
443 delta_encoder: std::sync::Mutex::new(delta_encoder),
444 identity: None,
445 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
446 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
447 document_registry,
448 app_documents: std::sync::RwLock::new(HashMap::new()),
449 #[cfg(feature = "mesh-translator")]
450 ble_translator: std::sync::RwLock::new(
451 crate::translator::BleTranslator::with_defaults(),
452 ),
453 #[cfg(feature = "mesh-translator")]
454 decoded_document_callback: std::sync::RwLock::new(None),
455 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
456 decoded_document_json_callback: std::sync::RwLock::new(None),
457 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
458 }
459 }
460
461 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
467 let mut config = config;
469 config.node_id = identity.node_id();
470
471 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
472 let document_sync = DocumentSync::with_peripheral_type(
473 config.node_id,
474 &config.callsign,
475 config.peripheral_type,
476 );
477
478 let encryption_key = config
479 .encryption_secret
480 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
481
482 let connection_graph = ConnectionStateGraph::with_config(
483 config.peer_config.rssi_degraded_threshold,
484 config.peer_config.lost_timeout_ms,
485 );
486
487 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
488 let gossip_strategy: Box<dyn GossipStrategy> =
489 Box::new(RandomFanout::new(config.relay_fanout));
490 let delta_encoder = DeltaEncoder::new(config.node_id);
491
492 let document_registry = DocumentRegistry::new();
493
494 Self {
495 config,
496 peer_manager,
497 document_sync,
498 observers: ObserverManager::new(),
499 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
500 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
501 encryption_key,
502 peer_sessions: std::sync::Mutex::new(None),
503 connection_graph: std::sync::Mutex::new(connection_graph),
504 seen_cache: std::sync::Mutex::new(seen_cache),
505 gossip_strategy,
506 delta_encoder: std::sync::Mutex::new(delta_encoder),
507 identity: Some(identity),
508 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
509 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
510 document_registry,
511 app_documents: std::sync::RwLock::new(HashMap::new()),
512 #[cfg(feature = "mesh-translator")]
513 ble_translator: std::sync::RwLock::new(
514 crate::translator::BleTranslator::with_defaults(),
515 ),
516 #[cfg(feature = "mesh-translator")]
517 decoded_document_callback: std::sync::RwLock::new(None),
518 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
519 decoded_document_json_callback: std::sync::RwLock::new(None),
520 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
521 }
522 }
523
524 pub fn from_genesis(
532 genesis: &crate::security::MeshGenesis,
533 identity: DeviceIdentity,
534 callsign: &str,
535 ) -> Self {
536 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
537 .with_encryption(genesis.encryption_secret());
538
539 Self::with_identity(config, identity)
540 }
541
/// Restores a mesh node from previously persisted state.
///
/// The identity is restored first. If the state also contains a genesis the
/// mesh is rebuilt through `from_genesis`; otherwise it is built with the
/// placeholder mesh id `"RESTORED"` and no encryption secret. The persisted
/// identity registry then replaces the fresh one.
///
/// # Errors
///
/// Returns the `PersistenceError` produced by `restore_identity`.
#[cfg(feature = "std")]
pub fn from_persisted(
    state: crate::security::PersistedState,
    callsign: &str,
) -> Result<Self, crate::security::PersistenceError> {
    let identity = state.restore_identity()?;

    let genesis = state.restore_genesis();

    // The binding was previously named `gen`, which is a reserved keyword in
    // the Rust 2024 edition.
    let mesh = if let Some(ref restored_genesis) = genesis {
        Self::from_genesis(restored_genesis, identity, callsign)
    } else {
        let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
        Self::with_identity(config, identity)
    };

    let restored_registry = state.restore_registry();
    // The mutex was created moments ago, so a poisoned lock is not expected.
    if let Ok(mut registry) = mesh.identity_registry.lock() {
        *registry = restored_registry;
    }

    log::info!(
        "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
        mesh.config.node_id.as_u32(),
        mesh.known_identity_count()
    );

    Ok(mesh)
}
600
/// Captures the identity, optional genesis and identity registry as a
/// `PersistedState`.
///
/// Returns `None` when the mesh has no device identity or when the identity
/// registry lock is poisoned.
#[cfg(feature = "std")]
pub fn to_persisted_state(
    &self,
    genesis: Option<&crate::security::MeshGenesis>,
) -> Option<crate::security::PersistedState> {
    let identity = self.identity.as_ref()?;
    let registry = self.identity_registry.lock().ok()?;

    // The third argument was garbled to `®istry` (an `&reg` entity
    // mis-decoding); it is a borrow of the locked registry.
    Some(crate::security::PersistedState::with_registry(
        identity, genesis, &registry,
    ))
}
625
/// Installs the callback invoked for each decoded translator document,
/// replacing any previous one.
///
/// If the callback lock is poisoned the call does nothing.
#[cfg(feature = "mesh-translator")]
pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
    let Ok(mut registered) = self.decoded_document_callback.write() else {
        return;
    };
    *registered = Some(cb);
}
645
/// Installs the JSON callback invoked for each decoded translator document,
/// replacing any previous one.
///
/// The boxed callback is converted to an `Arc` before it is stored. If the
/// callback lock is poisoned the call does nothing.
#[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
pub fn set_decoded_document_json_callback(
    &self,
    cb: Box<dyn crate::DecodedDocumentJsonCallback>,
) {
    let shared: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
    let Ok(mut registered) = self.decoded_document_json_callback.write() else {
        return;
    };
    *registered = Some(shared);
}
672
673 pub fn acknowledge_polled_translator_consumer(&self) {
689 self.polled_translator_consumer_attested
690 .store(true, std::sync::atomic::Ordering::Release);
691 }
692
/// Replaces the translator with a new one built from `config`.
///
/// If the translator lock is poisoned the call does nothing.
#[cfg(feature = "mesh-translator")]
pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
    let Ok(mut translator) = self.ble_translator.write() else {
        return;
    };
    *translator = crate::translator::BleTranslator::new(config);
}
702
/// Inspects a decrypted frame and, if it is a translator frame, decodes it
/// and dispatches the result.
///
/// Frame layout as read here: byte 0 is the marker, byte 1 the collection
/// code, the remainder is the codec payload.
///
/// Returns:
/// - `NotTranslatorMarker` when the frame is empty or its marker is neither
///   in the reserved range nor the translator frame marker;
/// - `Decoded(..)` when the payload decoded to a document and that document
///   serialized to JSON;
/// - `Handled` in every other case (reserved marker, truncated frame,
///   poisoned translator lock, unknown collection code, codec declined,
///   decode error, JSON serialization failure).
///
/// `peer` is passed to the translation context (falling back to
/// `"unknown"`) and to the callbacks; `source_node`, when present, is
/// formatted as eight upper-case hex digits and set as the context's local
/// wire id.
#[cfg(feature = "mesh-translator")]
fn try_handle_translator_marker(
    &self,
    decrypted: &[u8],
    peer: Option<&str>,
    source_node: Option<NodeId>,
) -> TranslatorMarkerOutcome {
    if decrypted.is_empty() {
        return TranslatorMarkerOutcome::NotTranslatorMarker;
    }
    let marker = decrypted[0];

    // Reserved markers are consumed (dropped) rather than passed on to other
    // handlers. This check runs before the frame-marker comparison.
    if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
        log::warn!(
            "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
            decrypted.len()
        );
        return TranslatorMarkerOutcome::Handled;
    }

    if marker != TRANSLATOR_FRAME_MARKER {
        return TranslatorMarkerOutcome::NotTranslatorMarker;
    }

    // A translator frame needs at least the marker and the collection code.
    if decrypted.len() < 2 {
        log::warn!(
            "ble: dropping truncated translator frame (len={}, missing collection code)",
            decrypted.len()
        );
        return TranslatorMarkerOutcome::Handled;
    }

    let code = decrypted[1];
    let payload = &decrypted[2..];

    // The translator read lock is confined to this block so it is released
    // before any callback or observer runs.
    let (collection, decode_result) = {
        let translator = match self.ble_translator.read() {
            Ok(g) => g,
            Err(_) => {
                log::warn!("ble: translator RwLock poisoned; dropping frame");
                return TranslatorMarkerOutcome::Handled;
            }
        };
        let collection = match translator.code_to_collection(code) {
            Some(c) => c.to_string(),
            None => {
                log::warn!(
                    "ble: dropping translator frame with unknown collection code 0x{code:02X}"
                );
                return TranslatorMarkerOutcome::Handled;
            }
        };

        let mut ctx =
            peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
                .with_collection(collection.clone());
        if let Some(node) = source_node {
            ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
        }

        let result = translator.decode_inbound_sync(payload, &ctx);
        (collection, result)
    };

    match decode_result {
        Ok(Some(doc)) => {
            // Clone the callback handles out of their locks so the locks are
            // not held while the callbacks run. A poisoned lock is treated
            // the same as "no callback registered".
            let rust_cb = self
                .decoded_document_callback
                .read()
                .ok()
                .and_then(|g| g.as_ref().cloned());

            #[cfg(feature = "uniffi")]
            let json_cb = self
                .decoded_document_json_callback
                .read()
                .ok()
                .and_then(|g| g.as_ref().cloned());
            // Without `uniffi` there is no JSON callback; a placeholder keeps
            // the `any_consumer` expression below identical in both builds.
            #[cfg(not(feature = "uniffi"))]
            let json_cb: Option<()> = None;

            let polled_attested = self
                .polled_translator_consumer_attested
                .load(std::sync::atomic::Ordering::Acquire);

            // Computed before `json_cb` is moved into the tuple match below.
            let any_consumer = rust_cb.is_some() || json_cb.is_some() || polled_attested;

            if let Some(cb) = &rust_cb {
                // `doc` is still needed for JSON serialization, hence the clone.
                cb.on_document(&collection, doc.clone(), peer);
            }

            let doc_json_result = serde_json::to_string(&doc);

            // The JSON callback only fires when serialization succeeded.
            #[cfg(feature = "uniffi")]
            if let (Some(json_cb), Ok(ref doc_json)) = (json_cb, &doc_json_result) {
                json_cb.on_document(
                    collection.clone(),
                    doc_json.clone(),
                    peer.map(str::to_string),
                );
            }

            if let Err(ref e) = doc_json_result {
                log::warn!(
                    "ble: failed to serialize decoded {} doc to JSON: {}",
                    collection,
                    e
                );
            }

            // Tell observers that a frame decoded but nobody consumes it.
            if !any_consumer {
                log::debug!(
                    "ble: decoded {} frame but no consumer registered (no callback, no polled attestation)",
                    collection
                );
                self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
                    collection: collection.clone(),
                    peer: peer.map(str::to_string),
                });
            }

            // On serialization failure fall through to `Handled` below.
            if let Ok(doc_json) = doc_json_result {
                return TranslatorMarkerOutcome::Decoded(DecodedTranslatorFrame {
                    collection,
                    doc_json,
                    peer: peer.map(str::to_string),
                });
            }
        }
        Ok(None) => {
            log::debug!(
                "ble: codec declined translator frame for collection {}",
                collection
            );
        }
        Err(e) => {
            log::warn!(
                "ble: translator frame decode error (collection={}): {:#}",
                collection,
                e
            );
        }
    }

    TranslatorMarkerOutcome::Handled
}
901
902 pub fn is_encryption_enabled(&self) -> bool {
906 self.encryption_key.is_some()
907 }
908
909 pub fn is_strict_encryption_enabled(&self) -> bool {
913 self.config.strict_encryption && self.encryption_key.is_some()
914 }
915
916 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
921 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
922 &self.config.mesh_id,
923 secret,
924 ));
925 }
926
927 pub fn disable_encryption(&mut self) {
929 self.encryption_key = None;
930 }
931
932 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
937 match &self.encryption_key {
938 Some(key) => {
939 match key.encrypt_to_bytes(plaintext) {
941 Ok(ciphertext) => {
942 let mut buf = Vec::with_capacity(2 + ciphertext.len());
943 buf.push(ENCRYPTED_MARKER);
944 buf.push(0x00); buf.extend_from_slice(&ciphertext);
946 buf
947 }
948 Err(e) => {
949 log::error!("Encryption failed: {}", e);
950 plaintext.to_vec()
952 }
953 }
954 }
955 None => plaintext.to_vec(),
956 }
957 }
958
959 fn decrypt_document<'a>(
967 &self,
968 data: &'a [u8],
969 source_hint: Option<&str>,
970 ) -> Option<std::borrow::Cow<'a, [u8]>> {
971 log::debug!(
972 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
973 data.len(),
974 data.first().copied().unwrap_or(0),
975 source_hint
976 );
977
978 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
980 let _reserved = data[1];
982 let encrypted_payload = &data[2..];
983
984 log::debug!(
985 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
986 encrypted_payload.len()
987 );
988
989 match &self.encryption_key {
990 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
991 Ok(plaintext) => {
992 log::debug!(
993 "decrypt_document: SUCCESS, plaintext len={}",
994 plaintext.len()
995 );
996 Some(std::borrow::Cow::Owned(plaintext))
997 }
998 Err(e) => {
999 log::warn!(
1000 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
1001 e,
1002 encrypted_payload.len(),
1003 source_hint
1004 );
1005 self.notify(PeatEvent::SecurityViolation {
1006 kind: SecurityViolationKind::DecryptionFailed,
1007 source: source_hint.map(String::from),
1008 });
1009 None
1010 }
1011 },
1012 None => {
1013 log::warn!(
1014 "decrypt_document: encryption not enabled but received encrypted doc"
1015 );
1016 None
1017 }
1018 }
1019 } else {
1020 if self.config.strict_encryption && self.encryption_key.is_some() {
1023 log::warn!(
1024 "Rejected unencrypted document in strict encryption mode (source: {:?})",
1025 source_hint
1026 );
1027 self.notify(PeatEvent::SecurityViolation {
1028 kind: SecurityViolationKind::UnencryptedInStrictMode,
1029 source: source_hint.map(String::from),
1030 });
1031 None
1032 } else {
1033 Some(std::borrow::Cow::Borrowed(data))
1035 }
1036 }
1037 }
1038
1039 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
1053 self.decrypt_document(data, None)
1054 .map(|cow| cow.into_owned())
1055 }
1056
1057 pub fn has_identity(&self) -> bool {
1061 self.identity.is_some()
1062 }
1063
1064 pub fn public_key(&self) -> Option<[u8; 32]> {
1066 self.identity.as_ref().map(|id| id.public_key())
1067 }
1068
1069 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1073 self.identity
1074 .as_ref()
1075 .map(|id| id.create_attestation(now_ms))
1076 }
1077
1078 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1087 self.identity_registry
1088 .lock()
1089 .unwrap()
1090 .verify_or_register(attestation)
1091 }
1092
1093 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1095 self.identity_registry.lock().unwrap().is_known(node_id)
1096 }
1097
1098 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1100 self.identity_registry
1101 .lock()
1102 .unwrap()
1103 .get_public_key(node_id)
1104 .copied()
1105 }
1106
1107 pub fn known_identity_count(&self) -> usize {
1109 self.identity_registry.lock().unwrap().len()
1110 }
1111
1112 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1117 self.identity_registry
1118 .lock()
1119 .unwrap()
1120 .pre_register(node_id, public_key, now_ms);
1121 }
1122
1123 pub fn forget_peer_identity(&self, node_id: NodeId) {
1127 self.identity_registry.lock().unwrap().remove(node_id);
1128 }
1129
1130 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1134 self.identity.as_ref().map(|id| id.sign(data))
1135 }
1136
1137 pub fn verify_peer_signature(
1142 &self,
1143 node_id: NodeId,
1144 data: &[u8],
1145 signature: &[u8; 64],
1146 ) -> bool {
1147 if let Some(public_key) = self.peer_public_key(node_id) {
1148 crate::security::verify_signature(&public_key, data, signature)
1149 } else {
1150 false
1151 }
1152 }
1153
1154 pub fn is_relay_enabled(&self) -> bool {
1158 self.config.enable_relay
1159 }
1160
1161 pub fn enable_relay(&mut self) {
1163 self.config.enable_relay = true;
1164 }
1165
1166 pub fn disable_relay(&mut self) {
1168 self.config.enable_relay = false;
1169 }
1170
1171 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1175 self.seen_cache.lock().unwrap().has_seen(message_id)
1176 }
1177
1178 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1182 self.seen_cache
1183 .lock()
1184 .unwrap()
1185 .check_and_mark(message_id, origin, now_ms)
1186 }
1187
1188 pub fn seen_cache_size(&self) -> usize {
1190 self.seen_cache.lock().unwrap().len()
1191 }
1192
1193 pub fn clear_seen_cache(&self) {
1195 self.seen_cache.lock().unwrap().clear();
1196 }
1197
1198 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1203 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1204 .with_max_hops(self.config.max_relay_hops);
1205 envelope.encode()
1206 }
1207
1208 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1213 let connected = self.peer_manager.get_connected_peers();
1214 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1215 connected
1216 .into_iter()
1217 .filter(|p| p.node_id != exclude)
1218 .collect()
1219 } else {
1220 connected
1221 };
1222
1223 self.gossip_strategy
1224 .select_peers(&filtered)
1225 .into_iter()
1226 .cloned()
1227 .collect()
1228 }
1229
1230 pub fn process_relay_envelope(
1240 &self,
1241 data: &[u8],
1242 source_peer: NodeId,
1243 now_ms: u64,
1244 ) -> Option<RelayDecision> {
1245 let envelope = RelayEnvelope::decode(data)?;
1247
1248 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1251 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1252 source_peer,
1253 envelope.origin_node,
1254 envelope.hop_count,
1255 now_ms,
1256 );
1257
1258 if is_new {
1259 log::debug!(
1260 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1261 envelope.origin_node.as_u32(),
1262 source_peer.as_u32(),
1263 envelope.hop_count
1264 );
1265 }
1266 }
1267
1268 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1270 let stats = self
1272 .seen_cache
1273 .lock()
1274 .unwrap()
1275 .get_stats(&envelope.message_id);
1276 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1277
1278 self.notify(PeatEvent::DuplicateMessageDropped {
1279 origin_node: envelope.origin_node,
1280 seen_count,
1281 });
1282
1283 log::debug!(
1284 "Dropped duplicate message {} from {:08X} (seen {} times)",
1285 envelope.message_id,
1286 envelope.origin_node.as_u32(),
1287 seen_count
1288 );
1289 return None;
1290 }
1291
1292 if !envelope.can_relay() {
1294 self.notify(PeatEvent::MessageTtlExpired {
1295 origin_node: envelope.origin_node,
1296 hop_count: envelope.hop_count,
1297 });
1298
1299 log::debug!(
1300 "Message {} from {:08X} TTL expired at hop {}",
1301 envelope.message_id,
1302 envelope.origin_node.as_u32(),
1303 envelope.hop_count
1304 );
1305
1306 return Some(RelayDecision {
1308 payload: envelope.payload,
1309 origin_node: envelope.origin_node,
1310 hop_count: envelope.hop_count,
1311 should_relay: false,
1312 relay_envelope: None,
1313 });
1314 }
1315
1316 let should_relay = self.config.enable_relay;
1318 let relay_envelope = if should_relay {
1319 envelope.relay() } else {
1321 None
1322 };
1323
1324 Some(RelayDecision {
1325 payload: envelope.payload,
1326 origin_node: envelope.origin_node,
1327 hop_count: envelope.hop_count,
1328 should_relay,
1329 relay_envelope,
1330 })
1331 }
1332
1333 pub fn build_relay_document(&self) -> Vec<u8> {
1338 let doc = self.build_document(); self.wrap_for_relay(doc)
1340 }
1341
1342 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1349 let mut encoder = self.delta_encoder.lock().unwrap();
1350 encoder.add_peer(peer_id);
1351 log::debug!(
1352 "Registered peer {:08X} for delta sync tracking",
1353 peer_id.as_u32()
1354 );
1355 }
1356
1357 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1361 let mut encoder = self.delta_encoder.lock().unwrap();
1362 encoder.remove_peer(peer_id);
1363 log::debug!(
1364 "Unregistered peer {:08X} from delta sync tracking",
1365 peer_id.as_u32()
1366 );
1367 }
1368
1369 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1374 let mut encoder = self.delta_encoder.lock().unwrap();
1375 encoder.reset_peer(peer_id);
1376 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1377 }
1378
1379 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1381 let mut encoder = self.delta_encoder.lock().unwrap();
1382 encoder.record_sent(peer_id, bytes);
1383 }
1384
1385 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1387 let mut encoder = self.delta_encoder.lock().unwrap();
1388 encoder.record_received(peer_id, bytes, timestamp);
1389 }
1390
1391 pub fn delta_stats(&self) -> DeltaStats {
1396 self.delta_encoder.lock().unwrap().stats()
1397 }
1398
1399 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1403 let encoder = self.delta_encoder.lock().unwrap();
1404 encoder
1405 .get_peer_state(peer_id)
1406 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1407 }
1408
1409 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1417 let mut all_operations: Vec<Operation> = Vec::new();
1419
1420 for (node_id_u32, count) in self.document_sync.counter_entries() {
1423 all_operations.push(Operation::IncrementCounter {
1424 counter_id: 0, node_id: NodeId::new(node_id_u32),
1426 amount: count,
1427 timestamp: count, });
1429 }
1430
1431 let peripheral = self.document_sync.peripheral_snapshot();
1434 let peripheral_timestamp = peripheral
1435 .last_event
1436 .as_ref()
1437 .map(|e| e.timestamp)
1438 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1440 peripheral,
1441 timestamp: peripheral_timestamp,
1442 });
1443
1444 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1446 let source_node = NodeId::new(emergency.source_node());
1447 let timestamp = emergency.timestamp();
1448
1449 all_operations.push(Operation::SetEmergency {
1451 source_node,
1452 timestamp,
1453 known_peers: emergency.all_nodes(),
1454 });
1455
1456 for acked_node in emergency.acked_nodes() {
1458 all_operations.push(Operation::AckEmergency {
1459 node_id: NodeId::new(acked_node),
1460 emergency_timestamp: timestamp,
1461 });
1462 }
1463 }
1464
1465 for app_op in self.app_document_delta_ops() {
1467 all_operations.push(Operation::App(app_op));
1468 }
1469
1470 let filtered_operations: Vec<Operation> = {
1472 let encoder = self.delta_encoder.lock().unwrap();
1473 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1474 all_operations
1475 .into_iter()
1476 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1477 .collect()
1478 } else {
1479 all_operations
1481 }
1482 };
1483
1484 if filtered_operations.is_empty() {
1486 return None;
1487 }
1488
1489 {
1491 let mut encoder = self.delta_encoder.lock().unwrap();
1492 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1493 for op in &filtered_operations {
1494 peer_state.mark_sent(&op.key(), op.timestamp());
1495 }
1496 }
1497 }
1498
1499 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1501 for op in filtered_operations {
1502 delta.add_operation(op);
1503 }
1504
1505 let encoded = delta.encode();
1507 let result = self.encrypt_document(&encoded);
1508
1509 {
1511 let mut encoder = self.delta_encoder.lock().unwrap();
1512 encoder.record_sent(peer_id, result.len());
1513 }
1514
1515 Some(result)
1516 }
1517
1518 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1523 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1524
1525 for (node_id_u32, count) in self.document_sync.counter_entries() {
1527 delta.add_operation(Operation::IncrementCounter {
1528 counter_id: 0,
1529 node_id: NodeId::new(node_id_u32),
1530 amount: count,
1531 timestamp: now_ms,
1532 });
1533 }
1534
1535 let peripheral = self.document_sync.peripheral_snapshot();
1537 let peripheral_timestamp = peripheral
1538 .last_event
1539 .as_ref()
1540 .map(|e| e.timestamp)
1541 .unwrap_or(now_ms);
1542 delta.add_operation(Operation::UpdatePeripheral {
1543 peripheral,
1544 timestamp: peripheral_timestamp,
1545 });
1546
1547 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1549 let source_node = NodeId::new(emergency.source_node());
1550 let timestamp = emergency.timestamp();
1551
1552 delta.add_operation(Operation::SetEmergency {
1553 source_node,
1554 timestamp,
1555 known_peers: emergency.all_nodes(),
1556 });
1557
1558 for acked_node in emergency.acked_nodes() {
1559 delta.add_operation(Operation::AckEmergency {
1560 node_id: NodeId::new(acked_node),
1561 emergency_timestamp: timestamp,
1562 });
1563 }
1564 }
1565
1566 for app_op in self.app_document_delta_ops() {
1568 delta.add_operation(Operation::App(app_op));
1569 }
1570
1571 let encoded = delta.encode();
1572 self.encrypt_document(&encoded)
1573 }
1574
/// Processes an already-decrypted delta document and reports what it contained.
///
/// * `source_node` - peer the bytes arrived from; used for sync bookkeeping and
///   excluded when relay targets are looked up.
/// * `data` - encoded delta document bytes.
/// * `now_ms` - timestamp forwarded to the sync/delta bookkeeping calls.
/// * `relay_data`, `origin_node`, `hop_count` - relay context, copied unchanged
///   into the returned result.
///
/// Returns `None` when `data` does not decode as a delta document or when the
/// delta's origin is this node.
///
/// # Panics
///
/// Panics if the `app_documents` or `delta_encoder` lock cannot be acquired
/// (both are unwrapped).
fn process_delta_document_internal(
    &self,
    source_node: NodeId,
    data: &[u8],
    now_ms: u64,
    relay_data: Option<Vec<u8>>,
    origin_node: Option<NodeId>,
    hop_count: u8,
) -> Option<DataReceivedResult> {
    let delta = DeltaDocument::decode(data)?;

    // Ignore deltas that this node originated itself.
    if delta.origin_node == self.config.node_id {
        return None;
    }

    // Summary flags accumulated while walking the operations.
    let mut counter_changed = false;
    let mut emergency_changed = false;
    let mut is_emergency = false;
    let mut is_ack = false;
    let mut event_timestamp = 0u64;
    let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;

    log::info!(
        "Delta document from {:08X}: {} operations, data_len={}",
        delta.origin_node.as_u32(),
        delta.operations.len(),
        data.len()
    );
    for op in &delta.operations {
        log::info!(" Operation: {}", op.key());
        match op {
            Operation::IncrementCounter {
                node_id, amount, ..
            } => {
                // Compare the advertised amount with the value currently known
                // for that node (0 when the node has no entry).
                let current = self.document_sync.counter_entries();
                let current_value = current
                    .iter()
                    .find(|(id, _)| *id == node_id.as_u32())
                    .map(|(_, v)| *v)
                    .unwrap_or(0);

                // This branch only records that the counter differs; it does
                // not write any counter state itself.
                if *amount > current_value {
                    counter_changed = true;
                }
            }
            Operation::UpdatePeripheral {
                peripheral,
                timestamp,
            } => {
                // Cache the peripheral under the delta's origin node; a failed
                // write lock is silently skipped.
                if let Ok(mut peripherals) = self.peer_peripherals.write() {
                    peripherals.insert(delta.origin_node, peripheral.clone());
                }
                peer_peripheral = Some(peripheral.clone());
                // Keep the newest timestamp seen so far.
                if *timestamp > event_timestamp {
                    event_timestamp = *timestamp;
                }
            }
            Operation::SetEmergency { timestamp, .. } => {
                is_emergency = true;
                emergency_changed = true;
                // Unlike the other branches this overwrites unconditionally
                // rather than keeping the maximum.
                event_timestamp = *timestamp;
            }
            Operation::AckEmergency {
                emergency_timestamp,
                ..
            } => {
                is_ack = true;
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::ClearEmergency {
                emergency_timestamp,
            } => {
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::App(app_op) => {
                // Only the low 48 bits of the timestamp take part in the
                // document key.
                // NOTE(review): the meaning of the upper 16 bits is not visible
                // here - confirm against the encoder.
                let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;

                log::info!(
                    "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
                    app_op.type_id,
                    app_op.op_code,
                    app_op.source_node,
                    doc_timestamp,
                    app_op.payload.len()
                );

                let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
                // The write lock is confined to this block so that it is
                // released before observers are notified below.
                let changed = {
                    let mut docs = self.app_documents.write().unwrap();

                    if let Some(existing) = docs.get_mut(&doc_key) {
                        // Known document: apply the operation in place.
                        self.document_registry.apply_delta_op(
                            app_op.type_id,
                            existing.as_mut(),
                            app_op,
                        )
                    } else {
                        // Unknown document: try to decode the payload as a
                        // complete document and store it.
                        if let Some(decoded) = self
                            .document_registry
                            .decode(app_op.type_id, &app_op.payload)
                        {
                            docs.insert(doc_key, decoded);
                            true
                        } else {
                            log::debug!(
                                "Received delta for unknown doc {:?}, waiting for full state",
                                doc_key
                            );
                            false
                        }
                    }
                };

                // Observers are told about every app operation, including
                // those that changed nothing (`changed == false`).
                self.observers.notify(PeatEvent::app_document_received(
                    app_op.type_id,
                    NodeId::new(app_op.source_node),
                    doc_timestamp,
                    changed,
                ));
            }
        }
    }

    // Sync bookkeeping is keyed by the transport-level sender (`source_node`),
    // not by the delta's origin.
    self.peer_manager.record_sync(source_node, now_ms);

    {
        let mut encoder = self.delta_encoder.lock().unwrap();
        encoder.record_received(&source_node, data.len(), now_ms);
    }

    // An emergency takes precedence over an ack when both are present.
    if is_emergency {
        self.notify(PeatEvent::EmergencyReceived {
            from_node: delta.origin_node,
        });
    } else if is_ack {
        self.notify(PeatEvent::AckReceived {
            from_node: delta.origin_node,
        });
    }

    if counter_changed {
        let total_count = self.document_sync.total_count();
        self.notify(PeatEvent::DocumentSynced {
            from_node: delta.origin_node,
            total_count,
        });
    }

    // Only emitted when the caller supplied bytes to relay; the sender is
    // excluded from the target lookup.
    if relay_data.is_some() {
        let relay_targets = self.get_relay_targets(Some(source_node));
        self.notify(PeatEvent::MessageRelayed {
            origin_node: origin_node.unwrap_or(delta.origin_node),
            relay_count: relay_targets.len(),
            hop_count,
        });
    }

    let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
        DataReceivedResult::peripheral_fields(&peer_peripheral);

    // The result reports the delta's origin as `source_node`.
    Some(DataReceivedResult {
        source_node: delta.origin_node,
        is_emergency,
        is_ack,
        counter_changed,
        emergency_changed,
        total_count: self.document_sync.total_count(),
        event_timestamp,
        relay_data,
        origin_node,
        hop_count,
        callsign,
        battery_percent,
        heart_rate,
        event_type,
        latitude,
        longitude,
        altitude,
        ..Default::default()
    })
}
1789
1790 pub fn enable_peer_e2ee(&self) {
1798 let mut sessions = self.peer_sessions.lock().unwrap();
1799 if sessions.is_none() {
1800 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1801 log::info!(
1802 "Per-peer E2EE enabled for node {:08X}",
1803 self.config.node_id.as_u32()
1804 );
1805 }
1806 }
1807
1808 pub fn disable_peer_e2ee(&self) {
1812 let mut sessions = self.peer_sessions.lock().unwrap();
1813 *sessions = None;
1814 log::info!("Per-peer E2EE disabled");
1815 }
1816
1817 pub fn is_peer_e2ee_enabled(&self) -> bool {
1819 self.peer_sessions.lock().unwrap().is_some()
1820 }
1821
1822 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1826 self.peer_sessions
1827 .lock()
1828 .unwrap()
1829 .as_ref()
1830 .map(|s| s.our_public_key())
1831 }
1832
1833 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1839 let mut sessions = self.peer_sessions.lock().unwrap();
1840 let session_mgr = sessions.as_mut()?;
1841
1842 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1843 let mut buf = Vec::with_capacity(2 + 37);
1844 buf.push(KEY_EXCHANGE_MARKER);
1845 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1847
1848 log::info!(
1849 "Initiated E2EE session with peer {:08X}",
1850 peer_node_id.as_u32()
1851 );
1852 Some(buf)
1853 }
1854
1855 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1857 self.peer_sessions
1858 .lock()
1859 .unwrap()
1860 .as_ref()
1861 .is_some_and(|s| s.has_session(peer_node_id))
1862 }
1863
1864 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1866 self.peer_sessions
1867 .lock()
1868 .unwrap()
1869 .as_ref()
1870 .and_then(|s| s.session_state(peer_node_id))
1871 }
1872
1873 pub fn send_peer_e2ee(
1878 &self,
1879 peer_node_id: NodeId,
1880 plaintext: &[u8],
1881 now_ms: u64,
1882 ) -> Option<Vec<u8>> {
1883 let mut sessions = self.peer_sessions.lock().unwrap();
1884 let session_mgr = sessions.as_mut()?;
1885
1886 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1887 Ok(encrypted) => {
1888 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1889 buf.push(PEER_E2EE_MARKER);
1890 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1892 Some(buf)
1893 }
1894 Err(e) => {
1895 log::warn!(
1896 "Failed to encrypt for peer {:08X}: {:?}",
1897 peer_node_id.as_u32(),
1898 e
1899 );
1900 None
1901 }
1902 }
1903 }
1904
1905 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1907 let mut sessions = self.peer_sessions.lock().unwrap();
1908 if let Some(session_mgr) = sessions.as_mut() {
1909 session_mgr.close_session(peer_node_id);
1910 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1911 log::info!(
1912 "Closed E2EE session with peer {:08X}",
1913 peer_node_id.as_u32()
1914 );
1915 }
1916 }
1917
1918 pub fn peer_e2ee_session_count(&self) -> usize {
1920 self.peer_sessions
1921 .lock()
1922 .unwrap()
1923 .as_ref()
1924 .map(|s| s.session_count())
1925 .unwrap_or(0)
1926 }
1927
1928 pub fn peer_e2ee_established_count(&self) -> usize {
1930 self.peer_sessions
1931 .lock()
1932 .unwrap()
1933 .as_ref()
1934 .map(|s| s.established_count())
1935 .unwrap_or(0)
1936 }
1937
1938 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1943 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1944 return None;
1945 }
1946
1947 let payload = &data[2..];
1948 let msg = KeyExchangeMessage::decode(payload)?;
1949
1950 let mut sessions = self.peer_sessions.lock().unwrap();
1951 let session_mgr = sessions.as_mut()?;
1952
1953 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1954
1955 if established {
1956 self.notify(PeatEvent::PeerE2eeEstablished {
1957 peer_node_id: msg.sender_node_id,
1958 });
1959 log::info!(
1960 "E2EE session established with peer {:08X}",
1961 msg.sender_node_id.as_u32()
1962 );
1963 }
1964
1965 let mut buf = Vec::with_capacity(2 + 37);
1967 buf.push(KEY_EXCHANGE_MARKER);
1968 buf.push(0x00);
1969 buf.extend_from_slice(&response.encode());
1970 Some(buf)
1971 }
1972
1973 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1978 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1979 return None;
1980 }
1981
1982 let payload = &data[2..];
1983 let msg = PeerEncryptedMessage::decode(payload)?;
1984
1985 let mut sessions = self.peer_sessions.lock().unwrap();
1986 let session_mgr = sessions.as_mut()?;
1987
1988 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1989 Ok(plaintext) => {
1990 self.notify(PeatEvent::PeerE2eeMessageReceived {
1992 from_node: msg.sender_node_id,
1993 data: plaintext.clone(),
1994 });
1995 Some(plaintext)
1996 }
1997 Err(e) => {
1998 log::warn!(
1999 "Failed to decrypt E2EE message from {:08X}: {:?}",
2000 msg.sender_node_id.as_u32(),
2001 e
2002 );
2003 None
2004 }
2005 }
2006 }
2007
2008 pub fn node_id(&self) -> NodeId {
2012 self.config.node_id
2013 }
2014
2015 pub fn callsign(&self) -> &str {
2017 &self.config.callsign
2018 }
2019
2020 pub fn mesh_id(&self) -> &str {
2022 &self.config.mesh_id
2023 }
2024
2025 pub fn device_name(&self) -> String {
2027 format!(
2028 "PEAT_{}-{:08X}",
2029 self.config.mesh_id,
2030 self.config.node_id.as_u32()
2031 )
2032 }
2033
2034 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
2039 self.peer_peripherals.read().ok().and_then(|peripherals| {
2040 peripherals
2041 .get(&node_id)
2042 .map(|p| p.callsign_str().to_string())
2043 })
2044 }
2045
2046 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
2051 self.peer_peripherals
2052 .read()
2053 .ok()
2054 .and_then(|peripherals| peripherals.get(&node_id).cloned())
2055 }
2056
2057 pub fn document_registry(&self) -> &DocumentRegistry {
2072 &self.document_registry
2073 }
2074
2075 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2082 let type_id = T::TYPE_ID;
2083 let (source_node, timestamp) = doc.identity();
2084 let key = (type_id, source_node, timestamp);
2085
2086 let mut docs = self.app_documents.write().unwrap();
2087
2088 if let Some(existing) = docs.get_mut(&key) {
2089 self.document_registry
2091 .merge(type_id, existing.as_mut(), &doc)
2092 } else {
2093 docs.insert(key, Box::new(doc));
2095 true
2096 }
2097 }
2098
2099 pub fn store_app_document_boxed(
2106 &self,
2107 type_id: u8,
2108 source_node: u32,
2109 timestamp: u64,
2110 doc: Box<dyn core::any::Any + Send + Sync>,
2111 ) -> bool {
2112 let key = (type_id, source_node, timestamp);
2113
2114 let mut docs = self.app_documents.write().unwrap();
2115
2116 if let Some(existing) = docs.get_mut(&key) {
2117 self.document_registry
2119 .merge(type_id, existing.as_mut(), doc.as_ref())
2120 } else {
2121 docs.insert(key, doc);
2123 true
2124 }
2125 }
2126
2127 pub fn get_app_document<T: crate::registry::DocumentType>(
2131 &self,
2132 source_node: u32,
2133 timestamp: u64,
2134 ) -> Option<T> {
2135 let key = (T::TYPE_ID, source_node, timestamp);
2136
2137 let docs = self.app_documents.read().unwrap();
2138 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2139 }
2140
2141 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2145 let docs = self.app_documents.read().unwrap();
2146 docs.iter()
2147 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2148 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2149 .collect()
2150 }
2151
2152 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2156 let docs = self.app_documents.read().unwrap();
2157 let mut ops = Vec::new();
2158
2159 for ((type_id, _source, _ts), doc) in docs.iter() {
2160 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2161 ops.push(op);
2162 }
2163 }
2164
2165 ops
2166 }
2167
2168 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2172 let docs = self.app_documents.read().unwrap();
2173 docs.keys()
2174 .filter(|(tid, _, _)| *tid == type_id)
2175 .map(|(_, source, ts)| (*source, *ts))
2176 .collect()
2177 }
2178
2179 pub fn app_document_count(&self) -> usize {
2181 self.app_documents.read().unwrap().len()
2182 }
2183
2184 pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2188 self.observers.add(observer);
2189 }
2190
2191 pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2193 self.observers.remove(observer);
2194 }
2195
2196 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2203 let data = self.document_sync.send_emergency(timestamp);
2204 self.notify(PeatEvent::MeshStateChanged {
2205 peer_count: self.peer_manager.peer_count(),
2206 connected_count: self.peer_manager.connected_count(),
2207 });
2208 self.encrypt_document(&data)
2209 }
2210
2211 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2216 let data = self.document_sync.send_ack(timestamp);
2217 self.notify(PeatEvent::MeshStateChanged {
2218 peer_count: self.peer_manager.peer_count(),
2219 connected_count: self.peer_manager.connected_count(),
2220 });
2221 self.encrypt_document(&data)
2222 }
2223
2224 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2231 self.encrypt_document(payload)
2232 }
2233
2234 pub fn clear_event(&self) {
2236 self.document_sync.clear_event();
2237 }
2238
2239 pub fn is_emergency_active(&self) -> bool {
2241 self.document_sync.is_emergency_active()
2242 }
2243
2244 pub fn is_ack_active(&self) -> bool {
2246 self.document_sync.is_ack_active()
2247 }
2248
2249 pub fn current_event(&self) -> Option<EventType> {
2251 self.document_sync.current_event()
2252 }
2253
2254 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2263 let data = self.document_sync.start_emergency(timestamp, known_peers);
2264 self.notify(PeatEvent::MeshStateChanged {
2265 peer_count: self.peer_manager.peer_count(),
2266 connected_count: self.peer_manager.connected_count(),
2267 });
2268 self.encrypt_document(&data)
2269 }
2270
2271 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2275 let peers: Vec<u32> = self
2276 .peer_manager
2277 .get_peers()
2278 .iter()
2279 .map(|p| p.node_id.as_u32())
2280 .collect();
2281 self.start_emergency(timestamp, &peers)
2282 }
2283
2284 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2289 let result = self.document_sync.ack_emergency(timestamp);
2290 if result.is_some() {
2291 self.notify(PeatEvent::MeshStateChanged {
2292 peer_count: self.peer_manager.peer_count(),
2293 connected_count: self.peer_manager.connected_count(),
2294 });
2295 }
2296 result.map(|data| self.encrypt_document(&data))
2297 }
2298
2299 pub fn clear_emergency(&self) {
2301 self.document_sync.clear_emergency();
2302 }
2303
2304 pub fn has_active_emergency(&self) -> bool {
2306 self.document_sync.has_active_emergency()
2307 }
2308
2309 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2313 self.document_sync.get_emergency_status()
2314 }
2315
2316 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2318 self.document_sync.has_peer_acked(peer_id)
2319 }
2320
2321 pub fn all_peers_acked(&self) -> bool {
2323 self.document_sync.all_peers_acked()
2324 }
2325
/// Adds a chat message to the document and returns the full document after
/// passing it through `encrypt_document`, or `None` when the document sync
/// rejects the message.
#[cfg(feature = "legacy-chat")]
pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
    if !self.document_sync.add_chat_message(sender, text, timestamp) {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2343
/// Adds a chat reply (referencing the message identified by `reply_to_node`
/// and `reply_to_timestamp`) to the document and returns the full document
/// after passing it through `encrypt_document`, or `None` when the document
/// sync rejects the reply.
#[cfg(feature = "legacy-chat")]
pub fn send_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> Option<Vec<u8>> {
    let accepted = self.document_sync.add_chat_reply(
        sender,
        text,
        reply_to_node,
        reply_to_timestamp,
        timestamp,
    );

    if !accepted {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2372
/// Returns the chat message count reported by the underlying document sync.
#[cfg(feature = "legacy-chat")]
pub fn chat_count(&self) -> usize {
    let sync = &self.document_sync;
    sync.chat_count()
}
2378
/// Returns the chat messages the underlying document sync reports for
/// `since_timestamp`.
///
/// NOTE(review): tuple field meanings and whether the bound is inclusive are
/// defined by `DocumentSync::chat_messages_since` and are not visible here.
#[cfg(feature = "legacy-chat")]
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.chat_messages_since(since_timestamp)
}
2389
/// Returns every chat message held by the underlying document sync.
#[cfg(feature = "legacy-chat")]
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.all_chat_messages()
}
2397
2398 pub fn on_ble_discovered(
2404 &self,
2405 identifier: &str,
2406 name: Option<&str>,
2407 rssi: i8,
2408 mesh_id: Option<&str>,
2409 now_ms: u64,
2410 ) -> Option<PeatPeer> {
2411 let (node_id, is_new) = self
2412 .peer_manager
2413 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2414
2415 let peer = self.peer_manager.get_peer(node_id)?;
2416
2417 {
2419 let mut graph = self.connection_graph.lock().unwrap();
2420 graph.on_discovered(
2421 node_id,
2422 identifier.to_string(),
2423 name.map(|s| s.to_string()),
2424 mesh_id.map(|s| s.to_string()),
2425 rssi,
2426 now_ms,
2427 );
2428 }
2429
2430 if is_new {
2431 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2432 self.notify_mesh_state_changed();
2433 }
2434
2435 Some(peer)
2436 }
2437
2438 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2442 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2443 Some(id) => id,
2444 None => {
2445 log::warn!(
2446 "on_ble_connected: identifier {:?} not in peer map — \
2447 use on_incoming_connection() for peripheral connections",
2448 identifier
2449 );
2450 return None;
2451 }
2452 };
2453
2454 {
2456 let mut graph = self.connection_graph.lock().unwrap();
2457 graph.on_connected(node_id, now_ms);
2458 }
2459
2460 self.register_peer_for_delta(&node_id);
2462
2463 self.notify(PeatEvent::PeerConnected { node_id });
2464 self.notify_mesh_state_changed();
2465 Some(node_id)
2466 }
2467
2468 pub fn on_ble_disconnected(
2470 &self,
2471 identifier: &str,
2472 reason: DisconnectReason,
2473 ) -> Option<NodeId> {
2474 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2475
2476 {
2478 let mut graph = self.connection_graph.lock().unwrap();
2479 let platform_reason = match observer_reason {
2480 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2481 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2482 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2483 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2484 DisconnectReason::ConnectionFailed => {
2485 crate::platform::DisconnectReason::ConnectionFailed
2486 }
2487 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2488 };
2489 let now_ms = std::time::SystemTime::now()
2490 .duration_since(std::time::UNIX_EPOCH)
2491 .map(|d| d.as_millis() as u64)
2492 .unwrap_or(0);
2493 graph.on_disconnected(node_id, platform_reason, now_ms);
2494
2495 graph.remove_via_peer(node_id);
2498 }
2499
2500 self.unregister_peer_for_delta(&node_id);
2502
2503 self.notify(PeatEvent::PeerDisconnected {
2504 node_id,
2505 reason: observer_reason,
2506 });
2507 self.notify_mesh_state_changed();
2508 Some(node_id)
2509 }
2510
2511 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2515 if self
2516 .peer_manager
2517 .on_disconnected_by_node_id(node_id, reason)
2518 {
2519 {
2521 let mut graph = self.connection_graph.lock().unwrap();
2522 let platform_reason = match reason {
2523 DisconnectReason::LocalRequest => {
2524 crate::platform::DisconnectReason::LocalRequest
2525 }
2526 DisconnectReason::RemoteRequest => {
2527 crate::platform::DisconnectReason::RemoteRequest
2528 }
2529 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2530 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2531 DisconnectReason::ConnectionFailed => {
2532 crate::platform::DisconnectReason::ConnectionFailed
2533 }
2534 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2535 };
2536 let now_ms = std::time::SystemTime::now()
2537 .duration_since(std::time::UNIX_EPOCH)
2538 .map(|d| d.as_millis() as u64)
2539 .unwrap_or(0);
2540 graph.on_disconnected(node_id, platform_reason, now_ms);
2541
2542 graph.remove_via_peer(node_id);
2544 }
2545
2546 self.unregister_peer_for_delta(&node_id);
2548
2549 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2550 self.notify_mesh_state_changed();
2551 }
2552 }
2553
2554 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2558 let is_new = self
2559 .peer_manager
2560 .on_incoming_connection(identifier, node_id, now_ms);
2561
2562 {
2564 let mut graph = self.connection_graph.lock().unwrap();
2565 if is_new {
2566 graph.on_discovered(
2567 node_id,
2568 identifier.to_string(),
2569 None,
2570 Some(self.config.mesh_id.clone()),
2571 -50, now_ms,
2573 );
2574 }
2575 graph.on_connected(node_id, now_ms);
2576 }
2577
2578 self.register_peer_for_delta(&node_id);
2580
2581 if is_new {
2582 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2583 self.notify(PeatEvent::PeerDiscovered { peer });
2584 }
2585 }
2586
2587 self.notify(PeatEvent::PeerConnected { node_id });
2588 self.notify_mesh_state_changed();
2589
2590 is_new
2591 }
2592
2593 pub fn on_ble_data_received(
2600 &self,
2601 identifier: &str,
2602 data: &[u8],
2603 now_ms: u64,
2604 ) -> Option<DataReceivedResult> {
2605 let node_id = self.peer_manager.get_node_id(identifier)?;
2607
2608 if data.len() >= 2 {
2610 match data[0] {
2611 KEY_EXCHANGE_MARKER => {
2612 let _response = self.handle_key_exchange(data, now_ms);
2614 return None;
2616 }
2617 PEER_E2EE_MARKER => {
2618 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2620 return None;
2622 }
2623 RELAY_ENVELOPE_MARKER => {
2624 return self
2626 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2627 }
2628 _ => {}
2629 }
2630 }
2631
2632 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2634 }
2635
2636 #[allow(clippy::too_many_arguments)]
2638 fn process_document_data_with_identifier(
2639 &self,
2640 source_node: NodeId,
2641 identifier: &str,
2642 data: &[u8],
2643 now_ms: u64,
2644 relay_data: Option<Vec<u8>>,
2645 origin_node: Option<NodeId>,
2646 hop_count: u8,
2647 ) -> Option<DataReceivedResult> {
2648 let decrypted = self.decrypt_document(data, Some(identifier))?;
2650
2651 #[cfg(feature = "mesh-translator")]
2658 match self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2659 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2660 TranslatorMarkerOutcome::Handled => return None,
2661 TranslatorMarkerOutcome::Decoded(frame) => {
2662 return Some(DataReceivedResult::translator_frame(source_node, frame));
2663 }
2664 }
2665
2666 if DeltaDocument::is_delta_document(&decrypted) {
2668 return self.process_delta_document_internal(
2669 source_node,
2670 &decrypted,
2671 now_ms,
2672 relay_data,
2673 origin_node,
2674 hop_count,
2675 );
2676 }
2677
2678 let result = self.document_sync.merge_document(&decrypted)?;
2680
2681 if let Some(ref peripheral) = result.peer_peripheral {
2683 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2684 peripherals.insert(result.source_node, peripheral.clone());
2685 }
2686 }
2687
2688 self.peer_manager.record_sync(source_node, now_ms);
2690
2691 if result.is_emergency() {
2693 self.notify(PeatEvent::EmergencyReceived {
2694 from_node: result.source_node,
2695 });
2696 } else if result.is_ack() {
2697 self.notify(PeatEvent::AckReceived {
2698 from_node: result.source_node,
2699 });
2700 }
2701
2702 if result.counter_changed {
2703 self.notify(PeatEvent::DocumentSynced {
2704 from_node: result.source_node,
2705 total_count: result.total_count,
2706 });
2707 }
2708
2709 if relay_data.is_some() {
2711 let relay_targets = self.get_relay_targets(Some(source_node));
2712 self.notify(PeatEvent::MessageRelayed {
2713 origin_node: origin_node.unwrap_or(result.source_node),
2714 relay_count: relay_targets.len(),
2715 hop_count,
2716 });
2717 }
2718
2719 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2720 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2721
2722 Some(DataReceivedResult {
2723 source_node: result.source_node,
2724 is_emergency: result.is_emergency(),
2725 is_ack: result.is_ack(),
2726 counter_changed: result.counter_changed,
2727 emergency_changed: result.emergency_changed,
2728 total_count: result.total_count,
2729 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2730 relay_data,
2731 origin_node,
2732 hop_count,
2733 callsign,
2734 battery_percent,
2735 heart_rate,
2736 event_type,
2737 latitude,
2738 longitude,
2739 altitude,
2740 ..Default::default()
2741 })
2742 }
2743
2744 fn handle_relay_envelope_with_identifier(
2746 &self,
2747 source_node: NodeId,
2748 identifier: &str,
2749 data: &[u8],
2750 now_ms: u64,
2751 ) -> Option<DataReceivedResult> {
2752 let envelope = RelayEnvelope::decode(data)?;
2754
2755 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2757 let stats = self
2758 .seen_cache
2759 .lock()
2760 .unwrap()
2761 .get_stats(&envelope.message_id);
2762 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2763
2764 self.notify(PeatEvent::DuplicateMessageDropped {
2765 origin_node: envelope.origin_node,
2766 seen_count,
2767 });
2768 return None;
2769 }
2770
2771 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2773 envelope.relay().map(|e| e.encode())
2774 } else {
2775 if !envelope.can_relay() {
2776 self.notify(PeatEvent::MessageTtlExpired {
2777 origin_node: envelope.origin_node,
2778 hop_count: envelope.hop_count,
2779 });
2780 }
2781 None
2782 };
2783
2784 self.process_document_data_with_identifier(
2786 source_node,
2787 identifier,
2788 &envelope.payload,
2789 now_ms,
2790 relay_data,
2791 Some(envelope.origin_node),
2792 envelope.hop_count,
2793 )
2794 }
2795
2796 pub fn on_ble_data_received_from_node(
2803 &self,
2804 node_id: NodeId,
2805 data: &[u8],
2806 now_ms: u64,
2807 ) -> Option<DataReceivedResult> {
2808 if data.len() >= 2 {
2810 match data[0] {
2811 KEY_EXCHANGE_MARKER => {
2812 let _response = self.handle_key_exchange(data, now_ms);
2813 return None;
2814 }
2815 PEER_E2EE_MARKER => {
2816 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2817 return None;
2818 }
2819 RELAY_ENVELOPE_MARKER => {
2820 return self.handle_relay_envelope(node_id, data, now_ms);
2822 }
2823 _ => {}
2824 }
2825 }
2826
2827 self.process_document_data(node_id, data, now_ms, None, None, 0)
2829 }
2830
2831 pub fn on_ble_data_received_anonymous(
2841 &self,
2842 identifier: &str,
2843 data: &[u8],
2844 now_ms: u64,
2845 ) -> Option<DataReceivedResult> {
2846 log::debug!(
2847 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2848 identifier,
2849 data.len(),
2850 data.first().copied().unwrap_or(0)
2851 );
2852
2853 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2855 Some(d) => d,
2856 None => {
2857 log::warn!(
2858 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2859 data.len(),
2860 identifier
2861 );
2862 return None;
2863 }
2864 };
2865
2866 #[cfg(feature = "mesh-translator")]
2877 match self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2878 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2879 TranslatorMarkerOutcome::Handled => return None,
2880 TranslatorMarkerOutcome::Decoded(frame) => {
2881 return Some(DataReceivedResult::translator_frame(NodeId::new(0), frame));
2886 }
2887 }
2888
2889 if decrypted.len() < 8 {
2892 log::warn!("Decrypted document too short to extract source_node");
2893 return None;
2894 }
2895
2896 let source_node_u32 =
2897 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2898 let source_node = NodeId::new(source_node_u32);
2899
2900 log::info!(
2901 "Anonymous document from {}: source_node={:08X}, len={}",
2902 identifier,
2903 source_node_u32,
2904 decrypted.len()
2905 );
2906
2907 self.peer_manager
2910 .register_identifier(identifier, source_node);
2911
2912 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2914 log::info!(
2915 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2916 is_delta,
2917 decrypted.first().copied().unwrap_or(0),
2918 decrypted.len()
2919 );
2920
2921 if is_delta {
2922 return self.process_delta_document_internal(
2923 source_node,
2924 &decrypted,
2925 now_ms,
2926 None,
2927 None,
2928 0,
2929 );
2930 }
2931
2932 const APP_LAYER_MARKER: u8 = 0xAF;
2936 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2937 log::debug!(
2938 "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2939 source_node.as_u32(),
2940 decrypted.len()
2941 );
2942 return Some(DataReceivedResult {
2943 source_node,
2944 is_emergency: false,
2945 is_ack: false,
2946 counter_changed: false,
2947 emergency_changed: false,
2948 total_count: 0,
2949 event_timestamp: now_ms,
2950 relay_data: Some(decrypted.to_vec()),
2951 origin_node: None,
2952 hop_count: 0,
2953 callsign: None,
2954 battery_percent: None,
2955 heart_rate: None,
2956 event_type: None,
2957 latitude: None,
2958 longitude: None,
2959 altitude: None,
2960 ..Default::default()
2961 });
2962 }
2963
2964 log::info!(
2966 "Processing legacy document from {:08X}",
2967 source_node.as_u32()
2968 );
2969 let result = self.document_sync.merge_document(&decrypted)?;
2970
2971 log::info!(
2973 "Merge result: peer_peripheral={}, counter_changed={}",
2974 result.peer_peripheral.is_some(),
2975 result.counter_changed
2976 );
2977 if let Some(ref p) = result.peer_peripheral {
2978 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2979 }
2980
2981 self.peer_manager.record_sync(source_node, now_ms);
2983
2984 if result.is_emergency() {
2986 self.notify(PeatEvent::EmergencyReceived {
2987 from_node: result.source_node,
2988 });
2989 } else if result.is_ack() {
2990 self.notify(PeatEvent::AckReceived {
2991 from_node: result.source_node,
2992 });
2993 }
2994
2995 if result.counter_changed {
2996 self.notify(PeatEvent::DocumentSynced {
2997 from_node: result.source_node,
2998 total_count: result.total_count,
2999 });
3000 }
3001
3002 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3003 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3004
3005 Some(DataReceivedResult {
3006 source_node: result.source_node,
3007 is_emergency: result.is_emergency(),
3008 is_ack: result.is_ack(),
3009 counter_changed: result.counter_changed,
3010 emergency_changed: result.emergency_changed,
3011 total_count: result.total_count,
3012 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3013 relay_data: None,
3014 origin_node: None,
3015 hop_count: 0,
3016 callsign,
3017 battery_percent,
3018 heart_rate,
3019 event_type,
3020 latitude,
3021 longitude,
3022 altitude,
3023 ..Default::default()
3024 })
3025 }
3026
3027 fn process_document_data(
3029 &self,
3030 source_node: NodeId,
3031 data: &[u8],
3032 now_ms: u64,
3033 relay_data: Option<Vec<u8>>,
3034 origin_node: Option<NodeId>,
3035 hop_count: u8,
3036 ) -> Option<DataReceivedResult> {
3037 let source_hint = format!("node:{:08X}", source_node.as_u32());
3039 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
3040
3041 #[cfg(feature = "mesh-translator")]
3045 match self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
3046 TranslatorMarkerOutcome::NotTranslatorMarker => {}
3047 TranslatorMarkerOutcome::Handled => return None,
3048 TranslatorMarkerOutcome::Decoded(frame) => {
3049 return Some(DataReceivedResult::translator_frame(source_node, frame));
3050 }
3051 }
3052
3053 if DeltaDocument::is_delta_document(&decrypted) {
3055 return self.process_delta_document_internal(
3056 source_node,
3057 &decrypted,
3058 now_ms,
3059 relay_data,
3060 origin_node,
3061 hop_count,
3062 );
3063 }
3064
3065 let result = self.document_sync.merge_document(&decrypted)?;
3067
3068 if let Some(ref peripheral) = result.peer_peripheral {
3070 if let Ok(mut peripherals) = self.peer_peripherals.write() {
3071 peripherals.insert(result.source_node, peripheral.clone());
3072 }
3073 }
3074
3075 self.peer_manager.record_sync(source_node, now_ms);
3077
3078 if result.is_emergency() {
3080 self.notify(PeatEvent::EmergencyReceived {
3081 from_node: result.source_node,
3082 });
3083 } else if result.is_ack() {
3084 self.notify(PeatEvent::AckReceived {
3085 from_node: result.source_node,
3086 });
3087 }
3088
3089 if result.counter_changed {
3090 self.notify(PeatEvent::DocumentSynced {
3091 from_node: result.source_node,
3092 total_count: result.total_count,
3093 });
3094 }
3095
3096 if relay_data.is_some() {
3098 let relay_targets = self.get_relay_targets(Some(source_node));
3099 self.notify(PeatEvent::MessageRelayed {
3100 origin_node: origin_node.unwrap_or(result.source_node),
3101 relay_count: relay_targets.len(),
3102 hop_count,
3103 });
3104 }
3105
3106 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3107 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3108
3109 Some(DataReceivedResult {
3110 source_node: result.source_node,
3111 is_emergency: result.is_emergency(),
3112 is_ack: result.is_ack(),
3113 counter_changed: result.counter_changed,
3114 emergency_changed: result.emergency_changed,
3115 total_count: result.total_count,
3116 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3117 relay_data,
3118 origin_node,
3119 hop_count,
3120 callsign,
3121 battery_percent,
3122 heart_rate,
3123 event_type,
3124 latitude,
3125 longitude,
3126 altitude,
3127 ..Default::default()
3128 })
3129 }
3130
3131 fn handle_relay_envelope(
3133 &self,
3134 source_node: NodeId,
3135 data: &[u8],
3136 now_ms: u64,
3137 ) -> Option<DataReceivedResult> {
3138 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3140
3141 let relay_data = if decision.should_relay {
3143 decision.relay_data()
3144 } else {
3145 None
3146 };
3147
3148 self.process_document_data(
3150 source_node,
3151 &decision.payload,
3152 now_ms,
3153 relay_data,
3154 Some(decision.origin_node),
3155 decision.hop_count,
3156 )
3157 }
3158
3159 pub fn on_ble_data(
3168 &self,
3169 identifier: &str,
3170 data: &[u8],
3171 now_ms: u64,
3172 ) -> Option<DataReceivedResult> {
3173 if data.len() >= 2 {
3175 match data[0] {
3176 KEY_EXCHANGE_MARKER => {
3177 let _response = self.handle_key_exchange(data, now_ms);
3178 return None;
3179 }
3180 PEER_E2EE_MARKER => {
3181 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3182 return None;
3183 }
3184 RELAY_ENVELOPE_MARKER => {
3185 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3187 }
3188 _ => {}
3189 }
3190 }
3191
3192 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3194 }
3195
3196 fn process_incoming_document(
3198 &self,
3199 identifier: &str,
3200 data: &[u8],
3201 now_ms: u64,
3202 relay_data: Option<Vec<u8>>,
3203 origin_node: Option<NodeId>,
3204 hop_count: u8,
3205 ) -> Option<DataReceivedResult> {
3206 let decrypted = self.decrypt_document(data, Some(identifier))?;
3208
3209 let result = self.document_sync.merge_document(&decrypted)?;
3211
3212 self.peer_manager.record_sync(result.source_node, now_ms);
3214
3215 if origin_node.is_none() {
3220 let is_new =
3222 self.peer_manager
3223 .on_incoming_connection(identifier, result.source_node, now_ms);
3224
3225 {
3227 let mut graph = self.connection_graph.lock().unwrap();
3228 if is_new {
3229 graph.on_discovered(
3230 result.source_node,
3231 identifier.to_string(),
3232 None,
3233 Some(self.config.mesh_id.clone()),
3234 -50, now_ms,
3236 );
3237 }
3238 graph.on_connected(result.source_node, now_ms);
3239 }
3240 }
3241
3242 if result.is_emergency() {
3244 self.notify(PeatEvent::EmergencyReceived {
3245 from_node: result.source_node,
3246 });
3247 } else if result.is_ack() {
3248 self.notify(PeatEvent::AckReceived {
3249 from_node: result.source_node,
3250 });
3251 }
3252
3253 if result.counter_changed {
3254 self.notify(PeatEvent::DocumentSynced {
3255 from_node: result.source_node,
3256 total_count: result.total_count,
3257 });
3258 }
3259
3260 if relay_data.is_some() {
3262 let relay_targets = self.get_relay_targets(Some(result.source_node));
3263 self.notify(PeatEvent::MessageRelayed {
3264 origin_node: origin_node.unwrap_or(result.source_node),
3265 relay_count: relay_targets.len(),
3266 hop_count,
3267 });
3268 }
3269
3270 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3271 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3272
3273 Some(DataReceivedResult {
3274 source_node: result.source_node,
3275 is_emergency: result.is_emergency(),
3276 is_ack: result.is_ack(),
3277 counter_changed: result.counter_changed,
3278 emergency_changed: result.emergency_changed,
3279 total_count: result.total_count,
3280 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3281 relay_data,
3282 origin_node,
3283 hop_count,
3284 callsign,
3285 battery_percent,
3286 heart_rate,
3287 event_type,
3288 latitude,
3289 longitude,
3290 altitude,
3291 ..Default::default()
3292 })
3293 }
3294
3295 fn handle_relay_envelope_with_incoming(
3297 &self,
3298 identifier: &str,
3299 data: &[u8],
3300 now_ms: u64,
3301 ) -> Option<DataReceivedResult> {
3302 let envelope = RelayEnvelope::decode(data)?;
3304
3305 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3308 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3309 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3310 source_peer,
3311 envelope.origin_node,
3312 envelope.hop_count,
3313 now_ms,
3314 );
3315
3316 if is_new {
3317 log::debug!(
3318 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3319 envelope.origin_node.as_u32(),
3320 source_peer.as_u32(),
3321 envelope.hop_count
3322 );
3323 }
3324 }
3325 }
3326
3327 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3329 let stats = self
3331 .seen_cache
3332 .lock()
3333 .unwrap()
3334 .get_stats(&envelope.message_id);
3335 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3336
3337 self.notify(PeatEvent::DuplicateMessageDropped {
3338 origin_node: envelope.origin_node,
3339 seen_count,
3340 });
3341 return None;
3342 }
3343
3344 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3346 let relay_env = envelope.relay();
3347 (true, relay_env.map(|e| e.encode()))
3348 } else {
3349 if !envelope.can_relay() {
3350 self.notify(PeatEvent::MessageTtlExpired {
3351 origin_node: envelope.origin_node,
3352 hop_count: envelope.hop_count,
3353 });
3354 }
3355 (false, None)
3356 };
3357
3358 self.process_incoming_document(
3360 identifier,
3361 &envelope.payload,
3362 now_ms,
3363 if should_relay { relay_data } else { None },
3364 Some(envelope.origin_node),
3365 envelope.hop_count,
3366 )
3367 }
3368
3369 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3379 use std::sync::atomic::Ordering;
3380
3381 let now_ms_32 = now_ms as u32;
3383
3384 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3386 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3387 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3388 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3389 let removed = self.peer_manager.cleanup_stale(now_ms);
3390 for node_id in &removed {
3391 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3392 }
3393 if !removed.is_empty() {
3394 self.notify_mesh_state_changed();
3395 }
3396
3397 {
3399 let mut graph = self.connection_graph.lock().unwrap();
3400 let newly_lost = graph.tick(now_ms);
3401 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3403 drop(graph);
3404
3405 for node_id in newly_lost {
3408 if !removed.contains(&node_id) {
3410 self.notify(PeatEvent::PeerLost { node_id });
3411 }
3412 }
3413 }
3414 }
3415
3416 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3418 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3419 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3420 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3421 if self.peer_manager.connected_count() > 0 {
3423 let doc = self.document_sync.build_document();
3424 return Some(self.encrypt_document(&doc));
3425 }
3426 }
3427
3428 None
3429 }
3430
3431 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3440 use std::sync::atomic::Ordering;
3441 let now_ms_32 = now_ms as u32;
3442
3443 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3445 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3446 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3447 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3448 let removed = self.peer_manager.cleanup_stale(now_ms);
3449 for node_id in &removed {
3450 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3451 }
3452 if !removed.is_empty() {
3453 self.notify_mesh_state_changed();
3454 }
3455
3456 {
3458 let mut graph = self.connection_graph.lock().unwrap();
3459 let newly_lost = graph.tick(now_ms);
3460 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3461 drop(graph);
3462
3463 for node_id in newly_lost {
3464 if !removed.contains(&node_id) {
3465 self.notify(PeatEvent::PeerLost { node_id });
3466 }
3467 }
3468 }
3469 }
3470
3471 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3473 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3474 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3475 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3476
3477 let doc = self.document_sync.build_document();
3479 let encrypted = self.encrypt_document(&doc);
3480 let mut results = Vec::new();
3481 for peer in self.get_connected_peers() {
3482 results.push((peer.node_id, encrypted.clone()));
3483 }
3484 return results;
3485 }
3486
3487 Vec::new()
3488 }
3489
3490 pub fn get_peers(&self) -> Vec<PeatPeer> {
3494 self.peer_manager.get_peers()
3495 }
3496
3497 pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3499 self.peer_manager.get_connected_peers()
3500 }
3501
3502 pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3504 self.peer_manager.get_peer(node_id)
3505 }
3506
3507 pub fn peer_count(&self) -> usize {
3509 self.peer_manager.peer_count()
3510 }
3511
3512 pub fn connected_count(&self) -> usize {
3514 self.peer_manager.connected_count()
3515 }
3516
3517 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3519 self.peer_manager.matches_mesh(device_mesh_id)
3520 }
3521
3522 pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3546 self.connection_graph.lock().unwrap().get_all_owned()
3547 }
3548
3549 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3551 self.connection_graph
3552 .lock()
3553 .unwrap()
3554 .get_peer(node_id)
3555 .cloned()
3556 }
3557
3558 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3560 self.connection_graph
3561 .lock()
3562 .unwrap()
3563 .get_connected()
3564 .into_iter()
3565 .cloned()
3566 .collect()
3567 }
3568
3569 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3571 self.connection_graph
3572 .lock()
3573 .unwrap()
3574 .get_degraded()
3575 .into_iter()
3576 .cloned()
3577 .collect()
3578 }
3579
3580 pub fn get_recently_disconnected(
3584 &self,
3585 within_ms: u64,
3586 now_ms: u64,
3587 ) -> Vec<PeerConnectionState> {
3588 self.connection_graph
3589 .lock()
3590 .unwrap()
3591 .get_recently_disconnected(within_ms, now_ms)
3592 .into_iter()
3593 .cloned()
3594 .collect()
3595 }
3596
3597 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3599 self.connection_graph
3600 .lock()
3601 .unwrap()
3602 .get_lost()
3603 .into_iter()
3604 .cloned()
3605 .collect()
3606 }
3607
3608 pub fn get_connection_state_counts(&self) -> StateCountSummary {
3610 self.connection_graph.lock().unwrap().state_counts()
3611 }
3612
3613 pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3621 self.connection_graph
3622 .lock()
3623 .unwrap()
3624 .get_indirect_peers_owned()
3625 }
3626
3627 pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3634 self.connection_graph.lock().unwrap().peer_degree(node_id)
3635 }
3636
3637 pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3642 self.connection_graph.lock().unwrap().full_state_counts()
3643 }
3644
3645 pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3650 self.connection_graph.lock().unwrap().get_paths_to(node_id)
3651 }
3652
3653 pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3655 self.connection_graph.lock().unwrap().is_known(node_id)
3656 }
3657
3658 pub fn indirect_peer_count(&self) -> usize {
3660 self.connection_graph.lock().unwrap().indirect_peer_count()
3661 }
3662
3663 pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3668 self.connection_graph
3669 .lock()
3670 .unwrap()
3671 .cleanup_indirect(now_ms)
3672 }
3673
3674 pub fn total_count(&self) -> u64 {
3676 self.document_sync.total_count()
3677 }
3678
3679 pub fn document_version(&self) -> u32 {
3681 self.document_sync.version()
3682 }
3683
3684 pub fn version(&self) -> u32 {
3686 self.document_sync.version()
3687 }
3688
3689 pub fn update_health(&self, battery_percent: u8) {
3691 self.document_sync.update_health(battery_percent);
3692 }
3693
3694 pub fn update_activity(&self, activity: u8) {
3696 self.document_sync.update_activity(activity);
3697 }
3698
3699 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3701 self.document_sync
3702 .update_health_full(battery_percent, activity);
3703 }
3704
3705 pub fn update_heart_rate(&self, heart_rate: u8) {
3707 self.document_sync.update_heart_rate(heart_rate);
3708 }
3709
3710 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3712 self.document_sync
3713 .update_location(latitude, longitude, altitude);
3714 }
3715
3716 pub fn clear_location(&self) {
3718 self.document_sync.clear_location();
3719 }
3720
3721 pub fn update_callsign(&self, callsign: &str) {
3723 self.document_sync.update_callsign(callsign);
3724 }
3725
3726 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3728 self.document_sync
3729 .set_peripheral_event(event_type, timestamp);
3730 }
3731
3732 pub fn clear_peripheral_event(&self) {
3734 self.document_sync.clear_peripheral_event();
3735 }
3736
3737 #[allow(clippy::too_many_arguments)]
3742 pub fn update_peripheral_state(
3743 &self,
3744 callsign: &str,
3745 battery_percent: u8,
3746 heart_rate: Option<u8>,
3747 latitude: Option<f32>,
3748 longitude: Option<f32>,
3749 altitude: Option<f32>,
3750 event_type: Option<EventType>,
3751 timestamp: u64,
3752 ) {
3753 self.document_sync.update_peripheral_state(
3754 callsign,
3755 battery_percent,
3756 heart_rate,
3757 latitude,
3758 longitude,
3759 altitude,
3760 event_type,
3761 timestamp,
3762 );
3763 }
3764
3765 pub fn build_document(&self) -> Vec<u8> {
3769 let doc = self.document_sync.build_document();
3770 self.encrypt_document(&doc)
3771 }
3772
3773 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3775 self.peer_manager.peers_needing_sync(now_ms)
3776 }
3777
3778 fn notify(&self, event: PeatEvent) {
3781 self.observers.notify(event);
3782 }
3783
3784 fn notify_mesh_state_changed(&self) {
3785 self.notify(PeatEvent::MeshStateChanged {
3786 peer_count: self.peer_manager.peer_count(),
3787 connected_count: self.peer_manager.connected_count(),
3788 });
3789 }
3790
3791 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3811 let mut id_bytes = [0u8; 16];
3814 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3815 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3816 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3817
3818 let seen = self.seen_cache.lock().unwrap();
3820 !seen.has_seen(&message_id)
3821 }
3822
3823 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3828 let now = std::time::SystemTime::now()
3829 .duration_since(std::time::UNIX_EPOCH)
3830 .map(|d| d.as_millis() as u64)
3831 .unwrap_or(0);
3832
3833 let mut id_bytes = [0u8; 16];
3835 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3836 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3837 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3838 let origin = NodeId::new(source_node);
3839
3840 let mut seen = self.seen_cache.lock().unwrap();
3841 seen.mark_seen(message_id, origin, now);
3842 }
3843
3844 pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3849 self.peer_manager.get_connected_identifiers()
3850 }
3851}
3852
/// Summary of what happened when a piece of received data was processed.
///
/// Built by the document-processing methods of the mesh; fields that do not
/// apply keep their `Default` value.
#[derive(Debug, Clone, Default)]
pub struct DataReceivedResult {
    /// Node the merged document came from.
    pub source_node: NodeId,

    /// Whether the merge result reported an emergency.
    pub is_emergency: bool,

    /// Whether the merge result reported an ACK.
    pub is_ack: bool,

    /// Whether the merge changed the counter.
    pub counter_changed: bool,

    /// Whether the merge changed the emergency state.
    pub emergency_changed: bool,

    /// Total count after the merge.
    pub total_count: u64,

    /// Timestamp of the event carried by the merge result, or `0` when there is none.
    pub event_timestamp: u64,

    /// Encoded relay envelope to forward to other peers, when the message
    /// should be relayed; `None` otherwise.
    pub relay_data: Option<Vec<u8>>,

    /// Origin node taken from the relay envelope; `None` for data that was
    /// not wrapped in a relay envelope.
    pub origin_node: Option<NodeId>,

    /// Hop count taken from the relay envelope; `0` for direct data.
    pub hop_count: u8,

    /// Sender's callsign; `None` when no peripheral was attached or the
    /// callsign is empty.
    pub callsign: Option<String>,

    /// Sender's battery percentage; `None` when absent or reported as `0`.
    pub battery_percent: Option<u8>,

    /// Sender's heart rate, if reported.
    pub heart_rate: Option<u8>,

    /// Type of the sender's last peripheral event, as its `u8` discriminant.
    pub event_type: Option<u8>,

    /// Sender's latitude, if a location was reported.
    pub latitude: Option<f32>,

    /// Sender's longitude, if a location was reported.
    pub longitude: Option<f32>,

    /// Sender's altitude, if reported with the location.
    pub altitude: Option<f32>,

    /// Decoded translator frame; populated by the `translator_frame`
    /// constructor (`mesh-translator` feature), `None` otherwise.
    pub decoded_translator_frame: Option<DecodedTranslatorFrame>,
}
3920
/// A translator frame that was decoded from received data and is surfaced to
/// the caller through [`DataReceivedResult::decoded_translator_frame`].
#[derive(Debug, Clone)]
pub struct DecodedTranslatorFrame {
    /// Collection name (presumably the collection the document belongs to — TODO confirm).
    pub collection: String,
    /// Document body as a JSON string (per the field name — TODO confirm the schema).
    pub doc_json: String,
    /// Optional peer string associated with the frame (exact meaning not
    /// visible here — TODO confirm).
    pub peer: Option<String>,
}
3941
/// Result of offering a decrypted payload to the translator-marker handler.
#[cfg(feature = "mesh-translator")]
#[derive(Debug, Clone)]
enum TranslatorMarkerOutcome {
    /// The payload is not a translator frame; normal document processing continues.
    NotTranslatorMarker,
    /// A translator frame was decoded and should be returned to the caller.
    Decoded(DecodedTranslatorFrame),
    /// The payload was consumed by the handler; nothing is returned to the caller.
    Handled,
}
3959
3960impl DataReceivedResult {
3961 #[cfg(feature = "mesh-translator")]
3969 pub(crate) fn translator_frame(source_node: NodeId, frame: DecodedTranslatorFrame) -> Self {
3970 Self {
3971 source_node,
3972 decoded_translator_frame: Some(frame),
3973 ..Default::default()
3974 }
3975 }
3976
3977 #[allow(clippy::type_complexity)]
3979 fn peripheral_fields(
3980 peripheral: &Option<crate::sync::crdt::Peripheral>,
3981 ) -> (
3982 Option<String>,
3983 Option<u8>,
3984 Option<u8>,
3985 Option<u8>,
3986 Option<f32>,
3987 Option<f32>,
3988 Option<f32>,
3989 ) {
3990 match peripheral {
3991 Some(p) => {
3992 let callsign = {
3993 let s = p.callsign_str();
3994 if s.is_empty() {
3995 None
3996 } else {
3997 Some(s.to_string())
3998 }
3999 };
4000 let battery = if p.health.battery_percent > 0 {
4001 Some(p.health.battery_percent)
4002 } else {
4003 None
4004 };
4005 let heart_rate = p.health.heart_rate;
4006 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
4007 let (lat, lon, alt) = match &p.location {
4008 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
4009 None => (None, None, None),
4010 };
4011 (callsign, battery, heart_rate, event_type, lat, lon, alt)
4012 }
4013 None => (None, None, None, None, None, None, None),
4014 }
4015 }
4016}
4017
/// Outcome of processing a relay envelope: the unwrapped payload plus what is
/// needed to decide on, and perform, forwarding.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Inner payload carried by the envelope.
    pub payload: Vec<u8>,

    /// Node that originally sent the message.
    pub origin_node: NodeId,

    /// Hop count associated with the envelope.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward, if any; encoded on demand by `relay_data`.
    pub relay_envelope: Option<RelayEnvelope>,
}
4038
4039impl RelayDecision {
4040 pub fn relay_data(&self) -> Option<Vec<u8>> {
4044 self.relay_envelope.as_ref().map(|e| e.encode())
4045 }
4046}
4047
4048#[cfg(all(test, feature = "std"))]
4049mod tests {
4050 use super::*;
4051 use crate::observer::CollectingObserver;
4052
    // Fixed timestamp shared by the tests: 2024-01-15T00:00:00Z in Unix milliseconds.
    const TEST_TIMESTAMP: u64 = 1705276800000;
4055
4056 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4057 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
4058 PeatMesh::new(config)
4059 }
4060
4061 #[test]
4062 fn test_mesh_creation() {
4063 let mesh = create_mesh(0x12345678, "ALPHA-1");
4064
4065 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
4066 assert_eq!(mesh.callsign(), "ALPHA-1");
4067 assert_eq!(mesh.mesh_id(), "TEST");
4068 assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
4069 }
4070
4071 #[test]
4072 fn test_peer_discovery() {
4073 let mesh = create_mesh(0x11111111, "ALPHA-1");
4074 let observer = Arc::new(CollectingObserver::new());
4075 mesh.add_observer(observer.clone());
4076
4077 let peer = mesh.on_ble_discovered(
4079 "device-uuid",
4080 Some("PEAT_TEST-22222222"),
4081 -65,
4082 Some("TEST"),
4083 1000,
4084 );
4085
4086 assert!(peer.is_some());
4087 let peer = peer.unwrap();
4088 assert_eq!(peer.node_id.as_u32(), 0x22222222);
4089
4090 let events = observer.events();
4092 assert!(events
4093 .iter()
4094 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4095 assert!(events
4096 .iter()
4097 .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
4098 }
4099
4100 #[test]
4101 fn test_connection_lifecycle() {
4102 let mesh = create_mesh(0x11111111, "ALPHA-1");
4103 let observer = Arc::new(CollectingObserver::new());
4104 mesh.add_observer(observer.clone());
4105
4106 mesh.on_ble_discovered(
4108 "device-uuid",
4109 Some("PEAT_TEST-22222222"),
4110 -65,
4111 Some("TEST"),
4112 1000,
4113 );
4114
4115 let node_id = mesh.on_ble_connected("device-uuid", 2000);
4116 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4117 assert_eq!(mesh.connected_count(), 1);
4118
4119 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
4121 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4122 assert_eq!(mesh.connected_count(), 0);
4123
4124 let events = observer.events();
4126 assert!(events
4127 .iter()
4128 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4129 assert!(events
4130 .iter()
4131 .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
4132 }
4133
4134 #[test]
4135 fn test_emergency_flow() {
4136 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4137 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4138
4139 let observer2 = Arc::new(CollectingObserver::new());
4140 mesh2.add_observer(observer2.clone());
4141
4142 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4144 assert!(mesh1.is_emergency_active());
4145
4146 let result =
4148 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4149
4150 assert!(result.is_some());
4151 let result = result.unwrap();
4152 assert!(result.is_emergency);
4153 assert_eq!(result.source_node.as_u32(), 0x11111111);
4154
4155 let events = observer2.events();
4157 assert!(events
4158 .iter()
4159 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4160 }
4161
4162 #[test]
4163 fn test_ack_flow() {
4164 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4165 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4166
4167 let observer2 = Arc::new(CollectingObserver::new());
4168 mesh2.add_observer(observer2.clone());
4169
4170 let doc = mesh1.send_ack(TEST_TIMESTAMP);
4172 assert!(mesh1.is_ack_active());
4173
4174 let result =
4176 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4177
4178 assert!(result.is_some());
4179 let result = result.unwrap();
4180 assert!(result.is_ack);
4181
4182 let events = observer2.events();
4184 assert!(events
4185 .iter()
4186 .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4187 }
4188
4189 #[test]
4190 fn test_tick_cleanup() {
4191 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4192 .with_peer_timeout(10_000);
4193 let mesh = PeatMesh::new(config);
4194
4195 let observer = Arc::new(CollectingObserver::new());
4196 mesh.add_observer(observer.clone());
4197
4198 mesh.on_ble_discovered(
4200 "device-uuid",
4201 Some("PEAT_TEST-22222222"),
4202 -65,
4203 Some("TEST"),
4204 1000,
4205 );
4206 assert_eq!(mesh.peer_count(), 1);
4207
4208 mesh.tick(5000);
4210 assert_eq!(mesh.peer_count(), 1);
4211
4212 mesh.tick(20000);
4214 assert_eq!(mesh.peer_count(), 0);
4215
4216 let events = observer.events();
4218 assert!(events
4219 .iter()
4220 .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4221 }
4222
4223 #[test]
4224 fn test_tick_sync_broadcast() {
4225 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4226 .with_sync_interval(5000);
4227 let mesh = PeatMesh::new(config);
4228
4229 mesh.on_ble_discovered(
4231 "device-uuid",
4232 Some("PEAT_TEST-22222222"),
4233 -65,
4234 Some("TEST"),
4235 1000,
4236 );
4237 mesh.on_ble_connected("device-uuid", 1000);
4238
4239 let _result = mesh.tick(0);
4241 let result = mesh.tick(3000);
4245 assert!(result.is_none());
4246
4247 let result = mesh.tick(6000);
4249 assert!(result.is_some());
4250
4251 let result = mesh.tick(6100);
4253 assert!(result.is_none());
4254
4255 let result = mesh.tick(12000);
4257 assert!(result.is_some());
4258 }
4259
4260 #[test]
4261 fn test_incoming_connection() {
4262 let mesh = create_mesh(0x11111111, "ALPHA-1");
4263 let observer = Arc::new(CollectingObserver::new());
4264 mesh.add_observer(observer.clone());
4265
4266 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4268
4269 assert!(is_new);
4270 assert_eq!(mesh.peer_count(), 1);
4271 assert_eq!(mesh.connected_count(), 1);
4272
4273 let events = observer.events();
4275 assert!(events
4276 .iter()
4277 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4278 assert!(events
4279 .iter()
4280 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4281 }
4282
4283 #[test]
4284 fn test_mesh_filtering() {
4285 let mesh = create_mesh(0x11111111, "ALPHA-1");
4286
4287 let peer = mesh.on_ble_discovered(
4289 "device-uuid-1",
4290 Some("PEAT_OTHER-22222222"),
4291 -65,
4292 Some("OTHER"),
4293 1000,
4294 );
4295 assert!(peer.is_none());
4296 assert_eq!(mesh.peer_count(), 0);
4297
4298 let peer = mesh.on_ble_discovered(
4300 "device-uuid-2",
4301 Some("PEAT_TEST-33333333"),
4302 -65,
4303 Some("TEST"),
4304 1000,
4305 );
4306 assert!(peer.is_some());
4307 assert_eq!(mesh.peer_count(), 1);
4308 }
4309
4310 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4313 let config =
4314 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4315 PeatMesh::new(config)
4316 }
4317
4318 #[test]
4319 fn test_encryption_enabled() {
4320 let secret = [0x42u8; 32];
4321 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4322
4323 assert!(mesh.is_encryption_enabled());
4324 }
4325
4326 #[test]
4327 fn test_encryption_disabled_by_default() {
4328 let mesh = create_mesh(0x11111111, "ALPHA-1");
4329
4330 assert!(!mesh.is_encryption_enabled());
4331 }
4332
4333 #[test]
4334 fn test_encrypted_document_exchange() {
4335 let secret = [0x42u8; 32];
4336 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4337 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4338
4339 let doc = mesh1.build_document();
4341
4342 assert!(doc.len() >= 2);
4344 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4345
4346 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4348
4349 assert!(result.is_some());
4350 let result = result.unwrap();
4351 assert_eq!(result.source_node.as_u32(), 0x11111111);
4352 }
4353
4354 #[test]
4355 fn test_encrypted_emergency_exchange() {
4356 let secret = [0x42u8; 32];
4357 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4358 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4359
4360 let observer = Arc::new(CollectingObserver::new());
4361 mesh2.add_observer(observer.clone());
4362
4363 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4365
4366 let result =
4368 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4369
4370 assert!(result.is_some());
4371 let result = result.unwrap();
4372 assert!(result.is_emergency);
4373
4374 let events = observer.events();
4376 assert!(events
4377 .iter()
4378 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4379 }
4380
4381 #[test]
4382 fn test_wrong_key_fails_decrypt() {
4383 let secret1 = [0x42u8; 32];
4384 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4386 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4387
4388 let doc = mesh1.build_document();
4390
4391 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4393
4394 assert!(result.is_none());
4395 }
4396
4397 #[test]
4398 fn test_unencrypted_mesh_can_read_unencrypted() {
4399 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4400 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4401
4402 let doc = mesh1.build_document();
4404
4405 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4407
4408 assert!(result.is_some());
4409 }
4410
4411 #[test]
4412 fn test_encrypted_mesh_can_receive_unencrypted() {
4413 let secret = [0x42u8; 32];
4415 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4420
4421 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4423
4424 assert!(result.is_some());
4425 }
4426
4427 #[test]
4428 fn test_unencrypted_mesh_cannot_receive_encrypted() {
4429 let secret = [0x42u8; 32];
4430 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
4435
4436 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4438
4439 assert!(result.is_none());
4440 }
4441
4442 #[test]
4443 fn test_enable_disable_encryption() {
4444 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4445
4446 assert!(!mesh.is_encryption_enabled());
4447
4448 let secret = [0x42u8; 32];
4450 mesh.enable_encryption(&secret);
4451 assert!(mesh.is_encryption_enabled());
4452
4453 let doc = mesh.build_document();
4455 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4456
4457 mesh.disable_encryption();
4459 assert!(!mesh.is_encryption_enabled());
4460
4461 let doc = mesh.build_document();
4463 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4464 }
4465
4466 #[test]
4467 fn test_encryption_overhead() {
4468 let secret = [0x42u8; 32];
4469 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4470 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4471
4472 let doc_encrypted = mesh_encrypted.build_document();
4473 let doc_unencrypted = mesh_unencrypted.build_document();
4474
4475 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4481 assert_eq!(overhead, 30); }
4483
4484 #[test]
4487 fn test_peer_e2ee_enable_disable() {
4488 let mesh = create_mesh(0x11111111, "ALPHA-1");
4489
4490 assert!(!mesh.is_peer_e2ee_enabled());
4491 assert!(mesh.peer_e2ee_public_key().is_none());
4492
4493 mesh.enable_peer_e2ee();
4494 assert!(mesh.is_peer_e2ee_enabled());
4495 assert!(mesh.peer_e2ee_public_key().is_some());
4496
4497 mesh.disable_peer_e2ee();
4498 assert!(!mesh.is_peer_e2ee_enabled());
4499 }
4500
4501 #[test]
4502 fn test_peer_e2ee_initiate_session() {
4503 let mesh = create_mesh(0x11111111, "ALPHA-1");
4504 mesh.enable_peer_e2ee();
4505
4506 let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4507 assert!(key_exchange.is_some());
4508
4509 let key_exchange = key_exchange.unwrap();
4510 assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4512
4513 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4515 assert_eq!(mesh.peer_e2ee_established_count(), 0);
4516 }
4517
/// Runs the key exchange between two meshes and checks that each side reports
/// a session with the other and emits `PeatEvent::PeerE2eeEstablished`.
#[test]
fn test_peer_e2ee_full_handshake() {
    let initiator = create_mesh(0x11111111, "ALPHA-1");
    let responder = create_mesh(0x22222222, "BRAVO-1");
    let initiator_id = NodeId::new(0x11111111);
    let responder_id = NodeId::new(0x22222222);

    initiator.enable_peer_e2ee();
    responder.enable_peer_e2ee();

    let initiator_log = Arc::new(CollectingObserver::new());
    let responder_log = Arc::new(CollectingObserver::new());
    initiator.add_observer(initiator_log.clone());
    responder.add_observer(responder_log.clone());

    // Initiator produces the opening key-exchange frame.
    let opening = initiator.initiate_peer_e2ee(responder_id, 1000).unwrap();

    // Responder consumes it and must produce a reply frame.
    let reply = responder.handle_key_exchange(&opening, 1000);
    assert!(reply.is_some());
    assert!(responder.has_peer_e2ee_session(initiator_id));

    // Initiator consumes the reply; whatever it returns is not needed here.
    let reply = reply.unwrap();
    let _ = initiator.handle_key_exchange(&reply, 1000);
    assert!(initiator.has_peer_e2ee_session(responder_id));

    // Both observers must have recorded the "established" event.
    let initiator_events = initiator_log.events();
    let initiator_notified = initiator_events
        .iter()
        .any(|ev| matches!(ev, PeatEvent::PeerE2eeEstablished { .. }));
    assert!(initiator_notified);

    let responder_events = responder_log.events();
    let responder_notified = responder_events
        .iter()
        .any(|ev| matches!(ev, PeatEvent::PeerE2eeEstablished { .. }));
    assert!(responder_notified);
}
4561
/// After a completed handshake, a message sent by one mesh is tagged with the
/// peer-E2EE marker, decrypts to the original bytes on the other mesh, and is
/// reported through `PeatEvent::PeerE2eeMessageReceived`.
#[test]
fn test_peer_e2ee_encrypt_decrypt() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");
    let receiver_id = NodeId::new(0x22222222);

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    // Handshake: opening frame -> reply frame -> reply consumed by the sender.
    let opening = sender.initiate_peer_e2ee(receiver_id, 1000).unwrap();
    let reply = receiver.handle_key_exchange(&opening, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    let plaintext = b"Secret message from mesh1";
    let sealed = sender.send_peer_e2ee(receiver_id, plaintext, 2000);
    assert!(sealed.is_some());

    let sealed = sealed.unwrap();
    assert_eq!(sealed[0], crate::document::PEER_E2EE_MARKER);

    // The observer is attached after the handshake, right before delivery.
    let receiver_log = Arc::new(CollectingObserver::new());
    receiver.add_observer(receiver_log.clone());

    let opened = receiver.handle_peer_e2ee_message(&sealed, 2000);
    assert!(opened.is_some());
    assert_eq!(opened.unwrap(), plaintext);

    // The event must name the sending node and carry the decrypted bytes.
    let recorded = receiver_log.events();
    let delivered = recorded.iter().any(|ev| {
        matches!(
            ev,
            PeatEvent::PeerE2eeMessageReceived { from_node, data }
                if from_node.as_u32() == 0x11111111 && data == plaintext
        )
    });
    assert!(delivered);
}
4602
/// Once the handshake has completed, each mesh can send a message that the
/// other one decrypts to the original bytes.
#[test]
fn test_peer_e2ee_bidirectional() {
    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");
    let alpha_id = NodeId::new(0x11111111);
    let bravo_id = NodeId::new(0x22222222);

    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    // Handshake driven by ALPHA.
    let opening = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();
    let reply = bravo.handle_key_exchange(&opening, 1000).unwrap();
    alpha.handle_key_exchange(&reply, 1000);

    // ALPHA -> BRAVO
    let outbound = alpha
        .send_peer_e2ee(bravo_id, b"Hello from mesh1", 2000)
        .unwrap();
    let received_by_bravo = bravo.handle_peer_e2ee_message(&outbound, 2000).unwrap();
    assert_eq!(received_by_bravo, b"Hello from mesh1");

    // BRAVO -> ALPHA
    let inbound = bravo
        .send_peer_e2ee(alpha_id, b"Hello from mesh2", 2000)
        .unwrap();
    let received_by_alpha = alpha.handle_peer_e2ee_message(&inbound, 2000).unwrap();
    assert_eq!(received_by_alpha, b"Hello from mesh2");
}
4632
/// Closing a peer E2EE session that was previously initiated emits
/// `PeatEvent::PeerE2eeClosed`.
#[test]
fn test_peer_e2ee_close_session() {
    let node = create_mesh(0x11111111, "ALPHA-1");
    node.enable_peer_e2ee();

    let log = Arc::new(CollectingObserver::new());
    node.add_observer(log.clone());

    // Open a session towards the remote node; the returned frame is unused.
    let remote = NodeId::new(0x22222222);
    node.initiate_peer_e2ee(remote, 1000);
    assert_eq!(node.peer_e2ee_session_count(), 1);

    node.close_peer_e2ee(remote);

    let recorded = log.events();
    let closed = recorded
        .iter()
        .any(|ev| matches!(ev, PeatEvent::PeerE2eeClosed { .. }));
    assert!(closed);
}
4654
/// With peer E2EE never enabled, initiating and sending both return `None`
/// and no session is reported for the remote node.
#[test]
fn test_peer_e2ee_without_enabling() {
    let node = create_mesh(0x11111111, "ALPHA-1");
    let remote = NodeId::new(0x22222222);

    let handshake = node.initiate_peer_e2ee(remote, 1000);
    assert!(handshake.is_none());

    let sealed = node.send_peer_e2ee(remote, b"test", 1000);
    assert!(sealed.is_none());

    assert!(!node.has_peer_e2ee_session(remote));
}
4668
/// Pins the size difference between a peer-E2EE frame and its plaintext to
/// the 46 bytes asserted below.
#[test]
fn test_peer_e2ee_overhead() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");
    let receiver_id = NodeId::new(0x22222222);

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    // Complete the handshake so the sender is able to encrypt.
    let opening = sender.initiate_peer_e2ee(receiver_id, 1000).unwrap();
    let reply = receiver.handle_key_exchange(&opening, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    let plaintext = b"Test message";
    let sealed = sender.send_peer_e2ee(receiver_id, plaintext, 2000).unwrap();

    let sealed_len = sealed.len();
    let plain_len = plaintext.len();
    assert_eq!(sealed_len - plain_len, 46);
}
4701
4702 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4705 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4706 .with_encryption(secret)
4707 .with_strict_encryption();
4708 PeatMesh::new(config)
4709 }
4710
/// A mesh built by `create_strict_encrypted_mesh` reports both encryption and
/// strict encryption as enabled.
#[test]
fn test_strict_encryption_enabled() {
    let node = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    assert!(node.is_encryption_enabled());
    assert!(node.is_strict_encryption_enabled());
}
4719
/// A mesh built by `create_encrypted_mesh` has encryption enabled but does not
/// report strict encryption.
#[test]
fn test_strict_encryption_disabled_by_default() {
    let node = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    assert!(node.is_encryption_enabled());
    assert!(!node.is_strict_encryption_enabled());
}
4728
/// Requesting strict encryption without configuring encryption leaves both
/// encryption and strict encryption reported as disabled.
#[test]
fn test_strict_encryption_requires_encryption_enabled() {
    // Note: no `with_encryption(..)` call in this builder chain.
    let node = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST").with_strict_encryption(),
    );

    assert!(!node.is_encryption_enabled());
    assert!(!node.is_strict_encryption_enabled());
}
4739
/// A strict-mode mesh accepts a document that starts with the encrypted
/// marker and was built by a mesh configured with the same secret.
#[test]
fn test_strict_mode_accepts_encrypted_documents() {
    let shared_secret = [0x42u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_secret);
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", shared_secret);

    let document = sender.build_document();
    assert_eq!(document[0], crate::document::ENCRYPTED_MARKER);

    let sender_id = NodeId::new(0x11111111);
    let outcome = strict_receiver.on_ble_data_received_from_node(sender_id, &document, 1000);
    assert!(outcome.is_some());
}
4754
/// A strict-mode mesh rejects a document that does not start with the
/// encrypted marker and emits an `UnencryptedInStrictMode` security violation.
#[test]
fn test_strict_mode_rejects_unencrypted_documents() {
    let secret = [0x42u8; 32];
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

    let log = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(log.clone());

    let document = plain_sender.build_document();
    assert_ne!(document[0], crate::document::ENCRYPTED_MARKER);

    let sender_id = NodeId::new(0x11111111);
    let outcome = strict_receiver.on_ble_data_received_from_node(sender_id, &document, 1000);
    assert!(outcome.is_none());

    let recorded = log.events();
    let violation_reported = recorded.iter().any(|ev| {
        matches!(
            ev,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(violation_reported);
}
4782
/// An encrypted mesh without strict mode still accepts a document built by a
/// mesh created through plain `create_mesh`.
#[test]
fn test_non_strict_mode_accepts_unencrypted_documents() {
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let lenient_receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", [0x42u8; 32]);

    let document = plain_sender.build_document();

    let sender_id = NodeId::new(0x11111111);
    let outcome = lenient_receiver.on_ble_data_received_from_node(sender_id, &document, 1000);
    assert!(outcome.is_some());
}
4796
/// When data arrives via a device identifier, the `UnencryptedInStrictMode`
/// violation raised by a strict-mode mesh carries that identifier as `source`.
#[test]
fn test_strict_mode_security_violation_event_includes_source() {
    let secret = [0x42u8; 32];
    let plain_sender = create_mesh(0x11111111, "ALPHA-1");
    let strict_receiver = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

    let log = Arc::new(CollectingObserver::new());
    strict_receiver.add_observer(log.clone());

    let document = plain_sender.build_document();

    // Register and connect the device before feeding data under its identifier.
    strict_receiver.on_ble_discovered(
        "test-device-uuid",
        Some("PEAT_TEST-11111111"),
        -65,
        Some("TEST"),
        500,
    );
    strict_receiver.on_ble_connected("test-device-uuid", 600);

    let _outcome = strict_receiver.on_ble_data_received("test-device-uuid", &document, 1000);

    let recorded = log.events();
    let violation = recorded.iter().find(|ev| {
        matches!(
            ev,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(violation.is_some());

    // The violation must name the device the data came from.
    if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
        assert!(source.is_some());
        assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
    }
}
4838
/// A document built by a mesh with a different secret is rejected by the
/// receiver, which emits a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    // The two secrets differ, so the receiver does not hold the sender's key.
    let sender_secret = [0x42u8; 32];
    let receiver_secret = [0x43u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_secret);

    let log = Arc::new(CollectingObserver::new());
    receiver.add_observer(log.clone());

    let document = sender.build_document();

    let sender_id = NodeId::new(0x11111111);
    let outcome = receiver.on_ble_data_received_from_node(sender_id, &document, 1000);
    assert!(outcome.is_none());

    let recorded = log.events();
    let failure_reported = recorded.iter().any(|ev| {
        matches!(
            ev,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(failure_reported);
}
4866
/// Strict encryption survives being chained with further builder calls
/// (`with_sync_interval`, `with_peer_timeout`) on the configuration.
#[test]
fn test_strict_mode_builder_chain() {
    let node = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption([0x42u8; 32])
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    assert!(node.is_encryption_enabled());
    assert!(node.is_strict_encryption_enabled());
}
4881
4882 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4885 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4886 PeatMesh::new(config)
4887 }
4888
/// A mesh created through plain `create_mesh` does not report relay as enabled.
#[test]
fn test_relay_disabled_by_default() {
    let node = create_mesh(0x11111111, "ALPHA-1");
    let relay_on = node.is_relay_enabled();
    assert!(!relay_on);
}
4894
/// A mesh created through `create_relay_mesh` reports relay as enabled.
#[test]
fn test_relay_enabled() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let relay_on = node.is_relay_enabled();
    assert!(relay_on);
}
4900
/// Each relay-related builder method stores its argument in the matching
/// `PeatMeshConfig` field.
#[test]
fn test_relay_config_builder() {
    let base = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let cfg = base
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(cfg.enable_relay);
    assert_eq!(cfg.max_relay_hops, 5);
    assert_eq!(cfg.relay_fanout, 3);
    assert_eq!(cfg.seen_cache_ttl_ms, 60_000);
}
4914
/// Marking the same message id twice returns `true` the first time and
/// `false` the second, and the seen cache holds a single entry.
#[test]
fn test_seen_message_deduplication() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);
    let id = crate::relay::MessageId::from_content(sender, 1000, 0xDEADBEEF);

    // First sighting is reported as new.
    let first_sighting = node.mark_message_seen(id, sender, 1000);
    assert!(first_sighting);

    // Same id again, at a later timestamp, is not new.
    let second_sighting = node.mark_message_seen(id, sender, 2000);
    assert!(!second_sighting);

    assert_eq!(node.seen_cache_size(), 1);
}
4929
/// `wrap_for_relay` produces bytes that start with the relay marker and
/// decode to an envelope carrying the payload, this node as origin, and a hop
/// count of zero.
#[test]
fn test_wrap_for_relay() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    let body = vec![1, 2, 3, 4, 5];
    let framed = node.wrap_for_relay(body.clone());

    assert_eq!(framed[0], crate::relay::RELAY_ENVELOPE_MARKER);

    // Round-trip through the decoder and inspect the envelope fields.
    let decoded = crate::relay::RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.payload, body);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4946
/// A previously unseen broadcast envelope yields a decision carrying the
/// payload and origin, flagged for relay, with a relay envelope whose hop
/// count is one.
#[test]
fn test_process_relay_envelope_new_message() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let log = Arc::new(CollectingObserver::new());
    node.add_observer(log.clone());

    // Build a fresh broadcast from another node, allowing up to 7 hops.
    let body = vec![1, 2, 3, 4, 5];
    let origin = NodeId::new(0x22222222);
    let wire = crate::relay::RelayEnvelope::broadcast(origin, body.clone())
        .with_max_hops(7)
        .encode();

    let outcome = node.process_relay_envelope(&wire, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    let verdict = outcome.unwrap();
    assert_eq!(verdict.payload, body);
    assert_eq!(verdict.origin_node.as_u32(), 0x22222222);
    assert_eq!(verdict.hop_count, 0);
    assert!(verdict.should_relay);
    assert!(verdict.relay_envelope.is_some());

    // The envelope prepared for forwarding has advanced by one hop.
    let forwarded = verdict.relay_envelope.unwrap();
    assert_eq!(forwarded.hop_count, 1);
}
4975
/// Processing the same encoded envelope twice yields a decision the first
/// time, `None` the second time, and a `DuplicateMessageDropped` event.
#[test]
fn test_process_relay_envelope_duplicate() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let log = Arc::new(CollectingObserver::new());
    node.add_observer(log.clone());

    let body = vec![1, 2, 3, 4, 5];
    let wire = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), body).encode();
    let via = NodeId::new(0x33333333);

    // First delivery is accepted.
    let first = node.process_relay_envelope(&wire, via, 1000);
    assert!(first.is_some());

    // Identical bytes delivered again are dropped.
    let second = node.process_relay_envelope(&wire, via, 2000);
    assert!(second.is_none());

    let recorded = log.events();
    let drop_reported = recorded
        .iter()
        .any(|ev| matches!(ev, PeatEvent::DuplicateMessageDropped { .. }));
    assert!(drop_reported);
}
5000
/// An envelope already relayed three times with a hop limit of three still
/// yields a decision carrying the payload, but it is not flagged for relay,
/// has no relay envelope, and a `MessageTtlExpired` event is emitted.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let log = Arc::new(CollectingObserver::new());
    node.add_observer(log.clone());

    let body = vec![1, 2, 3, 4, 5];
    let mut hop_limited =
        crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone())
            .with_max_hops(3);

    // Advance the envelope by three successful relays before it arrives here.
    for _ in 0..3 {
        hop_limited = hop_limited.relay().unwrap();
    }
    let wire = hop_limited.encode();

    let outcome = node.process_relay_envelope(&wire, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    let verdict = outcome.unwrap();
    assert_eq!(verdict.payload, body);
    assert!(!verdict.should_relay);
    assert!(verdict.relay_envelope.is_none());

    let recorded = log.events();
    let expiry_reported = recorded
        .iter()
        .any(|ev| matches!(ev, PeatEvent::MessageTtlExpired { .. }));
    assert!(expiry_reported);
}
5035
/// `build_relay_document` returns bytes that start with the relay marker,
/// decode to an envelope originating from this node, and whose payload decodes
/// as a `PeatDocument`.
#[test]
fn test_build_relay_document() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    let wire = node.build_relay_document();
    assert_eq!(wire[0], crate::relay::RELAY_ENVELOPE_MARKER);

    // Outer layer: relay envelope stamped with this node as origin.
    let decoded = crate::relay::RelayEnvelope::decode(&wire).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    // Inner layer: the payload must parse as a document.
    let inner = crate::document::PeatDocument::decode(&decoded.payload);
    assert!(inner.is_some());
}
5053
/// With three discovered and connected peers, the relay targets computed for
/// a message coming from one of them never include that peer.
#[test]
fn test_relay_targets_excludes_source() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    // (device identifier, advertised name, RSSI) for each peer, in the order
    // they are discovered and connected.
    let peers = [
        ("peer-1", "PEAT_TEST-22222222", -60),
        ("peer-2", "PEAT_TEST-33333333", -65),
        ("peer-3", "PEAT_TEST-44444444", -70),
    ];
    for &(device, advertised, rssi) in peers.iter() {
        node.on_ble_discovered(device, Some(advertised), rssi, Some("TEST"), 1000);
        node.on_ble_connected(device, 1000);
    }

    let excluded = NodeId::new(0x33333333);
    let targets = node.get_relay_targets(Some(excluded));

    assert!(targets
        .iter()
        .all(|peer| peer.node_id.as_u32() != 0x33333333));
}
5092
/// After two distinct message ids are marked as seen, `clear_seen_cache`
/// empties the seen cache.
#[test]
fn test_clear_seen_cache() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);

    // Record two different message ids from the same sender.
    let first_id = crate::relay::MessageId::from_content(sender, 1000, 0x11111111);
    node.mark_message_seen(first_id, sender, 1000);
    let second_id = crate::relay::MessageId::from_content(sender, 2000, 0x22222222);
    node.mark_message_seen(second_id, sender, 2000);

    assert_eq!(node.seen_cache_size(), 2);

    node.clear_seen_cache();
    assert_eq!(node.seen_cache_size(), 0);
}
5116}