1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66 TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73 PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78 RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
/// Configuration used to construct a `PeatMesh`.
///
/// Create one with `PeatMeshConfig::new` and refine it with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct PeatMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node; handed to `DocumentSync` when the mesh is built.
    pub callsign: String,

    /// Mesh identifier; also used together with the shared secret to derive
    /// the mesh encryption key.
    pub mesh_id: String,

    /// Peripheral type of the local node; handed to `DocumentSync` when the
    /// mesh is built. `new` defaults this to `PeripheralType::SoldierSensor`.
    pub peripheral_type: PeripheralType,

    /// Settings for the peer manager (timeouts, peer limits, RSSI threshold).
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (`new` defaults to 5000).
    // NOTE(review): the consumer of this value is outside this chunk — confirm semantics.
    pub sync_interval_ms: u64,

    /// Flag controlling automatic event broadcast (`new` defaults to `true`).
    // NOTE(review): the consumer of this flag is outside this chunk — confirm semantics.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret. When present, the mesh derives a
    /// `MeshEncryptionKey` from it and `mesh_id`.
    pub encryption_secret: Option<[u8; 32]>,

    /// When `true` and an encryption key is configured, inbound documents
    /// that are not marked as encrypted are rejected.
    pub strict_encryption: bool,

    /// Whether received relay envelopes are prepared for forwarding.
    pub enable_relay: bool,

    /// Hop limit applied to relay envelopes created by this node.
    pub max_relay_hops: u8,

    /// Fanout passed to the `RandomFanout` gossip strategy
    /// (`with_relay_fanout` clamps it to at least 1).
    pub relay_fanout: usize,

    /// TTL in milliseconds passed to the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
161
162impl PeatMeshConfig {
163 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165 Self {
166 node_id,
167 callsign: callsign.into(),
168 mesh_id: mesh_id.into(),
169 peripheral_type: PeripheralType::SoldierSensor,
170 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171 sync_interval_ms: 5000,
172 auto_broadcast_events: true,
173 encryption_secret: None,
174 strict_encryption: false,
175 enable_relay: false,
176 max_relay_hops: DEFAULT_MAX_HOPS,
177 relay_fanout: 2,
178 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179 }
180 }
181
182 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187 self.encryption_secret = Some(secret);
188 self
189 }
190
191 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193 self.peripheral_type = ptype;
194 self
195 }
196
197 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199 self.sync_interval_ms = interval_ms;
200 self
201 }
202
203 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205 self.peer_config.peer_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_max_peers(mut self, max: usize) -> Self {
211 self.peer_config.max_peers = max;
212 self
213 }
214
215 pub fn with_strict_encryption(mut self) -> Self {
223 self.strict_encryption = true;
224 self
225 }
226
227 pub fn with_relay(mut self) -> Self {
232 self.enable_relay = true;
233 self
234 }
235
236 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240 self.max_relay_hops = max_hops;
241 self
242 }
243
244 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248 self.relay_fanout = fanout.max(1);
249 self
250 }
251
252 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256 self.seen_cache_ttl_ms = ttl_ms;
257 self
258 }
259}
260
/// Lock-protected store of type-erased application documents.
///
/// The key is `(type_id, source_node, timestamp)`, as built when an
/// `Operation::App` delta is processed.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
/// Central mesh object: owns the configuration, peer tracking, document
/// sync state, encryption material, relay bookkeeping and identity registry.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration this mesh was built from (relay can be toggled at runtime).
    config: PeatMeshConfig,

    /// Tracks known and connected peers.
    peer_manager: PeerManager,

    /// Local CRDT document state (counters, peripheral, emergency).
    document_sync: DocumentSync,

    /// Registered observers that receive `PeatEvent`s.
    observers: ObserverManager,

    /// Initialised to 0 at construction.
    // NOTE(review): presumably the last sync time in ms; stored as 32 bits
    // while timestamps elsewhere are `u64` — confirm truncation is intended.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0 at construction.
    // NOTE(review): presumably the last cleanup time in ms — confirm.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` means documents are sent in plaintext.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer session manager; starts as `None`.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection-state graph, also updated when relayed messages reveal
    /// indirect peers.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of already-seen relay message ids, used for de-duplication.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that selects which connected peers receive relayed messages.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta-sync tracking and statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity of the local node, if one was supplied.
    identity: Option<DeviceIdentity>,

    /// Registry of peer identities (node id to public key).
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Latest peripheral state received from each peer.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode and apply application document operations.
    document_registry: DocumentRegistry,

    /// Decoded application documents keyed by `(type_id, source_node, timestamp)`.
    app_documents: AppDocumentStore,

    /// Translator used to decode inbound translator frames.
    #[cfg(feature = "mesh-translator")]
    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,

    /// Optional Rust callback invoked with each decoded translator document.
    #[cfg(feature = "mesh-translator")]
    decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,

    /// Optional callback invoked with each decoded translator document as JSON.
    #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
    decoded_document_json_callback:
        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,

    /// Set once a polling consumer of translator frames has been acknowledged;
    /// counted as a registered consumer when decoding translator frames.
    polled_translator_consumer_attested: std::sync::atomic::AtomicBool,
}
396
397#[cfg(feature = "std")]
398impl PeatMesh {
399 pub fn new(config: PeatMeshConfig) -> Self {
401 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
402 let document_sync = DocumentSync::with_peripheral_type(
403 config.node_id,
404 &config.callsign,
405 config.peripheral_type,
406 );
407
408 let encryption_key = config
410 .encryption_secret
411 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
412
413 let connection_graph = ConnectionStateGraph::with_config(
415 config.peer_config.rssi_degraded_threshold,
416 config.peer_config.lost_timeout_ms,
417 );
418
419 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
421
422 let gossip_strategy: Box<dyn GossipStrategy> =
424 Box::new(RandomFanout::new(config.relay_fanout));
425
426 let delta_encoder = DeltaEncoder::new(config.node_id);
428
429 let document_registry = DocumentRegistry::new();
430
431 Self {
432 config,
433 peer_manager,
434 document_sync,
435 observers: ObserverManager::new(),
436 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
437 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
438 encryption_key,
439 peer_sessions: std::sync::Mutex::new(None),
440 connection_graph: std::sync::Mutex::new(connection_graph),
441 seen_cache: std::sync::Mutex::new(seen_cache),
442 gossip_strategy,
443 delta_encoder: std::sync::Mutex::new(delta_encoder),
444 identity: None,
445 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
446 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
447 document_registry,
448 app_documents: std::sync::RwLock::new(HashMap::new()),
449 #[cfg(feature = "mesh-translator")]
450 ble_translator: std::sync::RwLock::new(
451 crate::translator::BleTranslator::with_defaults(),
452 ),
453 #[cfg(feature = "mesh-translator")]
454 decoded_document_callback: std::sync::RwLock::new(None),
455 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
456 decoded_document_json_callback: std::sync::RwLock::new(None),
457 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
458 }
459 }
460
461 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
467 let mut config = config;
469 config.node_id = identity.node_id();
470
471 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
472 let document_sync = DocumentSync::with_peripheral_type(
473 config.node_id,
474 &config.callsign,
475 config.peripheral_type,
476 );
477
478 let encryption_key = config
479 .encryption_secret
480 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
481
482 let connection_graph = ConnectionStateGraph::with_config(
483 config.peer_config.rssi_degraded_threshold,
484 config.peer_config.lost_timeout_ms,
485 );
486
487 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
488 let gossip_strategy: Box<dyn GossipStrategy> =
489 Box::new(RandomFanout::new(config.relay_fanout));
490 let delta_encoder = DeltaEncoder::new(config.node_id);
491
492 let document_registry = DocumentRegistry::new();
493
494 Self {
495 config,
496 peer_manager,
497 document_sync,
498 observers: ObserverManager::new(),
499 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
500 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
501 encryption_key,
502 peer_sessions: std::sync::Mutex::new(None),
503 connection_graph: std::sync::Mutex::new(connection_graph),
504 seen_cache: std::sync::Mutex::new(seen_cache),
505 gossip_strategy,
506 delta_encoder: std::sync::Mutex::new(delta_encoder),
507 identity: Some(identity),
508 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
509 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
510 document_registry,
511 app_documents: std::sync::RwLock::new(HashMap::new()),
512 #[cfg(feature = "mesh-translator")]
513 ble_translator: std::sync::RwLock::new(
514 crate::translator::BleTranslator::with_defaults(),
515 ),
516 #[cfg(feature = "mesh-translator")]
517 decoded_document_callback: std::sync::RwLock::new(None),
518 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
519 decoded_document_json_callback: std::sync::RwLock::new(None),
520 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
521 }
522 }
523
524 pub fn from_genesis(
532 genesis: &crate::security::MeshGenesis,
533 identity: DeviceIdentity,
534 callsign: &str,
535 ) -> Self {
536 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
537 .with_encryption(genesis.encryption_secret());
538
539 Self::with_identity(config, identity)
540 }
541
/// Rebuilds a mesh from previously persisted state.
///
/// Restores the device identity (propagating any `PersistenceError`), then
/// builds the mesh from the persisted genesis when one exists, or from a
/// default configuration with mesh id `"RESTORED"` otherwise. Finally the
/// persisted identity registry replaces the fresh one.
///
/// # Errors
///
/// Returns the error produced by `state.restore_identity()`.
#[cfg(feature = "std")]
pub fn from_persisted(
    state: crate::security::PersistedState,
    callsign: &str,
) -> Result<Self, crate::security::PersistenceError> {
    let identity = state.restore_identity()?;

    let genesis = state.restore_genesis();

    // The binding is deliberately not called `gen`: that identifier is a
    // reserved keyword starting with the Rust 2024 edition.
    let mesh = match genesis.as_ref() {
        Some(restored_genesis) => Self::from_genesis(restored_genesis, identity, callsign),
        None => {
            let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
            Self::with_identity(config, identity)
        }
    };

    // The mesh was just created, so the lock is not expected to be poisoned;
    // if it were, the restored registry is dropped (best effort).
    let restored_registry = state.restore_registry();
    if let Ok(mut registry) = mesh.identity_registry.lock() {
        *registry = restored_registry;
    }

    log::info!(
        "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
        mesh.config.node_id.as_u32(),
        mesh.known_identity_count()
    );

    Ok(mesh)
}
600
/// Captures the identity and identity registry for persistence.
///
/// Returns `None` when the mesh has no device identity or when the identity
/// registry lock is poisoned. `genesis` is passed through unchanged.
#[cfg(feature = "std")]
pub fn to_persisted_state(
    &self,
    genesis: Option<&crate::security::MeshGenesis>,
) -> Option<crate::security::PersistedState> {
    let identity = self.identity.as_ref()?;
    let registry = self.identity_registry.lock().ok()?;

    // `&registry` borrows the guarded registry for the duration of the call.
    Some(crate::security::PersistedState::with_registry(
        identity, genesis, &registry,
    ))
}
625
/// Installs the Rust callback that receives decoded translator documents,
/// replacing any previous one.
///
/// If the callback lock is poisoned the call is silently ignored.
#[cfg(feature = "mesh-translator")]
pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
    match self.decoded_document_callback.write() {
        Ok(mut guard) => *guard = Some(cb),
        Err(_) => {}
    }
}
645
/// Installs the JSON callback that receives decoded translator documents,
/// replacing any previous one.
///
/// The boxed callback is converted to an `Arc` before being stored. If the
/// callback lock is poisoned the call is silently ignored.
#[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
pub fn set_decoded_document_json_callback(
    &self,
    cb: Box<dyn crate::DecodedDocumentJsonCallback>,
) {
    let shared: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
    match self.decoded_document_json_callback.write() {
        Ok(mut guard) => *guard = Some(shared),
        Err(_) => {}
    }
}
672
673 pub fn acknowledge_polled_translator_consumer(&self) {
689 self.polled_translator_consumer_attested
690 .store(true, std::sync::atomic::Ordering::Release);
691 }
692
/// Replaces the translator with a new one built from `config`.
///
/// If the translator lock is poisoned the call is silently ignored.
#[cfg(feature = "mesh-translator")]
pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
    match self.ble_translator.write() {
        Ok(mut translator) => {
            *translator = crate::translator::BleTranslator::new(config);
        }
        Err(_) => {}
    }
}
702
/// Inspects a decrypted frame and, if it carries a translator marker,
/// decodes it and dispatches the result to registered consumers.
///
/// Returns:
/// - `NotTranslatorMarker` when `decrypted` is empty, or its first byte is
///   neither in the reserved marker range nor `TRANSLATOR_FRAME_MARKER`;
/// - `Decoded(..)` when the codec produced a document and it serialized to JSON;
/// - `Handled` in every other case (frame dropped, declined, or failed).
#[cfg(feature = "mesh-translator")]
fn try_handle_translator_marker(
    &self,
    decrypted: &[u8],
    peer: Option<&str>,
    source_node: Option<NodeId>,
) -> TranslatorMarkerOutcome {
    if decrypted.is_empty() {
        return TranslatorMarkerOutcome::NotTranslatorMarker;
    }
    let marker = decrypted[0];

    // Reserved markers are consumed (dropped) so they are not processed
    // further by the caller.
    if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
        log::warn!(
            "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
            decrypted.len()
        );
        return TranslatorMarkerOutcome::Handled;
    }

    if marker != TRANSLATOR_FRAME_MARKER {
        return TranslatorMarkerOutcome::NotTranslatorMarker;
    }

    // A translator frame needs at least the marker and a collection code.
    if decrypted.len() < 2 {
        log::warn!(
            "ble: dropping truncated translator frame (len={}, missing collection code)",
            decrypted.len()
        );
        return TranslatorMarkerOutcome::Handled;
    }

    let code = decrypted[1];
    let payload = &decrypted[2..];

    // Decode inside a scope so the translator read lock is released before
    // any callback runs.
    let (collection, decode_result) = {
        let translator = match self.ble_translator.read() {
            Ok(g) => g,
            Err(_) => {
                log::warn!("ble: translator RwLock poisoned; dropping frame");
                return TranslatorMarkerOutcome::Handled;
            }
        };
        let collection = match translator.code_to_collection(code) {
            Some(c) => c.to_string(),
            None => {
                log::warn!(
                    "ble: dropping translator frame with unknown collection code 0x{code:02X}"
                );
                return TranslatorMarkerOutcome::Handled;
            }
        };

        let mut ctx =
            peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
                .with_collection(collection.clone());
        if let Some(node) = source_node {
            ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
        }

        let result = translator.decode_inbound_sync(payload, &ctx);
        (collection, result)
    };

    match decode_result {
        Ok(Some(doc)) => {
            // Clone the callbacks out of their locks so the locks are not
            // held while the callbacks execute.
            let rust_cb = self
                .decoded_document_callback
                .read()
                .ok()
                .and_then(|g| g.as_ref().cloned());

            #[cfg(feature = "uniffi")]
            let json_cb = self
                .decoded_document_json_callback
                .read()
                .ok()
                .and_then(|g| g.as_ref().cloned());
            // Without `uniffi` there is no JSON callback; a placeholder keeps
            // the consumer check below uniform.
            #[cfg(not(feature = "uniffi"))]
            let json_cb: Option<()> = None;

            let polled_attested = self
                .polled_translator_consumer_attested
                .load(std::sync::atomic::Ordering::Acquire);

            let any_consumer = rust_cb.is_some() || json_cb.is_some() || polled_attested;

            if let Some(cb) = &rust_cb {
                cb.on_document(&collection, doc.clone(), peer);
            }

            let doc_json_result = serde_json::to_string(&doc);

            #[cfg(feature = "uniffi")]
            if let (Some(json_cb), Ok(ref doc_json)) = (json_cb, &doc_json_result) {
                json_cb.on_document(
                    collection.clone(),
                    doc_json.clone(),
                    peer.map(str::to_string),
                );
            }

            if let Err(ref e) = doc_json_result {
                log::warn!(
                    "ble: failed to serialize decoded {} doc to JSON: {}",
                    collection,
                    e
                );
            }

            // Surface frames that nobody is listening for.
            if !any_consumer {
                log::debug!(
                    "ble: decoded {} frame but no consumer registered (no callback, no polled attestation)",
                    collection
                );
                self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
                    collection: collection.clone(),
                    peer: peer.map(str::to_string),
                });
            }

            // On serialization failure, fall through to `Handled` below.
            if let Ok(doc_json) = doc_json_result {
                return TranslatorMarkerOutcome::Decoded(DecodedTranslatorFrame {
                    collection,
                    doc_json,
                    peer: peer.map(str::to_string),
                });
            }
        }
        Ok(None) => {
            log::debug!(
                "ble: codec declined translator frame for collection {}",
                collection
            );
        }
        Err(e) => {
            log::warn!(
                "ble: translator frame decode error (collection={}): {:#}",
                collection,
                e
            );
        }
    }

    TranslatorMarkerOutcome::Handled
}
901
/// Returns `true` when a mesh encryption key is configured.
pub fn is_encryption_enabled(&self) -> bool {
    self.encryption_key.is_some()
}
908
909 pub fn is_strict_encryption_enabled(&self) -> bool {
913 self.config.strict_encryption && self.encryption_key.is_some()
914 }
915
916 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
921 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
922 &self.config.mesh_id,
923 secret,
924 ));
925 }
926
/// Disables encryption by discarding the mesh encryption key.
pub fn disable_encryption(&mut self) {
    self.encryption_key = None;
}
931
932 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
937 match &self.encryption_key {
938 Some(key) => {
939 match key.encrypt_to_bytes(plaintext) {
941 Ok(ciphertext) => {
942 let mut buf = Vec::with_capacity(2 + ciphertext.len());
943 buf.push(ENCRYPTED_MARKER);
944 buf.push(0x00); buf.extend_from_slice(&ciphertext);
946 buf
947 }
948 Err(e) => {
949 log::error!("Encryption failed: {}", e);
950 plaintext.to_vec()
952 }
953 }
954 }
955 None => plaintext.to_vec(),
956 }
957 }
958
959 fn decrypt_document<'a>(
967 &self,
968 data: &'a [u8],
969 source_hint: Option<&str>,
970 ) -> Option<std::borrow::Cow<'a, [u8]>> {
971 log::debug!(
972 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
973 data.len(),
974 data.first().copied().unwrap_or(0),
975 source_hint
976 );
977
978 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
980 let _reserved = data[1];
982 let encrypted_payload = &data[2..];
983
984 log::debug!(
985 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
986 encrypted_payload.len()
987 );
988
989 match &self.encryption_key {
990 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
991 Ok(plaintext) => {
992 log::debug!(
993 "decrypt_document: SUCCESS, plaintext len={}",
994 plaintext.len()
995 );
996 Some(std::borrow::Cow::Owned(plaintext))
997 }
998 Err(e) => {
999 log::warn!(
1000 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
1001 e,
1002 encrypted_payload.len(),
1003 source_hint
1004 );
1005 self.notify(PeatEvent::SecurityViolation {
1006 kind: SecurityViolationKind::DecryptionFailed,
1007 source: source_hint.map(String::from),
1008 });
1009 None
1010 }
1011 },
1012 None => {
1013 log::warn!(
1014 "decrypt_document: encryption not enabled but received encrypted doc"
1015 );
1016 None
1017 }
1018 }
1019 } else {
1020 if self.config.strict_encryption && self.encryption_key.is_some() {
1023 log::warn!(
1024 "Rejected unencrypted document in strict encryption mode (source: {:?})",
1025 source_hint
1026 );
1027 self.notify(PeatEvent::SecurityViolation {
1028 kind: SecurityViolationKind::UnencryptedInStrictMode,
1029 source: source_hint.map(String::from),
1030 });
1031 None
1032 } else {
1033 Some(std::borrow::Cow::Borrowed(data))
1035 }
1036 }
1037 }
1038
1039 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
1053 self.decrypt_document(data, None)
1054 .map(|cow| cow.into_owned())
1055 }
1056
/// Returns `true` when this mesh was created with a device identity.
pub fn has_identity(&self) -> bool {
    self.identity.is_some()
}
1063
1064 pub fn public_key(&self) -> Option<[u8; 32]> {
1066 self.identity.as_ref().map(|id| id.public_key())
1067 }
1068
1069 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1073 self.identity
1074 .as_ref()
1075 .map(|id| id.create_attestation(now_ms))
1076 }
1077
1078 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1087 self.identity_registry
1088 .lock()
1089 .unwrap()
1090 .verify_or_register(attestation)
1091 }
1092
1093 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1095 self.identity_registry.lock().unwrap().is_known(node_id)
1096 }
1097
1098 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1100 self.identity_registry
1101 .lock()
1102 .unwrap()
1103 .get_public_key(node_id)
1104 .copied()
1105 }
1106
1107 pub fn known_identity_count(&self) -> usize {
1109 self.identity_registry.lock().unwrap().len()
1110 }
1111
1112 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1117 self.identity_registry
1118 .lock()
1119 .unwrap()
1120 .pre_register(node_id, public_key, now_ms);
1121 }
1122
1123 pub fn forget_peer_identity(&self, node_id: NodeId) {
1127 self.identity_registry.lock().unwrap().remove(node_id);
1128 }
1129
1130 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1134 self.identity.as_ref().map(|id| id.sign(data))
1135 }
1136
1137 pub fn verify_peer_signature(
1142 &self,
1143 node_id: NodeId,
1144 data: &[u8],
1145 signature: &[u8; 64],
1146 ) -> bool {
1147 if let Some(public_key) = self.peer_public_key(node_id) {
1148 crate::security::verify_signature(&public_key, data, signature)
1149 } else {
1150 false
1151 }
1152 }
1153
/// Returns `true` when relaying of received envelopes is enabled.
pub fn is_relay_enabled(&self) -> bool {
    self.config.enable_relay
}
1160
/// Enables relaying of received envelopes.
pub fn enable_relay(&mut self) {
    self.config.enable_relay = true;
}
1165
/// Disables relaying of received envelopes.
pub fn disable_relay(&mut self) {
    self.config.enable_relay = false;
}
1170
/// Returns whether the seen-message cache reports `message_id` as already seen.
///
/// # Panics
///
/// Panics if the seen-cache lock is poisoned.
pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
    self.seen_cache.lock().unwrap().has_seen(message_id)
}
1177
1178 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1182 self.seen_cache
1183 .lock()
1184 .unwrap()
1185 .check_and_mark(message_id, origin, now_ms)
1186 }
1187
1188 pub fn seen_cache_size(&self) -> usize {
1190 self.seen_cache.lock().unwrap().len()
1191 }
1192
1193 pub fn clear_seen_cache(&self) {
1195 self.seen_cache.lock().unwrap().clear();
1196 }
1197
1198 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1203 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1204 .with_max_hops(self.config.max_relay_hops);
1205 envelope.encode()
1206 }
1207
1208 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1213 let connected = self.peer_manager.get_connected_peers();
1214 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1215 connected
1216 .into_iter()
1217 .filter(|p| p.node_id != exclude)
1218 .collect()
1219 } else {
1220 connected
1221 };
1222
1223 self.gossip_strategy
1224 .select_peers(&filtered)
1225 .into_iter()
1226 .cloned()
1227 .collect()
1228 }
1229
1230 pub fn process_relay_envelope(
1240 &self,
1241 data: &[u8],
1242 source_peer: NodeId,
1243 now_ms: u64,
1244 ) -> Option<RelayDecision> {
1245 let envelope = RelayEnvelope::decode(data)?;
1247
1248 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1251 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1252 source_peer,
1253 envelope.origin_node,
1254 envelope.hop_count,
1255 now_ms,
1256 );
1257
1258 if is_new {
1259 log::debug!(
1260 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1261 envelope.origin_node.as_u32(),
1262 source_peer.as_u32(),
1263 envelope.hop_count
1264 );
1265 }
1266 }
1267
1268 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1270 let stats = self
1272 .seen_cache
1273 .lock()
1274 .unwrap()
1275 .get_stats(&envelope.message_id);
1276 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1277
1278 self.notify(PeatEvent::DuplicateMessageDropped {
1279 origin_node: envelope.origin_node,
1280 seen_count,
1281 });
1282
1283 log::debug!(
1284 "Dropped duplicate message {} from {:08X} (seen {} times)",
1285 envelope.message_id,
1286 envelope.origin_node.as_u32(),
1287 seen_count
1288 );
1289 return None;
1290 }
1291
1292 if !envelope.can_relay() {
1294 self.notify(PeatEvent::MessageTtlExpired {
1295 origin_node: envelope.origin_node,
1296 hop_count: envelope.hop_count,
1297 });
1298
1299 log::debug!(
1300 "Message {} from {:08X} TTL expired at hop {}",
1301 envelope.message_id,
1302 envelope.origin_node.as_u32(),
1303 envelope.hop_count
1304 );
1305
1306 return Some(RelayDecision {
1308 payload: envelope.payload,
1309 origin_node: envelope.origin_node,
1310 hop_count: envelope.hop_count,
1311 should_relay: false,
1312 relay_envelope: None,
1313 });
1314 }
1315
1316 let should_relay = self.config.enable_relay;
1318 let relay_envelope = if should_relay {
1319 envelope.relay() } else {
1321 None
1322 };
1323
1324 Some(RelayDecision {
1325 payload: envelope.payload,
1326 origin_node: envelope.origin_node,
1327 hop_count: envelope.hop_count,
1328 should_relay,
1329 relay_envelope,
1330 })
1331 }
1332
1333 pub fn build_relay_document(&self) -> Vec<u8> {
1338 let doc = self.build_document(); self.wrap_for_relay(doc)
1340 }
1341
1342 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1349 let mut encoder = self.delta_encoder.lock().unwrap();
1350 encoder.add_peer(peer_id);
1351 log::debug!(
1352 "Registered peer {:08X} for delta sync tracking",
1353 peer_id.as_u32()
1354 );
1355 }
1356
1357 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1361 let mut encoder = self.delta_encoder.lock().unwrap();
1362 encoder.remove_peer(peer_id);
1363 log::debug!(
1364 "Unregistered peer {:08X} from delta sync tracking",
1365 peer_id.as_u32()
1366 );
1367 }
1368
1369 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1374 let mut encoder = self.delta_encoder.lock().unwrap();
1375 encoder.reset_peer(peer_id);
1376 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1377 }
1378
1379 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1381 let mut encoder = self.delta_encoder.lock().unwrap();
1382 encoder.record_sent(peer_id, bytes);
1383 }
1384
1385 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1387 let mut encoder = self.delta_encoder.lock().unwrap();
1388 encoder.record_received(peer_id, bytes, timestamp);
1389 }
1390
1391 pub fn delta_stats(&self) -> DeltaStats {
1396 self.delta_encoder.lock().unwrap().stats()
1397 }
1398
1399 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1403 let encoder = self.delta_encoder.lock().unwrap();
1404 encoder
1405 .get_peer_state(peer_id)
1406 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1407 }
1408
1409 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1417 let mut all_operations: Vec<Operation> = Vec::new();
1419
1420 for (node_id_u32, count) in self.document_sync.counter_entries() {
1423 all_operations.push(Operation::IncrementCounter {
1424 counter_id: 0, node_id: NodeId::new(node_id_u32),
1426 amount: count,
1427 timestamp: count, });
1429 }
1430
1431 let peripheral = self.document_sync.peripheral_snapshot();
1434 let peripheral_timestamp = peripheral
1435 .last_event
1436 .as_ref()
1437 .map(|e| e.timestamp)
1438 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1440 peripheral,
1441 timestamp: peripheral_timestamp,
1442 });
1443
1444 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1446 let source_node = NodeId::new(emergency.source_node());
1447 let timestamp = emergency.timestamp();
1448
1449 all_operations.push(Operation::SetEmergency {
1451 source_node,
1452 timestamp,
1453 known_peers: emergency.all_nodes(),
1454 });
1455
1456 for acked_node in emergency.acked_nodes() {
1458 all_operations.push(Operation::AckEmergency {
1459 node_id: NodeId::new(acked_node),
1460 emergency_timestamp: timestamp,
1461 });
1462 }
1463 }
1464
1465 for app_op in self.app_document_delta_ops() {
1467 all_operations.push(Operation::App(app_op));
1468 }
1469
1470 let filtered_operations: Vec<Operation> = {
1472 let encoder = self.delta_encoder.lock().unwrap();
1473 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1474 all_operations
1475 .into_iter()
1476 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1477 .collect()
1478 } else {
1479 all_operations
1481 }
1482 };
1483
1484 if filtered_operations.is_empty() {
1486 return None;
1487 }
1488
1489 {
1491 let mut encoder = self.delta_encoder.lock().unwrap();
1492 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1493 for op in &filtered_operations {
1494 peer_state.mark_sent(&op.key(), op.timestamp());
1495 }
1496 }
1497 }
1498
1499 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1501 for op in filtered_operations {
1502 delta.add_operation(op);
1503 }
1504
1505 let encoded = delta.encode();
1507 let result = self.encrypt_document(&encoded);
1508
1509 {
1511 let mut encoder = self.delta_encoder.lock().unwrap();
1512 encoder.record_sent(peer_id, result.len());
1513 }
1514
1515 Some(result)
1516 }
1517
1518 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1523 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1524
1525 for (node_id_u32, count) in self.document_sync.counter_entries() {
1527 delta.add_operation(Operation::IncrementCounter {
1528 counter_id: 0,
1529 node_id: NodeId::new(node_id_u32),
1530 amount: count,
1531 timestamp: now_ms,
1532 });
1533 }
1534
1535 let peripheral = self.document_sync.peripheral_snapshot();
1537 let peripheral_timestamp = peripheral
1538 .last_event
1539 .as_ref()
1540 .map(|e| e.timestamp)
1541 .unwrap_or(now_ms);
1542 delta.add_operation(Operation::UpdatePeripheral {
1543 peripheral,
1544 timestamp: peripheral_timestamp,
1545 });
1546
1547 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1549 let source_node = NodeId::new(emergency.source_node());
1550 let timestamp = emergency.timestamp();
1551
1552 delta.add_operation(Operation::SetEmergency {
1553 source_node,
1554 timestamp,
1555 known_peers: emergency.all_nodes(),
1556 });
1557
1558 for acked_node in emergency.acked_nodes() {
1559 delta.add_operation(Operation::AckEmergency {
1560 node_id: NodeId::new(acked_node),
1561 emergency_timestamp: timestamp,
1562 });
1563 }
1564 }
1565
1566 for app_op in self.app_document_delta_ops() {
1568 delta.add_operation(Operation::App(app_op));
1569 }
1570
1571 let encoded = delta.encode();
1572 self.encrypt_document(&encoded)
1573 }
1574
/// Processes an already-decrypted delta document and reports what changed.
///
/// `source_node` is the peer credited with the sync (`record_sync` and the
/// delta encoder's receive statistics), while events and the returned result
/// are attributed to the origin node carried inside the delta itself.
/// `relay_data`, `origin_node` and `hop_count` are relay metadata that is
/// copied into the result unchanged; a `MessageRelayed` event is emitted when
/// `relay_data` is present.
///
/// Returns `None` when `data` does not decode as a [`DeltaDocument`] or when
/// the delta originated from this node.
///
/// # Panics
///
/// Panics if the `app_documents` lock or the `delta_encoder` mutex is poisoned.
fn process_delta_document_internal(
    &self,
    source_node: NodeId,
    data: &[u8],
    now_ms: u64,
    relay_data: Option<Vec<u8>>,
    origin_node: Option<NodeId>,
    hop_count: u8,
) -> Option<DataReceivedResult> {
    let delta = DeltaDocument::decode(data)?;

    // Ignore our own deltas coming back to us.
    if delta.origin_node == self.config.node_id {
        return None;
    }

    // Flags accumulated across all operations in this delta.
    let mut counter_changed = false;
    let mut emergency_changed = false;
    let mut is_emergency = false;
    let mut is_ack = false;
    let mut event_timestamp = 0u64;
    let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;

    log::info!(
        "Delta document from {:08X}: {} operations, data_len={}",
        delta.origin_node.as_u32(),
        delta.operations.len(),
        data.len()
    );
    for op in &delta.operations {
        log::info!("  Operation: {}", op.key());
        match op {
            Operation::IncrementCounter {
                node_id, amount, ..
            } => {
                // Look up the locally known value for this node's counter entry
                // (0 when there is no entry yet).
                let current = self.document_sync.counter_entries();
                let current_value = current
                    .iter()
                    .find(|(id, _)| *id == node_id.as_u32())
                    .map(|(_, v)| *v)
                    .unwrap_or(0);

                // This branch only flags the change; it does not itself write
                // the new amount anywhere.
                // NOTE(review): confirm the counter is merged elsewhere.
                if *amount > current_value {
                    counter_changed = true;
                }
            }
            Operation::UpdatePeripheral {
                peripheral,
                timestamp,
            } => {
                // Cache the peripheral under the delta's origin node; a
                // poisoned lock silently skips the cache update.
                if let Ok(mut peripherals) = self.peer_peripherals.write() {
                    peripherals.insert(delta.origin_node, peripheral.clone());
                }
                peer_peripheral = Some(peripheral.clone());
                // Keep the newest timestamp seen so far.
                if *timestamp > event_timestamp {
                    event_timestamp = *timestamp;
                }
            }
            Operation::SetEmergency { timestamp, .. } => {
                is_emergency = true;
                emergency_changed = true;
                // Unlike the other branches this overwrites unconditionally
                // rather than taking the maximum.
                event_timestamp = *timestamp;
            }
            Operation::AckEmergency {
                emergency_timestamp,
                ..
            } => {
                is_ack = true;
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::ClearEmergency {
                emergency_timestamp,
            } => {
                emergency_changed = true;
                if *emergency_timestamp > event_timestamp {
                    event_timestamp = *emergency_timestamp;
                }
            }
            Operation::App(app_op) => {
                // Only the low 48 bits of the operation timestamp identify the
                // document.
                // NOTE(review): presumably the high bits carry other data - confirm.
                let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;

                log::info!(
                    "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
                    app_op.type_id,
                    app_op.op_code,
                    app_op.source_node,
                    doc_timestamp,
                    app_op.payload.len()
                );

                let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
                // Scope the write guard so it is released before observers run.
                let changed = {
                    let mut docs = self.app_documents.write().unwrap();

                    if let Some(existing) = docs.get_mut(&doc_key) {
                        // Known document: apply the operation in place.
                        self.document_registry.apply_delta_op(
                            app_op.type_id,
                            existing.as_mut(),
                            app_op,
                        )
                    } else {
                        // Unknown document: try to decode the payload as a
                        // complete document and store it.
                        if let Some(decoded) = self
                            .document_registry
                            .decode(app_op.type_id, &app_op.payload)
                        {
                            docs.insert(doc_key, decoded);
                            true
                        } else {
                            log::debug!(
                                "Received delta for unknown doc {:?}, waiting for full state",
                                doc_key
                            );
                            false
                        }
                    }
                };

                // Observers are told about every app operation, with `changed`
                // indicating whether local state was modified.
                self.observers.notify(PeatEvent::app_document_received(
                    app_op.type_id,
                    NodeId::new(app_op.source_node),
                    doc_timestamp,
                    changed,
                ));
            }
        }
    }

    // Sync bookkeeping is keyed by the sending peer, not the delta's origin.
    self.peer_manager.record_sync(source_node, now_ms);

    {
        let mut encoder = self.delta_encoder.lock().unwrap();
        encoder.record_received(&source_node, data.len(), now_ms);
    }

    // An emergency takes precedence over an ack when both are present.
    if is_emergency {
        self.notify(PeatEvent::EmergencyReceived {
            from_node: delta.origin_node,
        });
    } else if is_ack {
        self.notify(PeatEvent::AckReceived {
            from_node: delta.origin_node,
        });
    }

    if counter_changed {
        let total_count = self.document_sync.total_count();
        self.notify(PeatEvent::DocumentSynced {
            from_node: delta.origin_node,
            total_count,
        });
    }

    if relay_data.is_some() {
        // The sending peer is passed to `get_relay_targets`; only the target
        // count is reported here.
        let relay_targets = self.get_relay_targets(Some(source_node));
        self.notify(PeatEvent::MessageRelayed {
            origin_node: origin_node.unwrap_or(delta.origin_node),
            relay_count: relay_targets.len(),
            hop_count,
        });
    }

    let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
        DataReceivedResult::peripheral_fields(&peer_peripheral);

    Some(DataReceivedResult {
        source_node: delta.origin_node,
        is_emergency,
        is_ack,
        counter_changed,
        emergency_changed,
        total_count: self.document_sync.total_count(),
        event_timestamp,
        relay_data,
        origin_node,
        hop_count,
        callsign,
        battery_percent,
        heart_rate,
        event_type,
        latitude,
        longitude,
        altitude,
        ..Default::default()
    })
}
1789
1790 pub fn enable_peer_e2ee(&self) {
1798 let mut sessions = self.peer_sessions.lock().unwrap();
1799 if sessions.is_none() {
1800 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1801 log::info!(
1802 "Per-peer E2EE enabled for node {:08X}",
1803 self.config.node_id.as_u32()
1804 );
1805 }
1806 }
1807
1808 pub fn disable_peer_e2ee(&self) {
1812 let mut sessions = self.peer_sessions.lock().unwrap();
1813 *sessions = None;
1814 log::info!("Per-peer E2EE disabled");
1815 }
1816
1817 pub fn is_peer_e2ee_enabled(&self) -> bool {
1819 self.peer_sessions.lock().unwrap().is_some()
1820 }
1821
1822 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1826 self.peer_sessions
1827 .lock()
1828 .unwrap()
1829 .as_ref()
1830 .map(|s| s.our_public_key())
1831 }
1832
1833 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1839 let mut sessions = self.peer_sessions.lock().unwrap();
1840 let session_mgr = sessions.as_mut()?;
1841
1842 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1843 let mut buf = Vec::with_capacity(2 + 37);
1844 buf.push(KEY_EXCHANGE_MARKER);
1845 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1847
1848 log::info!(
1849 "Initiated E2EE session with peer {:08X}",
1850 peer_node_id.as_u32()
1851 );
1852 Some(buf)
1853 }
1854
1855 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1857 self.peer_sessions
1858 .lock()
1859 .unwrap()
1860 .as_ref()
1861 .is_some_and(|s| s.has_session(peer_node_id))
1862 }
1863
1864 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1866 self.peer_sessions
1867 .lock()
1868 .unwrap()
1869 .as_ref()
1870 .and_then(|s| s.session_state(peer_node_id))
1871 }
1872
1873 pub fn send_peer_e2ee(
1878 &self,
1879 peer_node_id: NodeId,
1880 plaintext: &[u8],
1881 now_ms: u64,
1882 ) -> Option<Vec<u8>> {
1883 let mut sessions = self.peer_sessions.lock().unwrap();
1884 let session_mgr = sessions.as_mut()?;
1885
1886 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1887 Ok(encrypted) => {
1888 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1889 buf.push(PEER_E2EE_MARKER);
1890 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1892 Some(buf)
1893 }
1894 Err(e) => {
1895 log::warn!(
1896 "Failed to encrypt for peer {:08X}: {:?}",
1897 peer_node_id.as_u32(),
1898 e
1899 );
1900 None
1901 }
1902 }
1903 }
1904
1905 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1907 let mut sessions = self.peer_sessions.lock().unwrap();
1908 if let Some(session_mgr) = sessions.as_mut() {
1909 session_mgr.close_session(peer_node_id);
1910 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1911 log::info!(
1912 "Closed E2EE session with peer {:08X}",
1913 peer_node_id.as_u32()
1914 );
1915 }
1916 }
1917
1918 pub fn peer_e2ee_session_count(&self) -> usize {
1920 self.peer_sessions
1921 .lock()
1922 .unwrap()
1923 .as_ref()
1924 .map(|s| s.session_count())
1925 .unwrap_or(0)
1926 }
1927
1928 pub fn peer_e2ee_established_count(&self) -> usize {
1930 self.peer_sessions
1931 .lock()
1932 .unwrap()
1933 .as_ref()
1934 .map(|s| s.established_count())
1935 .unwrap_or(0)
1936 }
1937
1938 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1943 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1944 return None;
1945 }
1946
1947 let payload = &data[2..];
1948 let msg = KeyExchangeMessage::decode(payload)?;
1949
1950 let mut sessions = self.peer_sessions.lock().unwrap();
1951 let session_mgr = sessions.as_mut()?;
1952
1953 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1954
1955 if established {
1956 self.notify(PeatEvent::PeerE2eeEstablished {
1957 peer_node_id: msg.sender_node_id,
1958 });
1959 log::info!(
1960 "E2EE session established with peer {:08X}",
1961 msg.sender_node_id.as_u32()
1962 );
1963 }
1964
1965 let mut buf = Vec::with_capacity(2 + 37);
1967 buf.push(KEY_EXCHANGE_MARKER);
1968 buf.push(0x00);
1969 buf.extend_from_slice(&response.encode());
1970 Some(buf)
1971 }
1972
1973 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1978 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1979 return None;
1980 }
1981
1982 let payload = &data[2..];
1983 let msg = PeerEncryptedMessage::decode(payload)?;
1984
1985 let mut sessions = self.peer_sessions.lock().unwrap();
1986 let session_mgr = sessions.as_mut()?;
1987
1988 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1989 Ok(plaintext) => {
1990 self.notify(PeatEvent::PeerE2eeMessageReceived {
1992 from_node: msg.sender_node_id,
1993 data: plaintext.clone(),
1994 });
1995 Some(plaintext)
1996 }
1997 Err(e) => {
1998 log::warn!(
1999 "Failed to decrypt E2EE message from {:08X}: {:?}",
2000 msg.sender_node_id.as_u32(),
2001 e
2002 );
2003 None
2004 }
2005 }
2006 }
2007
2008 pub fn node_id(&self) -> NodeId {
2012 self.config.node_id
2013 }
2014
2015 pub fn callsign(&self) -> &str {
2017 &self.config.callsign
2018 }
2019
2020 pub fn mesh_id(&self) -> &str {
2022 &self.config.mesh_id
2023 }
2024
2025 pub fn device_name(&self) -> String {
2027 format!(
2028 "PEAT_{}-{:08X}",
2029 self.config.mesh_id,
2030 self.config.node_id.as_u32()
2031 )
2032 }
2033
2034 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
2039 self.peer_peripherals.read().ok().and_then(|peripherals| {
2040 peripherals
2041 .get(&node_id)
2042 .map(|p| p.callsign_str().to_string())
2043 })
2044 }
2045
2046 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
2051 self.peer_peripherals
2052 .read()
2053 .ok()
2054 .and_then(|peripherals| peripherals.get(&node_id).cloned())
2055 }
2056
2057 pub fn document_registry(&self) -> &DocumentRegistry {
2072 &self.document_registry
2073 }
2074
2075 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2082 let type_id = T::TYPE_ID;
2083 let (source_node, timestamp) = doc.identity();
2084 let key = (type_id, source_node, timestamp);
2085
2086 let mut docs = self.app_documents.write().unwrap();
2087
2088 if let Some(existing) = docs.get_mut(&key) {
2089 self.document_registry
2091 .merge(type_id, existing.as_mut(), &doc)
2092 } else {
2093 docs.insert(key, Box::new(doc));
2095 true
2096 }
2097 }
2098
2099 pub fn store_app_document_boxed(
2106 &self,
2107 type_id: u8,
2108 source_node: u32,
2109 timestamp: u64,
2110 doc: Box<dyn core::any::Any + Send + Sync>,
2111 ) -> bool {
2112 let key = (type_id, source_node, timestamp);
2113
2114 let mut docs = self.app_documents.write().unwrap();
2115
2116 if let Some(existing) = docs.get_mut(&key) {
2117 self.document_registry
2119 .merge(type_id, existing.as_mut(), doc.as_ref())
2120 } else {
2121 docs.insert(key, doc);
2123 true
2124 }
2125 }
2126
2127 pub fn get_app_document<T: crate::registry::DocumentType>(
2131 &self,
2132 source_node: u32,
2133 timestamp: u64,
2134 ) -> Option<T> {
2135 let key = (T::TYPE_ID, source_node, timestamp);
2136
2137 let docs = self.app_documents.read().unwrap();
2138 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2139 }
2140
2141 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2145 let docs = self.app_documents.read().unwrap();
2146 docs.iter()
2147 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2148 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2149 .collect()
2150 }
2151
2152 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2156 let docs = self.app_documents.read().unwrap();
2157 let mut ops = Vec::new();
2158
2159 for ((type_id, _source, _ts), doc) in docs.iter() {
2160 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2161 ops.push(op);
2162 }
2163 }
2164
2165 ops
2166 }
2167
2168 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2172 let docs = self.app_documents.read().unwrap();
2173 docs.keys()
2174 .filter(|(tid, _, _)| *tid == type_id)
2175 .map(|(_, source, ts)| (*source, *ts))
2176 .collect()
2177 }
2178
2179 pub fn app_document_count(&self) -> usize {
2181 self.app_documents.read().unwrap().len()
2182 }
2183
2184 pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2188 self.observers.add(observer);
2189 }
2190
2191 pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2193 self.observers.remove(observer);
2194 }
2195
2196 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2203 let data = self.document_sync.send_emergency(timestamp);
2204 self.notify(PeatEvent::MeshStateChanged {
2205 peer_count: self.peer_manager.peer_count(),
2206 connected_count: self.peer_manager.connected_count(),
2207 });
2208 self.encrypt_document(&data)
2209 }
2210
2211 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2216 let data = self.document_sync.send_ack(timestamp);
2217 self.notify(PeatEvent::MeshStateChanged {
2218 peer_count: self.peer_manager.peer_count(),
2219 connected_count: self.peer_manager.connected_count(),
2220 });
2221 self.encrypt_document(&data)
2222 }
2223
2224 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2231 self.encrypt_document(payload)
2232 }
2233
2234 pub fn clear_event(&self) {
2236 self.document_sync.clear_event();
2237 }
2238
2239 pub fn is_emergency_active(&self) -> bool {
2241 self.document_sync.is_emergency_active()
2242 }
2243
2244 pub fn is_ack_active(&self) -> bool {
2246 self.document_sync.is_ack_active()
2247 }
2248
2249 pub fn current_event(&self) -> Option<EventType> {
2251 self.document_sync.current_event()
2252 }
2253
2254 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2263 let data = self.document_sync.start_emergency(timestamp, known_peers);
2264 self.notify(PeatEvent::MeshStateChanged {
2265 peer_count: self.peer_manager.peer_count(),
2266 connected_count: self.peer_manager.connected_count(),
2267 });
2268 self.encrypt_document(&data)
2269 }
2270
2271 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2275 let peers: Vec<u32> = self
2276 .peer_manager
2277 .get_peers()
2278 .iter()
2279 .map(|p| p.node_id.as_u32())
2280 .collect();
2281 self.start_emergency(timestamp, &peers)
2282 }
2283
2284 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2289 let result = self.document_sync.ack_emergency(timestamp);
2290 if result.is_some() {
2291 self.notify(PeatEvent::MeshStateChanged {
2292 peer_count: self.peer_manager.peer_count(),
2293 connected_count: self.peer_manager.connected_count(),
2294 });
2295 }
2296 result.map(|data| self.encrypt_document(&data))
2297 }
2298
2299 pub fn clear_emergency(&self) {
2301 self.document_sync.clear_emergency();
2302 }
2303
2304 pub fn has_active_emergency(&self) -> bool {
2306 self.document_sync.has_active_emergency()
2307 }
2308
2309 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2313 self.document_sync.get_emergency_status()
2314 }
2315
2316 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2318 self.document_sync.has_peer_acked(peer_id)
2319 }
2320
2321 pub fn all_peers_acked(&self) -> bool {
2323 self.document_sync.all_peers_acked()
2324 }
2325
/// Adds a chat message and returns the rebuilt document for sending.
///
/// Returns `None` when the document sync layer rejects the message; otherwise
/// returns the current document after passing it through `encrypt_document`.
#[cfg(feature = "legacy-chat")]
pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
    if !self.document_sync.add_chat_message(sender, text, timestamp) {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2343
/// Adds a chat reply and returns the rebuilt document for sending.
///
/// `reply_to_node` and `reply_to_timestamp` identify the message being
/// replied to. Returns `None` when the document sync layer rejects the reply;
/// otherwise returns the current document after passing it through
/// `encrypt_document`.
#[cfg(feature = "legacy-chat")]
pub fn send_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> Option<Vec<u8>> {
    let accepted = self.document_sync.add_chat_reply(
        sender,
        text,
        reply_to_node,
        reply_to_timestamp,
        timestamp,
    );
    if !accepted {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2372
/// Returns the chat message count reported by the document sync layer.
#[cfg(feature = "legacy-chat")]
pub fn chat_count(&self) -> usize {
    let sync = &self.document_sync;
    sync.chat_count()
}
2378
/// Returns the chat messages the document sync layer reports for
/// `since_timestamp`.
///
/// NOTE(review): the tuple layout and whether the bound is inclusive are
/// defined by the document sync layer and are not visible here - confirm.
#[cfg(feature = "legacy-chat")]
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.chat_messages_since(since_timestamp)
}
2389
/// Returns every chat message held by the document sync layer.
///
/// NOTE(review): the tuple layout is defined by the document sync layer and is
/// not visible here - confirm.
#[cfg(feature = "legacy-chat")]
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
    let sync = &self.document_sync;
    sync.all_chat_messages()
}
2397
2398 pub fn on_ble_discovered(
2404 &self,
2405 identifier: &str,
2406 name: Option<&str>,
2407 rssi: i8,
2408 mesh_id: Option<&str>,
2409 now_ms: u64,
2410 ) -> Option<PeatPeer> {
2411 let (node_id, is_new) = self
2412 .peer_manager
2413 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2414
2415 let peer = self.peer_manager.get_peer(node_id)?;
2416
2417 {
2419 let mut graph = self.connection_graph.lock().unwrap();
2420 graph.on_discovered(
2421 node_id,
2422 identifier.to_string(),
2423 name.map(|s| s.to_string()),
2424 mesh_id.map(|s| s.to_string()),
2425 rssi,
2426 now_ms,
2427 );
2428 }
2429
2430 if is_new {
2431 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2432 self.notify_mesh_state_changed();
2433 }
2434
2435 Some(peer)
2436 }
2437
2438 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2442 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2443 Some(id) => id,
2444 None => {
2445 log::warn!(
2446 "on_ble_connected: identifier {:?} not in peer map — \
2447 use on_incoming_connection() for peripheral connections",
2448 identifier
2449 );
2450 return None;
2451 }
2452 };
2453
2454 {
2456 let mut graph = self.connection_graph.lock().unwrap();
2457 graph.on_connected(node_id, now_ms);
2458 }
2459
2460 self.register_peer_for_delta(&node_id);
2462
2463 self.notify(PeatEvent::PeerConnected { node_id });
2464 self.notify_mesh_state_changed();
2465 Some(node_id)
2466 }
2467
2468 pub fn on_ble_disconnected(
2470 &self,
2471 identifier: &str,
2472 reason: DisconnectReason,
2473 ) -> Option<NodeId> {
2474 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2475
2476 {
2478 let mut graph = self.connection_graph.lock().unwrap();
2479 let platform_reason = match observer_reason {
2480 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2481 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2482 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2483 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2484 DisconnectReason::ConnectionFailed => {
2485 crate::platform::DisconnectReason::ConnectionFailed
2486 }
2487 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2488 };
2489 let now_ms = std::time::SystemTime::now()
2490 .duration_since(std::time::UNIX_EPOCH)
2491 .map(|d| d.as_millis() as u64)
2492 .unwrap_or(0);
2493 graph.on_disconnected(node_id, platform_reason, now_ms);
2494
2495 graph.remove_via_peer(node_id);
2498 }
2499
2500 self.unregister_peer_for_delta(&node_id);
2502
2503 self.notify(PeatEvent::PeerDisconnected {
2504 node_id,
2505 reason: observer_reason,
2506 });
2507 self.notify_mesh_state_changed();
2508 Some(node_id)
2509 }
2510
2511 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2515 if self
2516 .peer_manager
2517 .on_disconnected_by_node_id(node_id, reason)
2518 {
2519 {
2521 let mut graph = self.connection_graph.lock().unwrap();
2522 let platform_reason = match reason {
2523 DisconnectReason::LocalRequest => {
2524 crate::platform::DisconnectReason::LocalRequest
2525 }
2526 DisconnectReason::RemoteRequest => {
2527 crate::platform::DisconnectReason::RemoteRequest
2528 }
2529 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2530 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2531 DisconnectReason::ConnectionFailed => {
2532 crate::platform::DisconnectReason::ConnectionFailed
2533 }
2534 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2535 };
2536 let now_ms = std::time::SystemTime::now()
2537 .duration_since(std::time::UNIX_EPOCH)
2538 .map(|d| d.as_millis() as u64)
2539 .unwrap_or(0);
2540 graph.on_disconnected(node_id, platform_reason, now_ms);
2541
2542 graph.remove_via_peer(node_id);
2544 }
2545
2546 self.unregister_peer_for_delta(&node_id);
2548
2549 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2550 self.notify_mesh_state_changed();
2551 }
2552 }
2553
2554 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2558 let is_new = self
2559 .peer_manager
2560 .on_incoming_connection(identifier, node_id, now_ms);
2561
2562 {
2564 let mut graph = self.connection_graph.lock().unwrap();
2565 if is_new {
2566 graph.on_discovered(
2567 node_id,
2568 identifier.to_string(),
2569 None,
2570 Some(self.config.mesh_id.clone()),
2571 -50, now_ms,
2573 );
2574 }
2575 graph.on_connected(node_id, now_ms);
2576 }
2577
2578 self.register_peer_for_delta(&node_id);
2580
2581 if is_new {
2582 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2583 self.notify(PeatEvent::PeerDiscovered { peer });
2584 }
2585 }
2586
2587 self.notify(PeatEvent::PeerConnected { node_id });
2588 self.notify_mesh_state_changed();
2589
2590 is_new
2591 }
2592
2593 pub fn on_ble_data_received(
2600 &self,
2601 identifier: &str,
2602 data: &[u8],
2603 now_ms: u64,
2604 ) -> Option<DataReceivedResult> {
2605 let node_id = self.peer_manager.get_node_id(identifier)?;
2607
2608 if data.len() >= 2 {
2610 match data[0] {
2611 KEY_EXCHANGE_MARKER => {
2612 let _response = self.handle_key_exchange(data, now_ms);
2614 return None;
2616 }
2617 PEER_E2EE_MARKER => {
2618 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2620 return None;
2622 }
2623 RELAY_ENVELOPE_MARKER => {
2624 return self
2626 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2627 }
2628 _ => {}
2629 }
2630 }
2631
2632 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2634 }
2635
2636 #[allow(clippy::too_many_arguments)]
2638 fn process_document_data_with_identifier(
2639 &self,
2640 source_node: NodeId,
2641 identifier: &str,
2642 data: &[u8],
2643 now_ms: u64,
2644 relay_data: Option<Vec<u8>>,
2645 origin_node: Option<NodeId>,
2646 hop_count: u8,
2647 ) -> Option<DataReceivedResult> {
2648 let decrypted = self.decrypt_document(data, Some(identifier))?;
2650
2651 #[cfg(feature = "mesh-translator")]
2658 match self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2659 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2660 TranslatorMarkerOutcome::Handled => return None,
2661 TranslatorMarkerOutcome::Decoded(frame) => {
2662 return Some(DataReceivedResult::translator_frame(source_node, frame));
2663 }
2664 }
2665
2666 if DeltaDocument::is_delta_document(&decrypted) {
2668 return self.process_delta_document_internal(
2669 source_node,
2670 &decrypted,
2671 now_ms,
2672 relay_data,
2673 origin_node,
2674 hop_count,
2675 );
2676 }
2677
2678 let result = self.document_sync.merge_document(&decrypted)?;
2680
2681 if let Some(ref peripheral) = result.peer_peripheral {
2683 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2684 peripherals.insert(result.source_node, peripheral.clone());
2685 }
2686 }
2687
2688 self.peer_manager.record_sync(source_node, now_ms);
2690
2691 if result.is_emergency() {
2693 self.notify(PeatEvent::EmergencyReceived {
2694 from_node: result.source_node,
2695 });
2696 } else if result.is_ack() {
2697 self.notify(PeatEvent::AckReceived {
2698 from_node: result.source_node,
2699 });
2700 }
2701
2702 if result.counter_changed {
2703 self.notify(PeatEvent::DocumentSynced {
2704 from_node: result.source_node,
2705 total_count: result.total_count,
2706 });
2707 }
2708
2709 if relay_data.is_some() {
2711 let relay_targets = self.get_relay_targets(Some(source_node));
2712 self.notify(PeatEvent::MessageRelayed {
2713 origin_node: origin_node.unwrap_or(result.source_node),
2714 relay_count: relay_targets.len(),
2715 hop_count,
2716 });
2717 }
2718
2719 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2720 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2721
2722 Some(DataReceivedResult {
2723 source_node: result.source_node,
2724 is_emergency: result.is_emergency(),
2725 is_ack: result.is_ack(),
2726 counter_changed: result.counter_changed,
2727 emergency_changed: result.emergency_changed,
2728 total_count: result.total_count,
2729 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2730 relay_data,
2731 origin_node,
2732 hop_count,
2733 callsign,
2734 battery_percent,
2735 heart_rate,
2736 event_type,
2737 latitude,
2738 longitude,
2739 altitude,
2740 ..Default::default()
2741 })
2742 }
2743
2744 fn handle_relay_envelope_with_identifier(
2746 &self,
2747 source_node: NodeId,
2748 identifier: &str,
2749 data: &[u8],
2750 now_ms: u64,
2751 ) -> Option<DataReceivedResult> {
2752 let envelope = RelayEnvelope::decode(data)?;
2754
2755 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2757 let stats = self
2758 .seen_cache
2759 .lock()
2760 .unwrap()
2761 .get_stats(&envelope.message_id);
2762 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2763
2764 self.notify(PeatEvent::DuplicateMessageDropped {
2765 origin_node: envelope.origin_node,
2766 seen_count,
2767 });
2768 return None;
2769 }
2770
2771 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2773 envelope.relay().map(|e| e.encode())
2774 } else {
2775 if !envelope.can_relay() {
2776 self.notify(PeatEvent::MessageTtlExpired {
2777 origin_node: envelope.origin_node,
2778 hop_count: envelope.hop_count,
2779 });
2780 }
2781 None
2782 };
2783
2784 self.process_document_data_with_identifier(
2786 source_node,
2787 identifier,
2788 &envelope.payload,
2789 now_ms,
2790 relay_data,
2791 Some(envelope.origin_node),
2792 envelope.hop_count,
2793 )
2794 }
2795
2796 pub fn on_ble_data_received_from_node(
2803 &self,
2804 node_id: NodeId,
2805 data: &[u8],
2806 now_ms: u64,
2807 ) -> Option<DataReceivedResult> {
2808 if data.len() >= 2 {
2810 match data[0] {
2811 KEY_EXCHANGE_MARKER => {
2812 let _response = self.handle_key_exchange(data, now_ms);
2813 return None;
2814 }
2815 PEER_E2EE_MARKER => {
2816 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2817 return None;
2818 }
2819 RELAY_ENVELOPE_MARKER => {
2820 return self.handle_relay_envelope(node_id, data, now_ms);
2822 }
2823 _ => {}
2824 }
2825 }
2826
2827 self.process_document_data(node_id, data, now_ms, None, None, 0)
2829 }
2830
2831 pub fn on_ble_data_received_anonymous(
2841 &self,
2842 identifier: &str,
2843 data: &[u8],
2844 now_ms: u64,
2845 ) -> Option<DataReceivedResult> {
2846 log::debug!(
2847 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2848 identifier,
2849 data.len(),
2850 data.first().copied().unwrap_or(0)
2851 );
2852
2853 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2855 Some(d) => d,
2856 None => {
2857 log::warn!(
2858 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2859 data.len(),
2860 identifier
2861 );
2862 return None;
2863 }
2864 };
2865
2866 #[cfg(feature = "mesh-translator")]
2877 match self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2878 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2879 TranslatorMarkerOutcome::Handled => return None,
2880 TranslatorMarkerOutcome::Decoded(frame) => {
2881 return Some(DataReceivedResult::translator_frame(NodeId::new(0), frame));
2886 }
2887 }
2888
2889 if decrypted.len() < 8 {
2892 log::warn!("Decrypted document too short to extract source_node");
2893 return None;
2894 }
2895
2896 let source_node_u32 =
2897 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2898 let source_node = NodeId::new(source_node_u32);
2899
2900 log::info!(
2901 "Anonymous document from {}: source_node={:08X}, len={}",
2902 identifier,
2903 source_node_u32,
2904 decrypted.len()
2905 );
2906
2907 self.peer_manager
2910 .register_identifier(identifier, source_node);
2911
2912 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2914 log::info!(
2915 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2916 is_delta,
2917 decrypted.first().copied().unwrap_or(0),
2918 decrypted.len()
2919 );
2920
2921 if is_delta {
2922 return self.process_delta_document_internal(
2923 source_node,
2924 &decrypted,
2925 now_ms,
2926 None,
2927 None,
2928 0,
2929 );
2930 }
2931
2932 const APP_LAYER_MARKER: u8 = 0xAF;
2936 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2937 log::debug!(
2938 "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2939 source_node.as_u32(),
2940 decrypted.len()
2941 );
2942 return Some(DataReceivedResult {
2943 source_node,
2944 is_emergency: false,
2945 is_ack: false,
2946 counter_changed: false,
2947 emergency_changed: false,
2948 total_count: 0,
2949 event_timestamp: now_ms,
2950 relay_data: Some(decrypted.to_vec()),
2951 origin_node: None,
2952 hop_count: 0,
2953 callsign: None,
2954 battery_percent: None,
2955 heart_rate: None,
2956 event_type: None,
2957 latitude: None,
2958 longitude: None,
2959 altitude: None,
2960 ..Default::default()
2961 });
2962 }
2963
2964 log::info!(
2966 "Processing legacy document from {:08X}",
2967 source_node.as_u32()
2968 );
2969 let result = self.document_sync.merge_document(&decrypted)?;
2970
2971 log::info!(
2973 "Merge result: peer_peripheral={}, counter_changed={}",
2974 result.peer_peripheral.is_some(),
2975 result.counter_changed
2976 );
2977 if let Some(ref p) = result.peer_peripheral {
2978 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2979 }
2980
2981 self.peer_manager.record_sync(source_node, now_ms);
2983
2984 if result.is_emergency() {
2986 self.notify(PeatEvent::EmergencyReceived {
2987 from_node: result.source_node,
2988 });
2989 } else if result.is_ack() {
2990 self.notify(PeatEvent::AckReceived {
2991 from_node: result.source_node,
2992 });
2993 }
2994
2995 if result.counter_changed {
2996 self.notify(PeatEvent::DocumentSynced {
2997 from_node: result.source_node,
2998 total_count: result.total_count,
2999 });
3000 }
3001
3002 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3003 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3004
3005 Some(DataReceivedResult {
3006 source_node: result.source_node,
3007 is_emergency: result.is_emergency(),
3008 is_ack: result.is_ack(),
3009 counter_changed: result.counter_changed,
3010 emergency_changed: result.emergency_changed,
3011 total_count: result.total_count,
3012 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3013 relay_data: None,
3014 origin_node: None,
3015 hop_count: 0,
3016 callsign,
3017 battery_percent,
3018 heart_rate,
3019 event_type,
3020 latitude,
3021 longitude,
3022 altitude,
3023 ..Default::default()
3024 })
3025 }
3026
3027 fn process_document_data(
3029 &self,
3030 source_node: NodeId,
3031 data: &[u8],
3032 now_ms: u64,
3033 relay_data: Option<Vec<u8>>,
3034 origin_node: Option<NodeId>,
3035 hop_count: u8,
3036 ) -> Option<DataReceivedResult> {
3037 let source_hint = format!("node:{:08X}", source_node.as_u32());
3039 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
3040
3041 #[cfg(feature = "mesh-translator")]
3045 match self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
3046 TranslatorMarkerOutcome::NotTranslatorMarker => {}
3047 TranslatorMarkerOutcome::Handled => return None,
3048 TranslatorMarkerOutcome::Decoded(frame) => {
3049 return Some(DataReceivedResult::translator_frame(source_node, frame));
3050 }
3051 }
3052
3053 if DeltaDocument::is_delta_document(&decrypted) {
3055 return self.process_delta_document_internal(
3056 source_node,
3057 &decrypted,
3058 now_ms,
3059 relay_data,
3060 origin_node,
3061 hop_count,
3062 );
3063 }
3064
3065 let result = self.document_sync.merge_document(&decrypted)?;
3067
3068 if let Some(ref peripheral) = result.peer_peripheral {
3070 if let Ok(mut peripherals) = self.peer_peripherals.write() {
3071 peripherals.insert(result.source_node, peripheral.clone());
3072 }
3073 }
3074
3075 self.peer_manager.record_sync(source_node, now_ms);
3077
3078 if result.is_emergency() {
3080 self.notify(PeatEvent::EmergencyReceived {
3081 from_node: result.source_node,
3082 });
3083 } else if result.is_ack() {
3084 self.notify(PeatEvent::AckReceived {
3085 from_node: result.source_node,
3086 });
3087 }
3088
3089 if result.counter_changed {
3090 self.notify(PeatEvent::DocumentSynced {
3091 from_node: result.source_node,
3092 total_count: result.total_count,
3093 });
3094 }
3095
3096 if relay_data.is_some() {
3098 let relay_targets = self.get_relay_targets(Some(source_node));
3099 self.notify(PeatEvent::MessageRelayed {
3100 origin_node: origin_node.unwrap_or(result.source_node),
3101 relay_count: relay_targets.len(),
3102 hop_count,
3103 });
3104 }
3105
3106 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3107 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3108
3109 Some(DataReceivedResult {
3110 source_node: result.source_node,
3111 is_emergency: result.is_emergency(),
3112 is_ack: result.is_ack(),
3113 counter_changed: result.counter_changed,
3114 emergency_changed: result.emergency_changed,
3115 total_count: result.total_count,
3116 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3117 relay_data,
3118 origin_node,
3119 hop_count,
3120 callsign,
3121 battery_percent,
3122 heart_rate,
3123 event_type,
3124 latitude,
3125 longitude,
3126 altitude,
3127 ..Default::default()
3128 })
3129 }
3130
3131 fn handle_relay_envelope(
3133 &self,
3134 source_node: NodeId,
3135 data: &[u8],
3136 now_ms: u64,
3137 ) -> Option<DataReceivedResult> {
3138 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3140
3141 let relay_data = if decision.should_relay {
3143 decision.relay_data()
3144 } else {
3145 None
3146 };
3147
3148 self.process_document_data(
3150 source_node,
3151 &decision.payload,
3152 now_ms,
3153 relay_data,
3154 Some(decision.origin_node),
3155 decision.hop_count,
3156 )
3157 }
3158
3159 pub fn on_ble_data(
3168 &self,
3169 identifier: &str,
3170 data: &[u8],
3171 now_ms: u64,
3172 ) -> Option<DataReceivedResult> {
3173 if data.len() >= 2 {
3175 match data[0] {
3176 KEY_EXCHANGE_MARKER => {
3177 let _response = self.handle_key_exchange(data, now_ms);
3178 return None;
3179 }
3180 PEER_E2EE_MARKER => {
3181 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3182 return None;
3183 }
3184 RELAY_ENVELOPE_MARKER => {
3185 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3187 }
3188 _ => {}
3189 }
3190 }
3191
3192 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3194 }
3195
3196 fn process_incoming_document(
3198 &self,
3199 identifier: &str,
3200 data: &[u8],
3201 now_ms: u64,
3202 relay_data: Option<Vec<u8>>,
3203 origin_node: Option<NodeId>,
3204 hop_count: u8,
3205 ) -> Option<DataReceivedResult> {
3206 let decrypted = self.decrypt_document(data, Some(identifier))?;
3208
3209 let result = self.document_sync.merge_document(&decrypted)?;
3211
3212 self.peer_manager.record_sync(result.source_node, now_ms);
3214
3215 if origin_node.is_none() {
3220 let is_new =
3222 self.peer_manager
3223 .on_incoming_connection(identifier, result.source_node, now_ms);
3224
3225 {
3227 let mut graph = self.connection_graph.lock().unwrap();
3228 if is_new {
3229 graph.on_discovered(
3230 result.source_node,
3231 identifier.to_string(),
3232 None,
3233 Some(self.config.mesh_id.clone()),
3234 -50, now_ms,
3236 );
3237 }
3238 graph.on_connected(result.source_node, now_ms);
3239 }
3240 }
3241
3242 if result.is_emergency() {
3244 self.notify(PeatEvent::EmergencyReceived {
3245 from_node: result.source_node,
3246 });
3247 } else if result.is_ack() {
3248 self.notify(PeatEvent::AckReceived {
3249 from_node: result.source_node,
3250 });
3251 }
3252
3253 if result.counter_changed {
3254 self.notify(PeatEvent::DocumentSynced {
3255 from_node: result.source_node,
3256 total_count: result.total_count,
3257 });
3258 }
3259
3260 if relay_data.is_some() {
3262 let relay_targets = self.get_relay_targets(Some(result.source_node));
3263 self.notify(PeatEvent::MessageRelayed {
3264 origin_node: origin_node.unwrap_or(result.source_node),
3265 relay_count: relay_targets.len(),
3266 hop_count,
3267 });
3268 }
3269
3270 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3271 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3272
3273 Some(DataReceivedResult {
3274 source_node: result.source_node,
3275 is_emergency: result.is_emergency(),
3276 is_ack: result.is_ack(),
3277 counter_changed: result.counter_changed,
3278 emergency_changed: result.emergency_changed,
3279 total_count: result.total_count,
3280 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3281 relay_data,
3282 origin_node,
3283 hop_count,
3284 callsign,
3285 battery_percent,
3286 heart_rate,
3287 event_type,
3288 latitude,
3289 longitude,
3290 altitude,
3291 ..Default::default()
3292 })
3293 }
3294
3295 fn handle_relay_envelope_with_incoming(
3297 &self,
3298 identifier: &str,
3299 data: &[u8],
3300 now_ms: u64,
3301 ) -> Option<DataReceivedResult> {
3302 let envelope = RelayEnvelope::decode(data)?;
3304
3305 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3308 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3309 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3310 source_peer,
3311 envelope.origin_node,
3312 envelope.hop_count,
3313 now_ms,
3314 );
3315
3316 if is_new {
3317 log::debug!(
3318 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3319 envelope.origin_node.as_u32(),
3320 source_peer.as_u32(),
3321 envelope.hop_count
3322 );
3323 }
3324 }
3325 }
3326
3327 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3329 let stats = self
3331 .seen_cache
3332 .lock()
3333 .unwrap()
3334 .get_stats(&envelope.message_id);
3335 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3336
3337 self.notify(PeatEvent::DuplicateMessageDropped {
3338 origin_node: envelope.origin_node,
3339 seen_count,
3340 });
3341 return None;
3342 }
3343
3344 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3346 let relay_env = envelope.relay();
3347 (true, relay_env.map(|e| e.encode()))
3348 } else {
3349 if !envelope.can_relay() {
3350 self.notify(PeatEvent::MessageTtlExpired {
3351 origin_node: envelope.origin_node,
3352 hop_count: envelope.hop_count,
3353 });
3354 }
3355 (false, None)
3356 };
3357
3358 self.process_incoming_document(
3360 identifier,
3361 &envelope.payload,
3362 now_ms,
3363 if should_relay { relay_data } else { None },
3364 Some(envelope.origin_node),
3365 envelope.hop_count,
3366 )
3367 }
3368
3369 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3379 use std::sync::atomic::Ordering;
3380
3381 let now_ms_32 = now_ms as u32;
3383
3384 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3386 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3387 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3388 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3389 let removed = self.peer_manager.cleanup_stale(now_ms);
3390 for node_id in &removed {
3391 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3392 }
3393 if !removed.is_empty() {
3394 self.notify_mesh_state_changed();
3395 }
3396
3397 {
3399 let mut graph = self.connection_graph.lock().unwrap();
3400 let newly_lost = graph.tick(now_ms);
3401 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3403 drop(graph);
3404
3405 for node_id in newly_lost {
3408 if !removed.contains(&node_id) {
3410 self.notify(PeatEvent::PeerLost { node_id });
3411 }
3412 }
3413 }
3414 }
3415
3416 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3418 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3419 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3420 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3421 if self.peer_manager.connected_count() > 0 {
3423 let doc = self.document_sync.build_document();
3424 return Some(self.encrypt_document(&doc));
3425 }
3426 }
3427
3428 None
3429 }
3430
3431 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3440 use std::sync::atomic::Ordering;
3441 let now_ms_32 = now_ms as u32;
3442
3443 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3445 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3446 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3447 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3448 let removed = self.peer_manager.cleanup_stale(now_ms);
3449 for node_id in &removed {
3450 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3451 }
3452 if !removed.is_empty() {
3453 self.notify_mesh_state_changed();
3454 }
3455
3456 {
3458 let mut graph = self.connection_graph.lock().unwrap();
3459 let newly_lost = graph.tick(now_ms);
3460 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3461 drop(graph);
3462
3463 for node_id in newly_lost {
3464 if !removed.contains(&node_id) {
3465 self.notify(PeatEvent::PeerLost { node_id });
3466 }
3467 }
3468 }
3469 }
3470
3471 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3473 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3474 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3475 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3476
3477 let doc = self.document_sync.build_document();
3479 let encrypted = self.encrypt_document(&doc);
3480 let mut results = Vec::new();
3481 for peer in self.get_connected_peers() {
3482 results.push((peer.node_id, encrypted.clone()));
3483 }
3484 return results;
3485 }
3486
3487 Vec::new()
3488 }
3489
    /// Returns the peer list reported by the peer manager's `get_peers`.
    pub fn get_peers(&self) -> Vec<PeatPeer> {
        self.peer_manager.get_peers()
    }
3496
    /// Returns the peers the peer manager reports as connected.
    pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
        self.peer_manager.get_connected_peers()
    }
3501
    /// Looks up the peer with the given `node_id` in the peer manager.
    ///
    /// Returns `None` when the peer manager has no entry for it.
    pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
        self.peer_manager.get_peer(node_id)
    }
3506
    /// Returns the peer manager's peer count.
    pub fn peer_count(&self) -> usize {
        self.peer_manager.peer_count()
    }
3511
    /// Returns the peer manager's count of connected peers.
    pub fn connected_count(&self) -> usize {
        self.peer_manager.connected_count()
    }
3516
    /// Asks the peer manager whether a device advertising `device_mesh_id`
    /// belongs to this mesh.
    ///
    /// The matching rules, including how `None` is treated, are defined by the
    /// peer manager's `matches_mesh`.
    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
        self.peer_manager.matches_mesh(device_mesh_id)
    }
3521
    /// Returns an owned snapshot of every entry in the connection graph.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
        self.connection_graph.lock().unwrap().get_all_owned()
    }
3548
    /// Returns a clone of the connection-graph entry for `node_id`, or `None`
    /// when the graph has no entry for that node.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_peer(node_id)
            .cloned()
    }
3557
    /// Returns clones of the connection-graph entries yielded by the graph's
    /// `get_connected`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_connected()
            .into_iter()
            .cloned()
            .collect()
    }
3568
    /// Returns clones of the connection-graph entries yielded by the graph's
    /// `get_degraded`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_degraded()
            .into_iter()
            .cloned()
            .collect()
    }
3579
    /// Returns clones of the entries the connection graph reports as recently
    /// disconnected.
    ///
    /// `within_ms` and `now_ms` are forwarded unchanged to the graph's
    /// `get_recently_disconnected`, which defines the exact time window.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_recently_disconnected(
        &self,
        within_ms: u64,
        now_ms: u64,
    ) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_recently_disconnected(within_ms, now_ms)
            .into_iter()
            .cloned()
            .collect()
    }
3596
    /// Returns clones of the connection-graph entries yielded by the graph's
    /// `get_lost`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_lost()
            .into_iter()
            .cloned()
            .collect()
    }
3607
    /// Returns the connection graph's `state_counts` summary.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_connection_state_counts(&self) -> StateCountSummary {
        self.connection_graph.lock().unwrap().state_counts()
    }
3612
    /// Returns an owned snapshot of the indirect peers tracked by the
    /// connection graph.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_indirect_peers_owned()
    }
3626
    /// Returns the connection graph's `peer_degree` for `node_id`, or `None`
    /// when the graph has no degree for that node.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
        self.connection_graph.lock().unwrap().peer_degree(node_id)
    }
3636
    /// Returns the connection graph's `full_state_counts` summary.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
        self.connection_graph.lock().unwrap().full_state_counts()
    }
3644
    /// Returns the paths to `node_id` known to the connection graph.
    ///
    /// NOTE(review): each `(NodeId, u8)` pair presumably names the forwarding
    /// peer and the hop count through it; confirm against the graph's
    /// `get_paths_to`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
        self.connection_graph.lock().unwrap().get_paths_to(node_id)
    }
3652
    /// Returns whether the connection graph reports `node_id` as known.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
        self.connection_graph.lock().unwrap().is_known(node_id)
    }
3657
    /// Returns the connection graph's count of indirect peers.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn indirect_peer_count(&self) -> usize {
        self.connection_graph.lock().unwrap().indirect_peer_count()
    }
3662
    /// Runs the connection graph's `cleanup_indirect` at time `now_ms` and
    /// returns the node ids it reports.
    ///
    /// NOTE(review): these are presumably the indirect peers that were removed
    /// as stale; confirm against the graph's `cleanup_indirect`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (e.g. a poisoned mutex).
    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
        self.connection_graph
            .lock()
            .unwrap()
            .cleanup_indirect(now_ms)
    }
3673
    /// Returns the total count reported by the document sync state.
    pub fn total_count(&self) -> u64 {
        self.document_sync.total_count()
    }
3678
    /// Returns the version reported by the document sync state.
    ///
    /// Returns the same value as `version`.
    pub fn document_version(&self) -> u32 {
        self.document_sync.version()
    }
3683
    /// Returns the version reported by the document sync state.
    ///
    /// Returns the same value as `document_version`.
    pub fn version(&self) -> u32 {
        self.document_sync.version()
    }
3688
    /// Forwards the local battery percentage to the document sync state.
    pub fn update_health(&self, battery_percent: u8) {
        self.document_sync.update_health(battery_percent);
    }
3693
    /// Forwards the local activity value to the document sync state.
    ///
    /// NOTE(review): the encoding of `activity` is defined by
    /// `DocumentSync::update_activity` and is not visible here.
    pub fn update_activity(&self, activity: u8) {
        self.document_sync.update_activity(activity);
    }
3702
    /// Forwards the battery percentage and activity value to the document sync
    /// state in a single call.
    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
        self.document_sync
            .update_health_full(battery_percent, activity);
    }
3708
    /// Forwards the local heart rate to the document sync state.
    pub fn update_heart_rate(&self, heart_rate: u8) {
        self.document_sync.update_heart_rate(heart_rate);
    }
3713
    /// Forwards the local alerts value to the document sync state.
    ///
    /// NOTE(review): `alerts` is presumably a bit field; its layout is defined
    /// by `DocumentSync::update_alerts` and is not visible here.
    pub fn update_alerts(&self, alerts: u8) {
        self.document_sync.update_alerts(alerts);
    }
3720
    /// Forwards the local position to the document sync state.
    ///
    /// `altitude` is optional and passed through as given.
    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
        self.document_sync
            .update_location(latitude, longitude, altitude);
    }
3726
    /// Asks the document sync state to clear the local position.
    pub fn clear_location(&self) {
        self.document_sync.clear_location();
    }
3731
    /// Forwards a new callsign to the document sync state.
    pub fn update_callsign(&self, callsign: &str) {
        self.document_sync.update_callsign(callsign);
    }
3736
    /// Forwards a peripheral event of type `event_type`, stamped with
    /// `timestamp`, to the document sync state.
    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
        self.document_sync
            .set_peripheral_event(event_type, timestamp);
    }
3742
    /// Asks the document sync state to clear the current peripheral event.
    pub fn clear_peripheral_event(&self) {
        self.document_sync.clear_peripheral_event();
    }
3747
    /// Forwards a full set of local peripheral values to the document sync
    /// state in a single call.
    ///
    /// Every argument is passed through unchanged to
    /// `DocumentSync::update_peripheral_state`, which decides how the optional
    /// values are treated when they are `None`.
    // The argument list mirrors the underlying `DocumentSync` call one-to-one.
    #[allow(clippy::too_many_arguments)]
    pub fn update_peripheral_state(
        &self,
        callsign: &str,
        battery_percent: u8,
        heart_rate: Option<u8>,
        latitude: Option<f32>,
        longitude: Option<f32>,
        altitude: Option<f32>,
        event_type: Option<EventType>,
        timestamp: u64,
    ) {
        self.document_sync.update_peripheral_state(
            callsign,
            battery_percent,
            heart_rate,
            latitude,
            longitude,
            altitude,
            event_type,
            timestamp,
        );
    }
3775
    /// Builds the current sync document and passes it through
    /// `encrypt_document` before returning the bytes.
    pub fn build_document(&self) -> Vec<u8> {
        let doc = self.document_sync.build_document();
        self.encrypt_document(&doc)
    }
3783
3784 #[cfg(feature = "mesh-translator")]
3801 pub fn publish_translator_frame(
3802 &self,
3803 collection: &str,
3804 doc: &peat_mesh::sync::Document,
3805 ) -> Option<Vec<u8>> {
3806 let translator = self.ble_translator.read().ok()?;
3807 let ctx = peat_mesh::transport::TranslationContext::outbound()
3808 .with_collection(collection.to_string());
3809 let framed = translator.encode_outbound_sync(doc, &ctx)?;
3810 drop(translator);
3811 Some(self.encrypt_document(&framed))
3812 }
3813
3814 #[cfg(feature = "translator-codec")]
3833 pub fn publish_platform_advertisement(
3834 &self,
3835 peripheral: &crate::translator::BlePeripheral,
3836 ) -> Option<Vec<u8>> {
3837 let payload = crate::translator::postcard_encode(peripheral)?;
3838 let mut framed = Vec::with_capacity(2 + payload.len());
3839 framed.push(crate::document::TRANSLATOR_FRAME_MARKER);
3840 framed.push(crate::translator::COLLECTION_CODE_PLATFORMS);
3841 framed.extend_from_slice(&payload);
3842 Some(self.encrypt_document(&framed))
3843 }
3844
    /// Returns the peers the peer manager considers due for a sync at
    /// `now_ms`.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
3849
    /// Dispatches `event` to the registered observers.
    fn notify(&self, event: PeatEvent) {
        self.observers.notify(event);
    }
3855
3856 fn notify_mesh_state_changed(&self) {
3857 self.notify(PeatEvent::MeshStateChanged {
3858 peer_count: self.peer_manager.peer_count(),
3859 connected_count: self.peer_manager.connected_count(),
3860 });
3861 }
3862
3863 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3883 let mut id_bytes = [0u8; 16];
3886 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3887 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3888 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3889
3890 let seen = self.seen_cache.lock().unwrap();
3892 !seen.has_seen(&message_id)
3893 }
3894
3895 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3900 let now = std::time::SystemTime::now()
3901 .duration_since(std::time::UNIX_EPOCH)
3902 .map(|d| d.as_millis() as u64)
3903 .unwrap_or(0);
3904
3905 let mut id_bytes = [0u8; 16];
3907 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3908 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3909 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3910 let origin = NodeId::new(source_node);
3911
3912 let mut seen = self.seen_cache.lock().unwrap();
3913 seen.mark_seen(message_id, origin, now);
3914 }
3915
    /// Returns the identifiers of the peers the peer manager reports as
    /// connected.
    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
        self.peer_manager.get_connected_identifiers()
    }
3923}
3924
/// Outcome of processing one inbound data packet.
///
/// Produced by the document-processing paths after a successful merge, or by
/// the translator path when a translator frame was decoded. In the translator
/// case only `source_node` and `decoded_translator_frame` are set and every
/// other field keeps its default.
#[derive(Debug, Clone, Default)]
pub struct DataReceivedResult {
    /// Node the merged document was attributed to.
    pub source_node: NodeId,

    /// Whether the merge result reported an emergency.
    pub is_emergency: bool,

    /// Whether the merge result reported an ACK.
    pub is_ack: bool,

    /// Whether the merge changed the counter state.
    pub counter_changed: bool,

    /// Whether the merge changed the emergency state.
    pub emergency_changed: bool,

    /// Total count reported by the merge.
    pub total_count: u64,

    /// Timestamp of the event carried by the merged document, or 0 when the
    /// document carried no event.
    pub event_timestamp: u64,

    /// Bytes to forward to other peers when the message should be relayed;
    /// `None` when nothing should be forwarded.
    pub relay_data: Option<Vec<u8>>,

    /// Original sender of a relayed message; `None` for data that was not
    /// wrapped in a relay envelope.
    pub origin_node: Option<NodeId>,

    /// Hop count taken from the relay envelope; 0 for data that was not
    /// wrapped in a relay envelope.
    pub hop_count: u8,

    /// Sender's callsign; `None` when no peripheral was present or the
    /// callsign was empty.
    pub callsign: Option<String>,

    /// Sender's battery percentage; `None` when no peripheral was present or
    /// the reported value was 0.
    pub battery_percent: Option<u8>,

    /// Sender's heart rate, if the peripheral reported one.
    pub heart_rate: Option<u8>,

    /// Type of the peripheral's last event as a raw `u8`, if any.
    pub event_type: Option<u8>,

    /// Latitude of the sender's reported location, if any.
    pub latitude: Option<f32>,

    /// Longitude of the sender's reported location, if any.
    pub longitude: Option<f32>,

    /// Altitude of the sender's reported location, if any.
    pub altitude: Option<f32>,

    /// Decoded translator frame, set when the packet was a translator frame
    /// rather than a regular document.
    pub decoded_translator_frame: Option<DecodedTranslatorFrame>,
}
3992
/// A translator frame that was decoded from inbound data and is handed back to
/// the caller through `DataReceivedResult::decoded_translator_frame`.
#[derive(Debug, Clone)]
pub struct DecodedTranslatorFrame {
    /// Name of the collection the decoded document belongs to.
    // NOTE(review): presumed from the field name and the outbound
    // `with_collection` call; confirm against the decoder.
    pub collection: String,
    /// The decoded document.
    // NOTE(review): presumably serialized as JSON text, per the field name;
    // confirm against the decoder.
    pub doc_json: String,
    /// Optional peer associated with the frame.
    // NOTE(review): the format of this string is not visible here; confirm
    // what the decoder stores.
    pub peer: Option<String>,
}
4013
/// Result of offering decrypted inbound bytes to the translator hook.
#[cfg(feature = "mesh-translator")]
#[derive(Debug, Clone)]
enum TranslatorMarkerOutcome {
    /// The bytes are not translator data; regular document processing
    /// continues.
    NotTranslatorMarker,
    /// A translator frame was decoded; the caller returns it in a
    /// `DataReceivedResult`.
    Decoded(DecodedTranslatorFrame),
    /// The bytes were consumed by the translator path with nothing to report;
    /// the caller returns `None`.
    Handled,
}
4031
4032impl DataReceivedResult {
4033 #[cfg(feature = "mesh-translator")]
4041 pub(crate) fn translator_frame(source_node: NodeId, frame: DecodedTranslatorFrame) -> Self {
4042 Self {
4043 source_node,
4044 decoded_translator_frame: Some(frame),
4045 ..Default::default()
4046 }
4047 }
4048
4049 #[allow(clippy::type_complexity)]
4051 fn peripheral_fields(
4052 peripheral: &Option<crate::sync::crdt::Peripheral>,
4053 ) -> (
4054 Option<String>,
4055 Option<u8>,
4056 Option<u8>,
4057 Option<u8>,
4058 Option<f32>,
4059 Option<f32>,
4060 Option<f32>,
4061 ) {
4062 match peripheral {
4063 Some(p) => {
4064 let callsign = {
4065 let s = p.callsign_str();
4066 if s.is_empty() {
4067 None
4068 } else {
4069 Some(s.to_string())
4070 }
4071 };
4072 let battery = if p.health.battery_percent > 0 {
4073 Some(p.health.battery_percent)
4074 } else {
4075 None
4076 };
4077 let heart_rate = p.health.heart_rate;
4078 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
4079 let (lat, lon, alt) = match &p.location {
4080 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
4081 None => (None, None, None),
4082 };
4083 (callsign, battery, heart_rate, event_type, lat, lon, alt)
4084 }
4085 None => (None, None, None, None, None, None, None),
4086 }
4087 }
4088}
4089
/// Outcome of examining an inbound relay envelope: the payload to process
/// locally and whether (and how) the message should be forwarded.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Payload carried by the envelope, processed locally as document bytes.
    pub payload: Vec<u8>,

    /// Node that originally sent the message.
    pub origin_node: NodeId,

    /// Hop count associated with the received envelope.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward, if any; `relay_data` returns its encoded bytes.
    pub relay_envelope: Option<RelayEnvelope>,
}
4110
impl RelayDecision {
    /// Returns the encoded bytes of the envelope to forward, or `None` when no
    /// relay envelope is attached.
    pub fn relay_data(&self) -> Option<Vec<u8>> {
        self.relay_envelope.as_ref().map(|e| e.encode())
    }
}
4119
4120#[cfg(all(test, feature = "std"))]
4121mod tests {
4122 use super::*;
4123 use crate::observer::CollectingObserver;
4124
4125 const TEST_TIMESTAMP: u64 = 1705276800000;
4127
4128 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4129 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
4130 PeatMesh::new(config)
4131 }
4132
4133 #[test]
4134 fn test_mesh_creation() {
4135 let mesh = create_mesh(0x12345678, "ALPHA-1");
4136
4137 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
4138 assert_eq!(mesh.callsign(), "ALPHA-1");
4139 assert_eq!(mesh.mesh_id(), "TEST");
4140 assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
4141 }
4142
4143 #[test]
4144 fn test_peer_discovery() {
4145 let mesh = create_mesh(0x11111111, "ALPHA-1");
4146 let observer = Arc::new(CollectingObserver::new());
4147 mesh.add_observer(observer.clone());
4148
4149 let peer = mesh.on_ble_discovered(
4151 "device-uuid",
4152 Some("PEAT_TEST-22222222"),
4153 -65,
4154 Some("TEST"),
4155 1000,
4156 );
4157
4158 assert!(peer.is_some());
4159 let peer = peer.unwrap();
4160 assert_eq!(peer.node_id.as_u32(), 0x22222222);
4161
4162 let events = observer.events();
4164 assert!(events
4165 .iter()
4166 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4167 assert!(events
4168 .iter()
4169 .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
4170 }
4171
4172 #[test]
4173 fn test_connection_lifecycle() {
4174 let mesh = create_mesh(0x11111111, "ALPHA-1");
4175 let observer = Arc::new(CollectingObserver::new());
4176 mesh.add_observer(observer.clone());
4177
4178 mesh.on_ble_discovered(
4180 "device-uuid",
4181 Some("PEAT_TEST-22222222"),
4182 -65,
4183 Some("TEST"),
4184 1000,
4185 );
4186
4187 let node_id = mesh.on_ble_connected("device-uuid", 2000);
4188 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4189 assert_eq!(mesh.connected_count(), 1);
4190
4191 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
4193 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4194 assert_eq!(mesh.connected_count(), 0);
4195
4196 let events = observer.events();
4198 assert!(events
4199 .iter()
4200 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4201 assert!(events
4202 .iter()
4203 .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
4204 }
4205
4206 #[test]
4207 fn test_emergency_flow() {
4208 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4209 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4210
4211 let observer2 = Arc::new(CollectingObserver::new());
4212 mesh2.add_observer(observer2.clone());
4213
4214 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4216 assert!(mesh1.is_emergency_active());
4217
4218 let result =
4220 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4221
4222 assert!(result.is_some());
4223 let result = result.unwrap();
4224 assert!(result.is_emergency);
4225 assert_eq!(result.source_node.as_u32(), 0x11111111);
4226
4227 let events = observer2.events();
4229 assert!(events
4230 .iter()
4231 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4232 }
4233
4234 #[test]
4235 fn test_ack_flow() {
4236 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4237 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4238
4239 let observer2 = Arc::new(CollectingObserver::new());
4240 mesh2.add_observer(observer2.clone());
4241
4242 let doc = mesh1.send_ack(TEST_TIMESTAMP);
4244 assert!(mesh1.is_ack_active());
4245
4246 let result =
4248 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4249
4250 assert!(result.is_some());
4251 let result = result.unwrap();
4252 assert!(result.is_ack);
4253
4254 let events = observer2.events();
4256 assert!(events
4257 .iter()
4258 .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4259 }
4260
4261 #[test]
4262 fn test_tick_cleanup() {
4263 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4264 .with_peer_timeout(10_000);
4265 let mesh = PeatMesh::new(config);
4266
4267 let observer = Arc::new(CollectingObserver::new());
4268 mesh.add_observer(observer.clone());
4269
4270 mesh.on_ble_discovered(
4272 "device-uuid",
4273 Some("PEAT_TEST-22222222"),
4274 -65,
4275 Some("TEST"),
4276 1000,
4277 );
4278 assert_eq!(mesh.peer_count(), 1);
4279
4280 mesh.tick(5000);
4282 assert_eq!(mesh.peer_count(), 1);
4283
4284 mesh.tick(20000);
4286 assert_eq!(mesh.peer_count(), 0);
4287
4288 let events = observer.events();
4290 assert!(events
4291 .iter()
4292 .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4293 }
4294
4295 #[test]
4296 fn test_tick_sync_broadcast() {
4297 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4298 .with_sync_interval(5000);
4299 let mesh = PeatMesh::new(config);
4300
4301 mesh.on_ble_discovered(
4303 "device-uuid",
4304 Some("PEAT_TEST-22222222"),
4305 -65,
4306 Some("TEST"),
4307 1000,
4308 );
4309 mesh.on_ble_connected("device-uuid", 1000);
4310
4311 let _result = mesh.tick(0);
4313 let result = mesh.tick(3000);
4317 assert!(result.is_none());
4318
4319 let result = mesh.tick(6000);
4321 assert!(result.is_some());
4322
4323 let result = mesh.tick(6100);
4325 assert!(result.is_none());
4326
4327 let result = mesh.tick(12000);
4329 assert!(result.is_some());
4330 }
4331
4332 #[test]
4333 fn test_incoming_connection() {
4334 let mesh = create_mesh(0x11111111, "ALPHA-1");
4335 let observer = Arc::new(CollectingObserver::new());
4336 mesh.add_observer(observer.clone());
4337
4338 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4340
4341 assert!(is_new);
4342 assert_eq!(mesh.peer_count(), 1);
4343 assert_eq!(mesh.connected_count(), 1);
4344
4345 let events = observer.events();
4347 assert!(events
4348 .iter()
4349 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4350 assert!(events
4351 .iter()
4352 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4353 }
4354
4355 #[test]
4356 fn test_mesh_filtering() {
4357 let mesh = create_mesh(0x11111111, "ALPHA-1");
4358
4359 let peer = mesh.on_ble_discovered(
4361 "device-uuid-1",
4362 Some("PEAT_OTHER-22222222"),
4363 -65,
4364 Some("OTHER"),
4365 1000,
4366 );
4367 assert!(peer.is_none());
4368 assert_eq!(mesh.peer_count(), 0);
4369
4370 let peer = mesh.on_ble_discovered(
4372 "device-uuid-2",
4373 Some("PEAT_TEST-33333333"),
4374 -65,
4375 Some("TEST"),
4376 1000,
4377 );
4378 assert!(peer.is_some());
4379 assert_eq!(mesh.peer_count(), 1);
4380 }
4381
4382 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4385 let config =
4386 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4387 PeatMesh::new(config)
4388 }
4389
4390 #[test]
4391 fn test_encryption_enabled() {
4392 let secret = [0x42u8; 32];
4393 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4394
4395 assert!(mesh.is_encryption_enabled());
4396 }
4397
4398 #[test]
4399 fn test_encryption_disabled_by_default() {
4400 let mesh = create_mesh(0x11111111, "ALPHA-1");
4401
4402 assert!(!mesh.is_encryption_enabled());
4403 }
4404
4405 #[test]
4406 fn test_encrypted_document_exchange() {
4407 let secret = [0x42u8; 32];
4408 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4409 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4410
4411 let doc = mesh1.build_document();
4413
4414 assert!(doc.len() >= 2);
4416 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4417
4418 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4420
4421 assert!(result.is_some());
4422 let result = result.unwrap();
4423 assert_eq!(result.source_node.as_u32(), 0x11111111);
4424 }
4425
4426 #[test]
4427 fn test_encrypted_emergency_exchange() {
4428 let secret = [0x42u8; 32];
4429 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4430 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4431
4432 let observer = Arc::new(CollectingObserver::new());
4433 mesh2.add_observer(observer.clone());
4434
4435 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4437
4438 let result =
4440 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4441
4442 assert!(result.is_some());
4443 let result = result.unwrap();
4444 assert!(result.is_emergency);
4445
4446 let events = observer.events();
4448 assert!(events
4449 .iter()
4450 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4451 }
4452
4453 #[test]
4454 fn test_wrong_key_fails_decrypt() {
4455 let secret1 = [0x42u8; 32];
4456 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4458 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4459
4460 let doc = mesh1.build_document();
4462
4463 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4465
4466 assert!(result.is_none());
4467 }
4468
4469 #[test]
4470 fn test_unencrypted_mesh_can_read_unencrypted() {
4471 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4472 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4473
4474 let doc = mesh1.build_document();
4476
4477 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4479
4480 assert!(result.is_some());
4481 }
4482
4483 #[test]
4484 fn test_encrypted_mesh_can_receive_unencrypted() {
4485 let secret = [0x42u8; 32];
4487 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4492
4493 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4495
4496 assert!(result.is_some());
4497 }
4498
4499 #[test]
4500 fn test_unencrypted_mesh_cannot_receive_encrypted() {
4501 let secret = [0x42u8; 32];
4502 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
4507
4508 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4510
4511 assert!(result.is_none());
4512 }
4513
4514 #[test]
4515 fn test_enable_disable_encryption() {
4516 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4517
4518 assert!(!mesh.is_encryption_enabled());
4519
4520 let secret = [0x42u8; 32];
4522 mesh.enable_encryption(&secret);
4523 assert!(mesh.is_encryption_enabled());
4524
4525 let doc = mesh.build_document();
4527 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4528
4529 mesh.disable_encryption();
4531 assert!(!mesh.is_encryption_enabled());
4532
4533 let doc = mesh.build_document();
4535 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4536 }
4537
4538 #[test]
4539 fn test_encryption_overhead() {
4540 let secret = [0x42u8; 32];
4541 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4542 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4543
4544 let doc_encrypted = mesh_encrypted.build_document();
4545 let doc_unencrypted = mesh_unencrypted.build_document();
4546
4547 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4553 assert_eq!(overhead, 30); }
4555
4556 #[test]
4559 fn test_peer_e2ee_enable_disable() {
4560 let mesh = create_mesh(0x11111111, "ALPHA-1");
4561
4562 assert!(!mesh.is_peer_e2ee_enabled());
4563 assert!(mesh.peer_e2ee_public_key().is_none());
4564
4565 mesh.enable_peer_e2ee();
4566 assert!(mesh.is_peer_e2ee_enabled());
4567 assert!(mesh.peer_e2ee_public_key().is_some());
4568
4569 mesh.disable_peer_e2ee();
4570 assert!(!mesh.is_peer_e2ee_enabled());
4571 }
4572
/// Initiating a peer E2EE session yields a key-exchange message and registers
/// one session that is not yet established.
#[test]
fn test_peer_e2ee_initiate_session() {
    use crate::document::KEY_EXCHANGE_MARKER;

    let initiator = create_mesh(0x11111111, "ALPHA-1");
    initiator.enable_peer_e2ee();

    let remote = NodeId::new(0x22222222);
    let offer = initiator.initiate_peer_e2ee(remote, 1000);
    assert!(offer.is_some());

    // The outgoing message is tagged as a key exchange.
    let offer = offer.unwrap();
    assert_eq!(offer[0], KEY_EXCHANGE_MARKER);

    // One session is tracked, but none has completed yet.
    assert_eq!(initiator.peer_e2ee_session_count(), 1);
    assert_eq!(initiator.peer_e2ee_established_count(), 0);
}
4589
/// A two-message key exchange leaves both meshes holding a session for the
/// other and notifies both observers with `PeerE2eeEstablished`.
#[test]
fn test_peer_e2ee_full_handshake() {
    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");

    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    // Attach observers before the handshake so its events are captured.
    let alpha_observer = Arc::new(CollectingObserver::new());
    let bravo_observer = Arc::new(CollectingObserver::new());
    alpha.add_observer(alpha_observer.clone());
    bravo.add_observer(bravo_observer.clone());

    let alpha_id = NodeId::new(0x11111111);
    let bravo_id = NodeId::new(0x22222222);

    // Step 1: alpha opens the exchange towards bravo.
    let offer = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();

    // Step 2: bravo handles the offer and produces a reply.
    let reply = bravo.handle_key_exchange(&offer, 1000);
    assert!(reply.is_some());
    assert!(bravo.has_peer_e2ee_session(alpha_id));

    // Step 3: alpha handles the reply.
    let reply = reply.unwrap();
    let _ = alpha.handle_key_exchange(&reply, 1000);
    assert!(alpha.has_peer_e2ee_session(bravo_id));

    // Both observers must have recorded the establishment event.
    let alpha_events = alpha_observer.events();
    let alpha_notified = alpha_events
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeEstablished { .. }));
    assert!(alpha_notified);

    let bravo_events = bravo_observer.events();
    let bravo_notified = bravo_events
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeEstablished { .. }));
    assert!(bravo_notified);
}
4633
/// After a handshake, a message sent by one mesh carries the peer-E2EE
/// marker, is recovered by the other mesh, and is reported to its observer.
#[test]
fn test_peer_e2ee_encrypt_decrypt() {
    use crate::document::PEER_E2EE_MARKER;

    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    let receiver_id = NodeId::new(0x22222222);

    // Run the key exchange in both directions.
    let offer = sender.initiate_peer_e2ee(receiver_id, 1000).unwrap();
    let reply = receiver.handle_key_exchange(&offer, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    // Send a message from the initiating side.
    let plaintext = b"Secret message from mesh1";
    let ciphertext = sender.send_peer_e2ee(receiver_id, plaintext, 2000);
    assert!(ciphertext.is_some());

    let ciphertext = ciphertext.unwrap();
    assert_eq!(ciphertext[0], PEER_E2EE_MARKER);

    // Observe the receiving side only from this point on.
    let receiver_observer = Arc::new(CollectingObserver::new());
    receiver.add_observer(receiver_observer.clone());

    let recovered = receiver.handle_peer_e2ee_message(&ciphertext, 2000);
    assert!(recovered.is_some());
    assert_eq!(recovered.unwrap(), plaintext);

    // The observer must see the message with the sender's id and payload.
    let recorded = receiver_observer.events();
    let delivered = recorded.iter().any(|e| {
        matches!(
            e,
            PeatEvent::PeerE2eeMessageReceived { from_node, data }
                if from_node.as_u32() == 0x11111111 && data == plaintext
        )
    });
    assert!(delivered);
}
4674
/// Once a session exists, either side can send and the other side decrypts
/// the exact bytes that were sent.
#[test]
fn test_peer_e2ee_bidirectional() {
    let alpha = create_mesh(0x11111111, "ALPHA-1");
    let bravo = create_mesh(0x22222222, "BRAVO-1");

    alpha.enable_peer_e2ee();
    bravo.enable_peer_e2ee();

    let alpha_id = NodeId::new(0x11111111);
    let bravo_id = NodeId::new(0x22222222);

    // Handshake initiated by alpha.
    let offer = alpha.initiate_peer_e2ee(bravo_id, 1000).unwrap();
    let reply = bravo.handle_key_exchange(&offer, 1000).unwrap();
    alpha.handle_key_exchange(&reply, 1000);

    // alpha -> bravo
    let outbound = alpha
        .send_peer_e2ee(bravo_id, b"Hello from mesh1", 2000)
        .unwrap();
    let seen_by_bravo = bravo.handle_peer_e2ee_message(&outbound, 2000).unwrap();
    assert_eq!(seen_by_bravo, b"Hello from mesh1");

    // bravo -> alpha
    let inbound = bravo
        .send_peer_e2ee(alpha_id, b"Hello from mesh2", 2000)
        .unwrap();
    let seen_by_alpha = alpha.handle_peer_e2ee_message(&inbound, 2000).unwrap();
    assert_eq!(seen_by_alpha, b"Hello from mesh2");
}
4704
/// Closing a peer E2EE session notifies observers with `PeerE2eeClosed`.
#[test]
fn test_peer_e2ee_close_session() {
    let node = create_mesh(0x11111111, "ALPHA-1");
    node.enable_peer_e2ee();

    let recorder = Arc::new(CollectingObserver::new());
    node.add_observer(recorder.clone());

    let remote = NodeId::new(0x22222222);

    // Open a session so there is something to close.
    node.initiate_peer_e2ee(remote, 1000);
    assert_eq!(node.peer_e2ee_session_count(), 1);

    node.close_peer_e2ee(remote);

    // The close must have been reported to the observer.
    let recorded = recorder.events();
    let close_reported = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::PeerE2eeClosed { .. }));
    assert!(close_reported);
}
4726
/// With peer E2EE never enabled, initiating and sending both yield nothing
/// and no session exists for the remote node.
#[test]
fn test_peer_e2ee_without_enabling() {
    let node = create_mesh(0x11111111, "ALPHA-1");
    let remote = NodeId::new(0x22222222);

    // Initiation is refused.
    let offer = node.initiate_peer_e2ee(remote, 1000);
    assert!(offer.is_none());

    // Sending is refused as well.
    let outgoing = node.send_peer_e2ee(remote, b"test", 1000);
    assert!(outgoing.is_none());

    let has_session = node.has_peer_e2ee_session(remote);
    assert!(!has_session);
}
4740
/// A peer-E2EE message is expected to be exactly 46 bytes longer than the
/// plaintext it carries.
#[test]
fn test_peer_e2ee_overhead() {
    let sender = create_mesh(0x11111111, "ALPHA-1");
    let receiver = create_mesh(0x22222222, "BRAVO-1");

    sender.enable_peer_e2ee();
    receiver.enable_peer_e2ee();

    let receiver_id = NodeId::new(0x22222222);

    // Establish the session first.
    let offer = sender.initiate_peer_e2ee(receiver_id, 1000).unwrap();
    let reply = receiver.handle_key_exchange(&offer, 1000).unwrap();
    sender.handle_key_exchange(&reply, 1000);

    let plaintext = b"Test message";
    let wire_len = sender
        .send_peer_e2ee(receiver_id, plaintext, 2000)
        .unwrap()
        .len();

    // The difference is the fixed per-message cost.
    assert_eq!(wire_len - plaintext.len(), 46);
}
4773
4774 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4777 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4778 .with_encryption(secret)
4779 .with_strict_encryption();
4780 PeatMesh::new(config)
4781 }
4782
/// A mesh built by the strict helper reports both encryption and strict
/// encryption as enabled.
#[test]
fn test_strict_encryption_enabled() {
    let strict_mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    let encrypted = strict_mesh.is_encryption_enabled();
    let strict = strict_mesh.is_strict_encryption_enabled();

    assert!(encrypted);
    assert!(strict);
}
4791
/// A plainly encrypted mesh has encryption on but strict mode off.
#[test]
fn test_strict_encryption_disabled_by_default() {
    let encrypted_mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", [0x42u8; 32]);

    let encrypted = encrypted_mesh.is_encryption_enabled();
    let strict = encrypted_mesh.is_strict_encryption_enabled();

    assert!(encrypted);
    assert!(!strict);
}
4800
/// Requesting strict mode without configuring encryption leaves both
/// encryption and strict encryption reported as disabled.
#[test]
fn test_strict_encryption_requires_encryption_enabled() {
    // Strict mode is requested, but no encryption secret is supplied.
    let strict_only = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let strict_only = strict_only.with_strict_encryption();
    let node = PeatMesh::new(strict_only);

    let encrypted = node.is_encryption_enabled();
    assert!(!encrypted);

    let strict = node.is_strict_encryption_enabled();
    assert!(!strict);
}
4811
/// A strict-mode mesh accepts an encrypted document produced by a peer that
/// shares the same secret.
#[test]
fn test_strict_mode_accepts_encrypted_documents() {
    use crate::document::ENCRYPTED_MARKER;

    let shared_secret = [0x42u8; 32];
    let producer = create_encrypted_mesh(0x11111111, "ALPHA-1", shared_secret);
    let strict_consumer = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", shared_secret);

    // Sanity check: the producer really emits an encrypted document.
    let sealed = producer.build_document();
    assert_eq!(sealed[0], ENCRYPTED_MARKER);

    let outcome =
        strict_consumer.on_ble_data_received_from_node(NodeId::new(0x11111111), &sealed, 1000);
    assert!(outcome.is_some());
}
4826
/// A strict-mode mesh drops an unencrypted document and reports an
/// `UnencryptedInStrictMode` security violation.
#[test]
fn test_strict_mode_rejects_unencrypted_documents() {
    use crate::document::ENCRYPTED_MARKER;

    let secret = [0x42u8; 32];
    let plain_producer = create_mesh(0x11111111, "ALPHA-1");
    let strict_consumer = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

    let recorder = Arc::new(CollectingObserver::new());
    strict_consumer.add_observer(recorder.clone());

    // Sanity check: the producer's document is not encrypted.
    let plain = plain_producer.build_document();
    assert_ne!(plain[0], ENCRYPTED_MARKER);

    let outcome =
        strict_consumer.on_ble_data_received_from_node(NodeId::new(0x11111111), &plain, 1000);
    assert!(outcome.is_none());

    // The rejection must surface as a security violation event.
    let recorded = recorder.events();
    let violation_reported = recorded.iter().any(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(violation_reported);
}
4854
/// An encrypted mesh that is not in strict mode still accepts an unencrypted
/// document from a peer.
#[test]
fn test_non_strict_mode_accepts_unencrypted_documents() {
    let secret = [0x42u8; 32];
    let plain_producer = create_mesh(0x11111111, "ALPHA-1");
    let lenient_consumer = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);

    let plain = plain_producer.build_document();

    let outcome =
        lenient_consumer.on_ble_data_received_from_node(NodeId::new(0x11111111), &plain, 1000);
    assert!(outcome.is_some());
}
4868
/// When the unencrypted data arrives through an identifier-based receive
/// call, the resulting security violation carries that identifier as its
/// source.
#[test]
fn test_strict_mode_security_violation_event_includes_source() {
    let secret = [0x42u8; 32];
    let plain_producer = create_mesh(0x11111111, "ALPHA-1");
    let strict_consumer = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

    let recorder = Arc::new(CollectingObserver::new());
    strict_consumer.add_observer(recorder.clone());

    let plain = plain_producer.build_document();

    // Register and connect the sending device under a known identifier.
    strict_consumer.on_ble_discovered(
        "test-device-uuid",
        Some("PEAT_TEST-11111111"),
        -65,
        Some("TEST"),
        500,
    );
    strict_consumer.on_ble_connected("test-device-uuid", 600);

    // The return value is irrelevant here; only the emitted event matters.
    let _outcome = strict_consumer.on_ble_data_received("test-device-uuid", &plain, 1000);

    let recorded = recorder.events();
    let first_violation = recorded.iter().find(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )
    });
    assert!(first_violation.is_some());

    // The violation must name the device the data came from.
    if let Some(PeatEvent::SecurityViolation { source, .. }) = first_violation {
        assert!(source.is_some());
        assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
    }
}
4910
/// A document encrypted under a different secret is dropped and reported as
/// a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    // The two meshes deliberately use different secrets.
    let producer_secret = [0x42u8; 32];
    let consumer_secret = [0x43u8; 32];
    let producer = create_encrypted_mesh(0x11111111, "ALPHA-1", producer_secret);
    let consumer = create_encrypted_mesh(0x22222222, "BRAVO-1", consumer_secret);

    let recorder = Arc::new(CollectingObserver::new());
    consumer.add_observer(recorder.clone());

    let sealed = producer.build_document();

    let outcome = consumer.on_ble_data_received_from_node(NodeId::new(0x11111111), &sealed, 1000);
    assert!(outcome.is_none());

    // The failure must surface as a security violation event.
    let recorded = recorder.events();
    let failure_reported = recorded.iter().any(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(failure_reported);
}
4938
/// Encryption and strict mode stay in effect when further builder calls are
/// chained after them.
#[test]
fn test_strict_mode_builder_chain() {
    let node = PeatMesh::new(
        PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption([0x42u8; 32])
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    let encrypted = node.is_encryption_enabled();
    let strict = node.is_strict_encryption_enabled();

    assert!(encrypted);
    assert!(strict);
}
4953
4954 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4957 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4958 PeatMesh::new(config)
4959 }
4960
/// A mesh from the plain helper does not have relay enabled.
#[test]
fn test_relay_disabled_by_default() {
    let relay_on = create_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(!relay_on);
}
4966
/// A mesh from the relay helper reports relay as enabled.
#[test]
fn test_relay_enabled() {
    let relay_on = create_relay_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(relay_on);
}
4972
/// Each relay builder method stores its argument in the matching config
/// field.
#[test]
fn test_relay_config_builder() {
    let base = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let relay_config = base
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(relay_config.enable_relay);
    assert_eq!(relay_config.max_relay_hops, 5);
    assert_eq!(relay_config.relay_fanout, 3);
    assert_eq!(relay_config.seen_cache_ttl_ms, 60_000);
}
4986
/// Marking the same message id twice reports it as new only the first time
/// and leaves a single cache entry.
#[test]
fn test_seen_message_deduplication() {
    use crate::relay::MessageId;

    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);
    let message = MessageId::from_content(sender, 1000, 0xDEADBEEF);

    // First sighting is reported as new.
    let first_sighting = node.mark_message_seen(message, sender, 1000);
    assert!(first_sighting);

    // Second sighting of the same id is not.
    let second_sighting = node.mark_message_seen(message, sender, 2000);
    assert!(!second_sighting);

    assert_eq!(node.seen_cache_size(), 1);
}
5001
/// Wrapping a payload produces a relay envelope that decodes back to the
/// same payload, names this mesh as origin, and starts at hop count zero.
#[test]
fn test_wrap_for_relay() {
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    let body = vec![1, 2, 3, 4, 5];
    let framed = node.wrap_for_relay(body.clone());

    // The wrapped bytes are tagged as a relay envelope.
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    // Decoding recovers the payload and the origin metadata.
    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.payload, body);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
5018
/// A previously unseen envelope with hops to spare is delivered and comes
/// back with a relay envelope whose hop count is one higher.
#[test]
fn test_process_relay_envelope_new_message() {
    use crate::relay::RelayEnvelope;

    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    node.add_observer(recorder.clone());

    // Build a fresh broadcast from another node, allowing up to 7 hops.
    let body = vec![1, 2, 3, 4, 5];
    let incoming = RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone()).with_max_hops(7);
    let wire = incoming.encode();

    let outcome = node.process_relay_envelope(&wire, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    // The decision reflects the envelope as it was received.
    let outcome = outcome.unwrap();
    assert_eq!(outcome.payload, body);
    assert_eq!(outcome.origin_node.as_u32(), 0x22222222);
    assert_eq!(outcome.hop_count, 0);
    assert!(outcome.should_relay);
    assert!(outcome.relay_envelope.is_some());

    // The envelope prepared for forwarding has advanced by one hop.
    let forward = outcome.relay_envelope.unwrap();
    assert_eq!(forward.hop_count, 1);
}
5047
/// Processing the same envelope bytes a second time yields no decision and
/// emits `DuplicateMessageDropped`.
#[test]
fn test_process_relay_envelope_duplicate() {
    use crate::relay::RelayEnvelope;

    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    node.add_observer(recorder.clone());

    let body = vec![1, 2, 3, 4, 5];
    let wire = RelayEnvelope::broadcast(NodeId::new(0x22222222), body).encode();
    let via = NodeId::new(0x33333333);

    // First delivery is processed.
    let first = node.process_relay_envelope(&wire, via, 1000);
    assert!(first.is_some());

    // Second delivery of identical bytes is dropped.
    let second = node.process_relay_envelope(&wire, via, 2000);
    assert!(second.is_none());

    let recorded = recorder.events();
    let drop_reported = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::DuplicateMessageDropped { .. }));
    assert!(drop_reported);
}
5072
/// An envelope that has already been relayed as many times as its hop limit
/// is still delivered, but is not forwarded and emits `MessageTtlExpired`.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    use crate::relay::RelayEnvelope;

    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    node.add_observer(recorder.clone());

    let body = vec![1, 2, 3, 4, 5];
    let mut exhausted =
        RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone()).with_max_hops(3);

    // Relay three times, matching the hop limit of 3 set above.
    for _ in 0..3 {
        exhausted = exhausted.relay().unwrap();
    }
    let wire = exhausted.encode();

    let outcome = node.process_relay_envelope(&wire, NodeId::new(0x33333333), 1000);
    assert!(outcome.is_some());

    // Delivered locally, but with nothing left to forward.
    let outcome = outcome.unwrap();
    assert_eq!(outcome.payload, body);
    assert!(!outcome.should_relay);
    assert!(outcome.relay_envelope.is_none());

    let recorded = recorder.events();
    let expiry_reported = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::MessageTtlExpired { .. }));
    assert!(expiry_reported);
}
5107
/// The relay document is a relay envelope originating from this mesh whose
/// payload decodes as a `PeatDocument`.
#[test]
fn test_build_relay_document() {
    use crate::document::PeatDocument;
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    let framed = node.build_relay_document();

    // Tagged as a relay envelope on the wire.
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    // The envelope names this mesh as its origin.
    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    // The inner payload must itself be a decodable document.
    let inner = PeatDocument::decode(&decoded.payload);
    assert!(inner.is_some());
}
5125
/// The relay target list never contains the node passed in as the source to
/// exclude.
#[test]
fn test_relay_targets_excludes_source() {
    let node = create_relay_mesh(0x11111111, "ALPHA-1");

    // (identifier, advertised name, rssi) for each peer to discover and connect.
    let neighbours = [
        ("peer-1", "PEAT_TEST-22222222", -60),
        ("peer-2", "PEAT_TEST-33333333", -65),
        ("peer-3", "PEAT_TEST-44444444", -70),
    ];
    for &(identifier, advertised_name, rssi) in neighbours.iter() {
        node.on_ble_discovered(identifier, Some(advertised_name), rssi, Some("TEST"), 1000);
        node.on_ble_connected(identifier, 1000);
    }

    // Ask for targets while excluding the second peer's node id.
    let candidates = node.get_relay_targets(Some(NodeId::new(0x33333333)));

    let source_excluded = candidates
        .iter()
        .all(|peer| peer.node_id.as_u32() != 0x33333333);
    assert!(source_excluded);
}
5164
/// Clearing the seen-message cache removes every recorded entry.
#[test]
fn test_clear_seen_cache() {
    use crate::relay::MessageId;

    let node = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);

    // Record two distinct messages from the same sender.
    let first = MessageId::from_content(sender, 1000, 0x11111111);
    node.mark_message_seen(first, sender, 1000);
    let second = MessageId::from_content(sender, 2000, 0x22222222);
    node.mark_message_seen(second, sender, 2000);

    assert_eq!(node.seen_cache_size(), 2);

    // After clearing, nothing remains.
    node.clear_seen_cache();
    assert_eq!(node.seen_cache_size(), 0);
}
5188}