1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "translator-codec")]
65use crate::document::{
66 TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73 PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78 RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
/// Configuration used to construct a `PeatMesh`.
///
/// Create one with `PeatMeshConfig::new` and adjust it with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct PeatMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node; handed to `DocumentSync` at construction.
    pub callsign: String,

    /// Mesh identifier. It seeds the peer-manager configuration and is mixed
    /// into the mesh encryption key derivation.
    pub mesh_id: String,

    /// Peripheral type handed to `DocumentSync` at construction.
    pub peripheral_type: PeripheralType,

    /// Settings forwarded to the `PeerManager`.
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (defaults to 5000).
    pub sync_interval_ms: u64,

    /// Defaults to `true`.
    // NOTE(review): presumably controls automatic event broadcasting; the
    // consumer of this flag is not visible in this part of the file.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret. When present, the mesh derives its
    /// encryption key from it together with `mesh_id`.
    pub encryption_secret: Option<[u8; 32]>,

    /// When set (and an encryption key exists), inbound documents that are
    /// not marked as encrypted are rejected.
    pub strict_encryption: bool,

    /// Whether this node forwards relay envelopes it receives.
    pub enable_relay: bool,

    /// Hop limit applied to relay envelopes originated by this node.
    pub max_relay_hops: u8,

    /// Fan-out handed to the gossip strategy; never less than 1 when set
    /// through `with_relay_fanout`.
    pub relay_fanout: usize,

    /// Time-to-live, in milliseconds, for entries of the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
161
162impl PeatMeshConfig {
163 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165 Self {
166 node_id,
167 callsign: callsign.into(),
168 mesh_id: mesh_id.into(),
169 peripheral_type: PeripheralType::SoldierSensor,
170 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171 sync_interval_ms: 5000,
172 auto_broadcast_events: true,
173 encryption_secret: None,
174 strict_encryption: false,
175 enable_relay: false,
176 max_relay_hops: DEFAULT_MAX_HOPS,
177 relay_fanout: 2,
178 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179 }
180 }
181
182 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187 self.encryption_secret = Some(secret);
188 self
189 }
190
191 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193 self.peripheral_type = ptype;
194 self
195 }
196
197 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199 self.sync_interval_ms = interval_ms;
200 self
201 }
202
203 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205 self.peer_config.peer_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_max_peers(mut self, max: usize) -> Self {
211 self.peer_config.max_peers = max;
212 self
213 }
214
215 pub fn with_strict_encryption(mut self) -> Self {
223 self.strict_encryption = true;
224 self
225 }
226
227 pub fn with_relay(mut self) -> Self {
232 self.enable_relay = true;
233 self
234 }
235
236 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240 self.max_relay_hops = max_hops;
241 self
242 }
243
244 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248 self.relay_fanout = fanout.max(1);
249 self
250 }
251
252 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256 self.seen_cache_ttl_ms = ttl_ms;
257 self
258 }
259}
260
/// Lock-protected store of decoded application documents.
///
/// Values are type-erased (`dyn Any + Send + Sync`). Elsewhere in this file
/// the key tuple is built as `(type_id, source_node, timestamp)`.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
/// A mesh node: one handle bundling configuration, peer tracking, document
/// sync, relay bookkeeping, delta tracking and optional encryption/identity.
///
/// Only available with the `std` feature.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration supplied at construction time.
    config: PeatMeshConfig,

    /// Peer manager created from the node id and `config.peer_config`.
    peer_manager: PeerManager,

    /// Document sync state created from node id, callsign and peripheral type.
    document_sync: DocumentSync,

    /// Registered observers that receive `PeatEvent`s.
    observers: ObserverManager,

    /// Starts at 0.
    // NOTE(review): presumably the time of the last sync in ms; its readers
    // and writers are not visible in this part of the file.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Starts at 0.
    // NOTE(review): presumably the time of the last cleanup pass in ms;
    // confirm against the code that updates it.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Starts at 0; only present with the `peat-lite-frame` feature.
    #[cfg(feature = "peat-lite-frame")]
    peat_lite_seq: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` means encryption is disabled.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer session manager; starts out as `None`.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph, also updated when relayed messages reveal
    /// indirect peers.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of already-seen relay message ids, used to drop duplicates.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks relay targets among connected peers.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta tracking and byte statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity; `None` when the mesh was built with `new`.
    identity: Option<DeviceIdentity>,

    /// Registry of known peer identities.
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Latest peripheral state received from each peer, keyed by node id.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode and update application documents.
    document_registry: DocumentRegistry,

    /// Decoded application documents.
    app_documents: AppDocumentStore,

    /// Translator used to decode inbound translator frames.
    #[cfg(feature = "translator-codec")]
    ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,

    /// Optional callback that receives decoded documents as JSON.
    #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
    decoded_document_json_callback:
        std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,

    /// Set once a caller has acknowledged that it polls for decoded
    /// translator frames; counts as a registered consumer.
    polled_translator_consumer_attested: std::sync::atomic::AtomicBool,
}
399
400#[cfg(feature = "std")]
401impl PeatMesh {
402 pub fn new(config: PeatMeshConfig) -> Self {
404 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
405 let document_sync = DocumentSync::with_peripheral_type(
406 config.node_id,
407 &config.callsign,
408 config.peripheral_type,
409 );
410
411 let encryption_key = config
413 .encryption_secret
414 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
415
416 let connection_graph = ConnectionStateGraph::with_config(
418 config.peer_config.rssi_degraded_threshold,
419 config.peer_config.lost_timeout_ms,
420 );
421
422 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
424
425 let gossip_strategy: Box<dyn GossipStrategy> =
427 Box::new(RandomFanout::new(config.relay_fanout));
428
429 let delta_encoder = DeltaEncoder::new(config.node_id);
431
432 let document_registry = DocumentRegistry::new();
433
434 Self {
435 config,
436 peer_manager,
437 document_sync,
438 observers: ObserverManager::new(),
439 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
440 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
441 #[cfg(feature = "peat-lite-frame")]
442 peat_lite_seq: std::sync::atomic::AtomicU32::new(0),
443 encryption_key,
444 peer_sessions: std::sync::Mutex::new(None),
445 connection_graph: std::sync::Mutex::new(connection_graph),
446 seen_cache: std::sync::Mutex::new(seen_cache),
447 gossip_strategy,
448 delta_encoder: std::sync::Mutex::new(delta_encoder),
449 identity: None,
450 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
451 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
452 document_registry,
453 app_documents: std::sync::RwLock::new(HashMap::new()),
454 #[cfg(feature = "translator-codec")]
455 ble_translator: std::sync::RwLock::new(
456 crate::translator::BleTranslator::with_defaults(),
457 ),
458 #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
459 decoded_document_json_callback: std::sync::RwLock::new(None),
460 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
461 }
462 }
463
464 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
470 let mut config = config;
472 config.node_id = identity.node_id();
473
474 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
475 let document_sync = DocumentSync::with_peripheral_type(
476 config.node_id,
477 &config.callsign,
478 config.peripheral_type,
479 );
480
481 let encryption_key = config
482 .encryption_secret
483 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
484
485 let connection_graph = ConnectionStateGraph::with_config(
486 config.peer_config.rssi_degraded_threshold,
487 config.peer_config.lost_timeout_ms,
488 );
489
490 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
491 let gossip_strategy: Box<dyn GossipStrategy> =
492 Box::new(RandomFanout::new(config.relay_fanout));
493 let delta_encoder = DeltaEncoder::new(config.node_id);
494
495 let document_registry = DocumentRegistry::new();
496
497 Self {
498 config,
499 peer_manager,
500 document_sync,
501 observers: ObserverManager::new(),
502 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
503 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
504 #[cfg(feature = "peat-lite-frame")]
505 peat_lite_seq: std::sync::atomic::AtomicU32::new(0),
506 encryption_key,
507 peer_sessions: std::sync::Mutex::new(None),
508 connection_graph: std::sync::Mutex::new(connection_graph),
509 seen_cache: std::sync::Mutex::new(seen_cache),
510 gossip_strategy,
511 delta_encoder: std::sync::Mutex::new(delta_encoder),
512 identity: Some(identity),
513 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
514 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
515 document_registry,
516 app_documents: std::sync::RwLock::new(HashMap::new()),
517 #[cfg(feature = "translator-codec")]
518 ble_translator: std::sync::RwLock::new(
519 crate::translator::BleTranslator::with_defaults(),
520 ),
521 #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
522 decoded_document_json_callback: std::sync::RwLock::new(None),
523 polled_translator_consumer_attested: std::sync::atomic::AtomicBool::new(false),
524 }
525 }
526
527 pub fn from_genesis(
535 genesis: &crate::security::MeshGenesis,
536 identity: DeviceIdentity,
537 callsign: &str,
538 ) -> Self {
539 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
540 .with_encryption(genesis.encryption_secret());
541
542 Self::with_identity(config, identity)
543 }
544
545 #[cfg(feature = "std")]
571 pub fn from_persisted(
572 state: crate::security::PersistedState,
573 callsign: &str,
574 ) -> Result<Self, crate::security::PersistenceError> {
575 let identity = state.restore_identity()?;
577
578 let genesis = state.restore_genesis();
580
581 let mesh = if let Some(ref gen) = genesis {
583 Self::from_genesis(gen, identity, callsign)
584 } else {
585 let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
586 Self::with_identity(config, identity)
587 };
588
589 let restored_registry = state.restore_registry();
591 if let Ok(mut registry) = mesh.identity_registry.lock() {
592 *registry = restored_registry;
593 }
594
595 log::info!(
596 "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
597 mesh.config.node_id.as_u32(),
598 mesh.known_identity_count()
599 );
600
601 Ok(mesh)
602 }
603
604 #[cfg(feature = "std")]
617 pub fn to_persisted_state(
618 &self,
619 genesis: Option<&crate::security::MeshGenesis>,
620 ) -> Option<crate::security::PersistedState> {
621 let identity = self.identity.as_ref()?;
622 let registry = self.identity_registry.lock().ok()?;
623
624 Some(crate::security::PersistedState::with_registry(
625 identity, genesis, ®istry,
626 ))
627 }
628
629 #[cfg(all(feature = "translator-codec", feature = "uniffi"))]
651 pub fn set_decoded_document_json_callback(
652 &self,
653 cb: Box<dyn crate::DecodedDocumentJsonCallback>,
654 ) {
655 let cb_arc: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
660 if let Ok(mut slot) = self.decoded_document_json_callback.write() {
661 *slot = Some(cb_arc);
662 }
663 }
664
665 pub fn acknowledge_polled_translator_consumer(&self) {
681 self.polled_translator_consumer_attested
682 .store(true, std::sync::atomic::Ordering::Release);
683 }
684
685 #[cfg(feature = "translator-codec")]
689 pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
690 if let Ok(mut t) = self.ble_translator.write() {
691 *t = crate::translator::BleTranslator::new(config);
692 }
693 }
694
    /// Inspects a decrypted frame and, if it is a translator frame, decodes it.
    ///
    /// Outcomes:
    /// - `NotTranslatorMarker`: the frame is empty, or its first byte is
    ///   neither in the reserved marker range nor `TRANSLATOR_FRAME_MARKER`.
    /// - `Decoded`: the frame was decoded and serialized to JSON.
    /// - `Handled`: the frame was consumed here but produced no document
    ///   (reserved marker, truncated frame, poisoned lock, unknown collection
    ///   code, codec declined, decode error or JSON serialization failure).
    ///
    /// `peer` is passed through to callbacks/events; `source_node` is given
    /// to the codec as `peripheral_id`.
    #[cfg(feature = "translator-codec")]
    fn try_handle_translator_marker(
        &self,
        decrypted: &[u8],
        peer: Option<&str>,
        source_node: Option<NodeId>,
    ) -> TranslatorMarkerOutcome {
        if decrypted.is_empty() {
            return TranslatorMarkerOutcome::NotTranslatorMarker;
        }
        let marker = decrypted[0];

        // The reserved-range check runs before the frame-marker check.
        if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
            log::warn!(
                "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
                decrypted.len()
            );
            return TranslatorMarkerOutcome::Handled;
        }

        if marker != TRANSLATOR_FRAME_MARKER {
            return TranslatorMarkerOutcome::NotTranslatorMarker;
        }

        // A translator frame needs at least marker + collection code.
        if decrypted.len() < 2 {
            log::warn!(
                "ble: dropping truncated translator frame (len={}, missing collection code)",
                decrypted.len()
            );
            return TranslatorMarkerOutcome::Handled;
        }

        let code = decrypted[1];
        let payload = &decrypted[2..];

        // Scope the read lock so it is released before callbacks and
        // observer notifications run below.
        let (collection, decode_result) = {
            let translator = match self.ble_translator.read() {
                Ok(g) => g,
                Err(_) => {
                    log::warn!("ble: translator RwLock poisoned; dropping frame");
                    return TranslatorMarkerOutcome::Handled;
                }
            };
            let collection = match translator.code_to_collection(code) {
                Some(c) => c.to_string(),
                None => {
                    log::warn!(
                        "ble: dropping translator frame with unknown collection code 0x{code:02X}"
                    );
                    return TranslatorMarkerOutcome::Handled;
                }
            };

            let ctx = crate::translator::DecodeInboundCtx {
                collection: &collection,
                callsign: None,
                cell_id: None,
                peripheral_id: source_node.map(|n| n.as_u32()),
            };

            let result = translator.decode_inbound_sync(payload, &ctx);
            (collection, result)
        };

        match decode_result {
            Ok(Some(value)) => {
                // Clone the callback out of the lock so it is not held while
                // the callback runs.
                #[cfg(feature = "uniffi")]
                let json_cb = self
                    .decoded_document_json_callback
                    .read()
                    .ok()
                    .and_then(|g| g.as_ref().cloned());
                // Without `uniffi` there is no callback slot; keep a
                // placeholder so the code below compiles unchanged.
                #[cfg(not(feature = "uniffi"))]
                let json_cb: Option<()> = None;

                let polled_attested = self
                    .polled_translator_consumer_attested
                    .load(std::sync::atomic::Ordering::Acquire);

                // A consumer is either an installed callback or an
                // acknowledged polling consumer.
                let any_consumer = json_cb.is_some() || polled_attested;

                let doc_json_result = serde_json::to_string(&value);

                #[cfg(feature = "uniffi")]
                if let (Some(json_cb), Ok(ref doc_json)) = (json_cb, &doc_json_result) {
                    json_cb.on_document(
                        collection.clone(),
                        doc_json.clone(),
                        peer.map(str::to_string),
                    );
                }

                if let Err(ref e) = doc_json_result {
                    log::warn!(
                        "ble: failed to serialize decoded {} value to JSON: {}",
                        collection,
                        e
                    );
                }

                if !any_consumer {
                    log::debug!(
                        "ble: decoded {} frame but no consumer registered (no callback, no polled attestation)",
                        collection
                    );
                    self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
                        collection: collection.clone(),
                        peer: peer.map(str::to_string),
                    });
                }

                // The decoded frame is returned even when no consumer is
                // registered.
                if let Ok(doc_json) = doc_json_result {
                    return TranslatorMarkerOutcome::Decoded(DecodedTranslatorFrame {
                        collection,
                        doc_json,
                        peer: peer.map(str::to_string),
                    });
                }
            }
            Ok(None) => {
                log::debug!(
                    "ble: codec declined translator frame for collection {}",
                    collection
                );
            }
            Err(e) => {
                log::warn!(
                    "ble: translator frame decode error (collection={}): {:#}",
                    collection,
                    e
                );
            }
        }

        TranslatorMarkerOutcome::Handled
    }
879
880 pub fn is_encryption_enabled(&self) -> bool {
884 self.encryption_key.is_some()
885 }
886
887 pub fn is_strict_encryption_enabled(&self) -> bool {
891 self.config.strict_encryption && self.encryption_key.is_some()
892 }
893
894 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
899 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
900 &self.config.mesh_id,
901 secret,
902 ));
903 }
904
905 pub fn disable_encryption(&mut self) {
907 self.encryption_key = None;
908 }
909
910 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
915 match &self.encryption_key {
916 Some(key) => {
917 match key.encrypt_to_bytes(plaintext) {
919 Ok(ciphertext) => {
920 let mut buf = Vec::with_capacity(2 + ciphertext.len());
921 buf.push(ENCRYPTED_MARKER);
922 buf.push(0x00); buf.extend_from_slice(&ciphertext);
924 buf
925 }
926 Err(e) => {
927 log::error!("Encryption failed: {}", e);
928 plaintext.to_vec()
930 }
931 }
932 }
933 None => plaintext.to_vec(),
934 }
935 }
936
937 fn decrypt_document<'a>(
945 &self,
946 data: &'a [u8],
947 source_hint: Option<&str>,
948 ) -> Option<std::borrow::Cow<'a, [u8]>> {
949 log::debug!(
950 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
951 data.len(),
952 data.first().copied().unwrap_or(0),
953 source_hint
954 );
955
956 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
958 let _reserved = data[1];
960 let encrypted_payload = &data[2..];
961
962 log::debug!(
963 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
964 encrypted_payload.len()
965 );
966
967 match &self.encryption_key {
968 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
969 Ok(plaintext) => {
970 log::debug!(
971 "decrypt_document: SUCCESS, plaintext len={}",
972 plaintext.len()
973 );
974 Some(std::borrow::Cow::Owned(plaintext))
975 }
976 Err(e) => {
977 log::warn!(
978 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
979 e,
980 encrypted_payload.len(),
981 source_hint
982 );
983 self.notify(PeatEvent::SecurityViolation {
984 kind: SecurityViolationKind::DecryptionFailed,
985 source: source_hint.map(String::from),
986 });
987 None
988 }
989 },
990 None => {
991 log::warn!(
992 "decrypt_document: encryption not enabled but received encrypted doc"
993 );
994 None
995 }
996 }
997 } else {
998 if self.config.strict_encryption && self.encryption_key.is_some() {
1001 log::warn!(
1002 "Rejected unencrypted document in strict encryption mode (source: {:?})",
1003 source_hint
1004 );
1005 self.notify(PeatEvent::SecurityViolation {
1006 kind: SecurityViolationKind::UnencryptedInStrictMode,
1007 source: source_hint.map(String::from),
1008 });
1009 None
1010 } else {
1011 Some(std::borrow::Cow::Borrowed(data))
1013 }
1014 }
1015 }
1016
1017 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
1031 self.decrypt_document(data, None)
1032 .map(|cow| cow.into_owned())
1033 }
1034
1035 pub fn has_identity(&self) -> bool {
1039 self.identity.is_some()
1040 }
1041
1042 pub fn public_key(&self) -> Option<[u8; 32]> {
1044 self.identity.as_ref().map(|id| id.public_key())
1045 }
1046
1047 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1051 self.identity
1052 .as_ref()
1053 .map(|id| id.create_attestation(now_ms))
1054 }
1055
1056 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1065 self.identity_registry
1066 .lock()
1067 .unwrap()
1068 .verify_or_register(attestation)
1069 }
1070
1071 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1073 self.identity_registry.lock().unwrap().is_known(node_id)
1074 }
1075
1076 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1078 self.identity_registry
1079 .lock()
1080 .unwrap()
1081 .get_public_key(node_id)
1082 .copied()
1083 }
1084
1085 pub fn known_identity_count(&self) -> usize {
1087 self.identity_registry.lock().unwrap().len()
1088 }
1089
1090 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1095 self.identity_registry
1096 .lock()
1097 .unwrap()
1098 .pre_register(node_id, public_key, now_ms);
1099 }
1100
1101 pub fn forget_peer_identity(&self, node_id: NodeId) {
1105 self.identity_registry.lock().unwrap().remove(node_id);
1106 }
1107
1108 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1112 self.identity.as_ref().map(|id| id.sign(data))
1113 }
1114
1115 pub fn verify_peer_signature(
1120 &self,
1121 node_id: NodeId,
1122 data: &[u8],
1123 signature: &[u8; 64],
1124 ) -> bool {
1125 if let Some(public_key) = self.peer_public_key(node_id) {
1126 crate::security::verify_signature(&public_key, data, signature)
1127 } else {
1128 false
1129 }
1130 }
1131
1132 pub fn is_relay_enabled(&self) -> bool {
1136 self.config.enable_relay
1137 }
1138
1139 pub fn enable_relay(&mut self) {
1141 self.config.enable_relay = true;
1142 }
1143
1144 pub fn disable_relay(&mut self) {
1146 self.config.enable_relay = false;
1147 }
1148
1149 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1153 self.seen_cache.lock().unwrap().has_seen(message_id)
1154 }
1155
1156 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1160 self.seen_cache
1161 .lock()
1162 .unwrap()
1163 .check_and_mark(message_id, origin, now_ms)
1164 }
1165
1166 pub fn seen_cache_size(&self) -> usize {
1168 self.seen_cache.lock().unwrap().len()
1169 }
1170
1171 pub fn clear_seen_cache(&self) {
1173 self.seen_cache.lock().unwrap().clear();
1174 }
1175
1176 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1181 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1182 .with_max_hops(self.config.max_relay_hops);
1183 envelope.encode()
1184 }
1185
1186 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1191 let connected = self.peer_manager.get_connected_peers();
1192 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1193 connected
1194 .into_iter()
1195 .filter(|p| p.node_id != exclude)
1196 .collect()
1197 } else {
1198 connected
1199 };
1200
1201 self.gossip_strategy
1202 .select_peers(&filtered)
1203 .into_iter()
1204 .cloned()
1205 .collect()
1206 }
1207
1208 pub fn process_relay_envelope(
1218 &self,
1219 data: &[u8],
1220 source_peer: NodeId,
1221 now_ms: u64,
1222 ) -> Option<RelayDecision> {
1223 let envelope = RelayEnvelope::decode(data)?;
1225
1226 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1229 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1230 source_peer,
1231 envelope.origin_node,
1232 envelope.hop_count,
1233 now_ms,
1234 );
1235
1236 if is_new {
1237 log::debug!(
1238 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1239 envelope.origin_node.as_u32(),
1240 source_peer.as_u32(),
1241 envelope.hop_count
1242 );
1243 }
1244 }
1245
1246 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1248 let stats = self
1250 .seen_cache
1251 .lock()
1252 .unwrap()
1253 .get_stats(&envelope.message_id);
1254 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1255
1256 self.notify(PeatEvent::DuplicateMessageDropped {
1257 origin_node: envelope.origin_node,
1258 seen_count,
1259 });
1260
1261 log::debug!(
1262 "Dropped duplicate message {} from {:08X} (seen {} times)",
1263 envelope.message_id,
1264 envelope.origin_node.as_u32(),
1265 seen_count
1266 );
1267 return None;
1268 }
1269
1270 if !envelope.can_relay() {
1272 self.notify(PeatEvent::MessageTtlExpired {
1273 origin_node: envelope.origin_node,
1274 hop_count: envelope.hop_count,
1275 });
1276
1277 log::debug!(
1278 "Message {} from {:08X} TTL expired at hop {}",
1279 envelope.message_id,
1280 envelope.origin_node.as_u32(),
1281 envelope.hop_count
1282 );
1283
1284 return Some(RelayDecision {
1286 payload: envelope.payload,
1287 origin_node: envelope.origin_node,
1288 hop_count: envelope.hop_count,
1289 should_relay: false,
1290 relay_envelope: None,
1291 });
1292 }
1293
1294 let should_relay = self.config.enable_relay;
1296 let relay_envelope = if should_relay {
1297 envelope.relay() } else {
1299 None
1300 };
1301
1302 Some(RelayDecision {
1303 payload: envelope.payload,
1304 origin_node: envelope.origin_node,
1305 hop_count: envelope.hop_count,
1306 should_relay,
1307 relay_envelope,
1308 })
1309 }
1310
1311 pub fn build_relay_document(&self) -> Vec<u8> {
1316 let doc = self.build_document(); self.wrap_for_relay(doc)
1318 }
1319
1320 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1327 let mut encoder = self.delta_encoder.lock().unwrap();
1328 encoder.add_peer(peer_id);
1329 log::debug!(
1330 "Registered peer {:08X} for delta sync tracking",
1331 peer_id.as_u32()
1332 );
1333 }
1334
1335 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1339 let mut encoder = self.delta_encoder.lock().unwrap();
1340 encoder.remove_peer(peer_id);
1341 log::debug!(
1342 "Unregistered peer {:08X} from delta sync tracking",
1343 peer_id.as_u32()
1344 );
1345 }
1346
1347 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1352 let mut encoder = self.delta_encoder.lock().unwrap();
1353 encoder.reset_peer(peer_id);
1354 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1355 }
1356
1357 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1359 let mut encoder = self.delta_encoder.lock().unwrap();
1360 encoder.record_sent(peer_id, bytes);
1361 }
1362
1363 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1365 let mut encoder = self.delta_encoder.lock().unwrap();
1366 encoder.record_received(peer_id, bytes, timestamp);
1367 }
1368
1369 pub fn delta_stats(&self) -> DeltaStats {
1374 self.delta_encoder.lock().unwrap().stats()
1375 }
1376
1377 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1381 let encoder = self.delta_encoder.lock().unwrap();
1382 encoder
1383 .get_peer_state(peer_id)
1384 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1385 }
1386
1387 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1395 let mut all_operations: Vec<Operation> = Vec::new();
1397
1398 for (node_id_u32, count) in self.document_sync.counter_entries() {
1401 all_operations.push(Operation::IncrementCounter {
1402 counter_id: 0, node_id: NodeId::new(node_id_u32),
1404 amount: count,
1405 timestamp: count, });
1407 }
1408
1409 let peripheral = self.document_sync.peripheral_snapshot();
1412 let peripheral_timestamp = peripheral
1413 .last_event
1414 .as_ref()
1415 .map(|e| e.timestamp)
1416 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1418 peripheral,
1419 timestamp: peripheral_timestamp,
1420 });
1421
1422 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1424 let source_node = NodeId::new(emergency.source_node());
1425 let timestamp = emergency.timestamp();
1426
1427 all_operations.push(Operation::SetEmergency {
1429 source_node,
1430 timestamp,
1431 known_peers: emergency.all_nodes(),
1432 });
1433
1434 for acked_node in emergency.acked_nodes() {
1436 all_operations.push(Operation::AckEmergency {
1437 node_id: NodeId::new(acked_node),
1438 emergency_timestamp: timestamp,
1439 });
1440 }
1441 }
1442
1443 for app_op in self.app_document_delta_ops() {
1445 all_operations.push(Operation::App(app_op));
1446 }
1447
1448 let filtered_operations: Vec<Operation> = {
1450 let encoder = self.delta_encoder.lock().unwrap();
1451 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1452 all_operations
1453 .into_iter()
1454 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1455 .collect()
1456 } else {
1457 all_operations
1459 }
1460 };
1461
1462 if filtered_operations.is_empty() {
1464 return None;
1465 }
1466
1467 {
1469 let mut encoder = self.delta_encoder.lock().unwrap();
1470 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1471 for op in &filtered_operations {
1472 peer_state.mark_sent(&op.key(), op.timestamp());
1473 }
1474 }
1475 }
1476
1477 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1479 for op in filtered_operations {
1480 delta.add_operation(op);
1481 }
1482
1483 let encoded = delta.encode();
1485 let result = self.encrypt_document(&encoded);
1486
1487 {
1489 let mut encoder = self.delta_encoder.lock().unwrap();
1490 encoder.record_sent(peer_id, result.len());
1491 }
1492
1493 Some(result)
1494 }
1495
1496 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1501 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1502
1503 for (node_id_u32, count) in self.document_sync.counter_entries() {
1505 delta.add_operation(Operation::IncrementCounter {
1506 counter_id: 0,
1507 node_id: NodeId::new(node_id_u32),
1508 amount: count,
1509 timestamp: now_ms,
1510 });
1511 }
1512
1513 let peripheral = self.document_sync.peripheral_snapshot();
1515 let peripheral_timestamp = peripheral
1516 .last_event
1517 .as_ref()
1518 .map(|e| e.timestamp)
1519 .unwrap_or(now_ms);
1520 delta.add_operation(Operation::UpdatePeripheral {
1521 peripheral,
1522 timestamp: peripheral_timestamp,
1523 });
1524
1525 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1527 let source_node = NodeId::new(emergency.source_node());
1528 let timestamp = emergency.timestamp();
1529
1530 delta.add_operation(Operation::SetEmergency {
1531 source_node,
1532 timestamp,
1533 known_peers: emergency.all_nodes(),
1534 });
1535
1536 for acked_node in emergency.acked_nodes() {
1537 delta.add_operation(Operation::AckEmergency {
1538 node_id: NodeId::new(acked_node),
1539 emergency_timestamp: timestamp,
1540 });
1541 }
1542 }
1543
1544 for app_op in self.app_document_delta_ops() {
1546 delta.add_operation(Operation::App(app_op));
1547 }
1548
1549 let encoded = delta.encode();
1550 self.encrypt_document(&encoded)
1551 }
1552
1553 fn process_delta_document_internal(
1557 &self,
1558 source_node: NodeId,
1559 data: &[u8],
1560 now_ms: u64,
1561 relay_data: Option<Vec<u8>>,
1562 origin_node: Option<NodeId>,
1563 hop_count: u8,
1564 ) -> Option<DataReceivedResult> {
1565 let delta = DeltaDocument::decode(data)?;
1567
1568 if delta.origin_node == self.config.node_id {
1570 return None;
1571 }
1572
1573 let mut counter_changed = false;
1575 let mut emergency_changed = false;
1576 let mut is_emergency = false;
1577 let mut is_ack = false;
1578 let mut event_timestamp = 0u64;
1579 let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1580
1581 log::info!(
1582 "Delta document from {:08X}: {} operations, data_len={}",
1583 delta.origin_node.as_u32(),
1584 delta.operations.len(),
1585 data.len()
1586 );
1587 for op in &delta.operations {
1588 log::info!(" Operation: {}", op.key());
1589 match op {
1590 Operation::IncrementCounter {
1591 node_id, amount, ..
1592 } => {
1593 let current = self.document_sync.counter_entries();
1595 let current_value = current
1596 .iter()
1597 .find(|(id, _)| *id == node_id.as_u32())
1598 .map(|(_, v)| *v)
1599 .unwrap_or(0);
1600
1601 if *amount > current_value {
1602 counter_changed = true;
1605 }
1606 }
1607 Operation::UpdatePeripheral {
1608 peripheral,
1609 timestamp,
1610 } => {
1611 if let Ok(mut peripherals) = self.peer_peripherals.write() {
1613 peripherals.insert(delta.origin_node, peripheral.clone());
1614 }
1615 peer_peripheral = Some(peripheral.clone());
1617 if *timestamp > event_timestamp {
1619 event_timestamp = *timestamp;
1620 }
1621 }
1622 Operation::SetEmergency { timestamp, .. } => {
1623 is_emergency = true;
1624 emergency_changed = true;
1625 event_timestamp = *timestamp;
1626 }
1627 Operation::AckEmergency {
1628 emergency_timestamp,
1629 ..
1630 } => {
1631 is_ack = true;
1632 emergency_changed = true;
1633 if *emergency_timestamp > event_timestamp {
1634 event_timestamp = *emergency_timestamp;
1635 }
1636 }
1637 Operation::ClearEmergency {
1638 emergency_timestamp,
1639 } => {
1640 emergency_changed = true;
1641 if *emergency_timestamp > event_timestamp {
1642 event_timestamp = *emergency_timestamp;
1643 }
1644 }
1645 Operation::App(app_op) => {
1646 let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1652
1653 log::info!(
1654 "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1655 app_op.type_id,
1656 app_op.op_code,
1657 app_op.source_node,
1658 doc_timestamp,
1659 app_op.payload.len()
1660 );
1661
1662 let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1664 let changed = {
1665 let mut docs = self.app_documents.write().unwrap();
1666
1667 if let Some(existing) = docs.get_mut(&doc_key) {
1668 self.document_registry.apply_delta_op(
1670 app_op.type_id,
1671 existing.as_mut(),
1672 app_op,
1673 )
1674 } else {
1675 if let Some(decoded) = self
1677 .document_registry
1678 .decode(app_op.type_id, &app_op.payload)
1679 {
1680 docs.insert(doc_key, decoded);
1681 true
1682 } else {
1683 log::debug!(
1686 "Received delta for unknown doc {:?}, waiting for full state",
1687 doc_key
1688 );
1689 false
1690 }
1691 }
1692 };
1693
1694 self.observers.notify(PeatEvent::app_document_received(
1696 app_op.type_id,
1697 NodeId::new(app_op.source_node),
1698 doc_timestamp,
1699 changed,
1700 ));
1701 }
1702 }
1703 }
1704
1705 self.peer_manager.record_sync(source_node, now_ms);
1707
1708 {
1710 let mut encoder = self.delta_encoder.lock().unwrap();
1711 encoder.record_received(&source_node, data.len(), now_ms);
1712 }
1713
1714 if is_emergency {
1716 self.notify(PeatEvent::EmergencyReceived {
1717 from_node: delta.origin_node,
1718 });
1719 } else if is_ack {
1720 self.notify(PeatEvent::AckReceived {
1721 from_node: delta.origin_node,
1722 });
1723 }
1724
1725 if counter_changed {
1726 let total_count = self.document_sync.total_count();
1727 self.notify(PeatEvent::DocumentSynced {
1728 from_node: delta.origin_node,
1729 total_count,
1730 });
1731 }
1732
1733 if relay_data.is_some() {
1735 let relay_targets = self.get_relay_targets(Some(source_node));
1736 self.notify(PeatEvent::MessageRelayed {
1737 origin_node: origin_node.unwrap_or(delta.origin_node),
1738 relay_count: relay_targets.len(),
1739 hop_count,
1740 });
1741 }
1742
1743 let PeripheralFields {
1744 callsign,
1745 battery_percent,
1746 heart_rate,
1747 event_type,
1748 latitude,
1749 longitude,
1750 altitude,
1751 activity_level,
1752 alerts,
1753 } = DataReceivedResult::peripheral_fields(&peer_peripheral);
1754
1755 Some(DataReceivedResult {
1756 source_node: delta.origin_node,
1757 is_emergency,
1758 is_ack,
1759 counter_changed,
1760 emergency_changed,
1761 total_count: self.document_sync.total_count(),
1762 event_timestamp,
1763 relay_data,
1764 origin_node,
1765 hop_count,
1766 callsign,
1767 battery_percent,
1768 heart_rate,
1769 event_type,
1770 latitude,
1771 longitude,
1772 altitude,
1773 activity_level,
1774 alerts,
1775 ..Default::default()
1776 })
1777 }
1778
1779 pub fn enable_peer_e2ee(&self) {
1787 let mut sessions = self.peer_sessions.lock().unwrap();
1788 if sessions.is_none() {
1789 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1790 log::info!(
1791 "Per-peer E2EE enabled for node {:08X}",
1792 self.config.node_id.as_u32()
1793 );
1794 }
1795 }
1796
1797 pub fn disable_peer_e2ee(&self) {
1801 let mut sessions = self.peer_sessions.lock().unwrap();
1802 *sessions = None;
1803 log::info!("Per-peer E2EE disabled");
1804 }
1805
1806 pub fn is_peer_e2ee_enabled(&self) -> bool {
1808 self.peer_sessions.lock().unwrap().is_some()
1809 }
1810
1811 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1815 self.peer_sessions
1816 .lock()
1817 .unwrap()
1818 .as_ref()
1819 .map(|s| s.our_public_key())
1820 }
1821
1822 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1828 let mut sessions = self.peer_sessions.lock().unwrap();
1829 let session_mgr = sessions.as_mut()?;
1830
1831 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1832 let mut buf = Vec::with_capacity(2 + 37);
1833 buf.push(KEY_EXCHANGE_MARKER);
1834 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1836
1837 log::info!(
1838 "Initiated E2EE session with peer {:08X}",
1839 peer_node_id.as_u32()
1840 );
1841 Some(buf)
1842 }
1843
1844 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1846 self.peer_sessions
1847 .lock()
1848 .unwrap()
1849 .as_ref()
1850 .is_some_and(|s| s.has_session(peer_node_id))
1851 }
1852
1853 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1855 self.peer_sessions
1856 .lock()
1857 .unwrap()
1858 .as_ref()
1859 .and_then(|s| s.session_state(peer_node_id))
1860 }
1861
1862 pub fn send_peer_e2ee(
1867 &self,
1868 peer_node_id: NodeId,
1869 plaintext: &[u8],
1870 now_ms: u64,
1871 ) -> Option<Vec<u8>> {
1872 let mut sessions = self.peer_sessions.lock().unwrap();
1873 let session_mgr = sessions.as_mut()?;
1874
1875 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1876 Ok(encrypted) => {
1877 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1878 buf.push(PEER_E2EE_MARKER);
1879 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1881 Some(buf)
1882 }
1883 Err(e) => {
1884 log::warn!(
1885 "Failed to encrypt for peer {:08X}: {:?}",
1886 peer_node_id.as_u32(),
1887 e
1888 );
1889 None
1890 }
1891 }
1892 }
1893
1894 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1896 let mut sessions = self.peer_sessions.lock().unwrap();
1897 if let Some(session_mgr) = sessions.as_mut() {
1898 session_mgr.close_session(peer_node_id);
1899 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1900 log::info!(
1901 "Closed E2EE session with peer {:08X}",
1902 peer_node_id.as_u32()
1903 );
1904 }
1905 }
1906
1907 pub fn peer_e2ee_session_count(&self) -> usize {
1909 self.peer_sessions
1910 .lock()
1911 .unwrap()
1912 .as_ref()
1913 .map(|s| s.session_count())
1914 .unwrap_or(0)
1915 }
1916
1917 pub fn peer_e2ee_established_count(&self) -> usize {
1919 self.peer_sessions
1920 .lock()
1921 .unwrap()
1922 .as_ref()
1923 .map(|s| s.established_count())
1924 .unwrap_or(0)
1925 }
1926
1927 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1932 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1933 return None;
1934 }
1935
1936 let payload = &data[2..];
1937 let msg = KeyExchangeMessage::decode(payload)?;
1938
1939 let mut sessions = self.peer_sessions.lock().unwrap();
1940 let session_mgr = sessions.as_mut()?;
1941
1942 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1943
1944 if established {
1945 self.notify(PeatEvent::PeerE2eeEstablished {
1946 peer_node_id: msg.sender_node_id,
1947 });
1948 log::info!(
1949 "E2EE session established with peer {:08X}",
1950 msg.sender_node_id.as_u32()
1951 );
1952 }
1953
1954 let mut buf = Vec::with_capacity(2 + 37);
1956 buf.push(KEY_EXCHANGE_MARKER);
1957 buf.push(0x00);
1958 buf.extend_from_slice(&response.encode());
1959 Some(buf)
1960 }
1961
1962 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1967 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1968 return None;
1969 }
1970
1971 let payload = &data[2..];
1972 let msg = PeerEncryptedMessage::decode(payload)?;
1973
1974 let mut sessions = self.peer_sessions.lock().unwrap();
1975 let session_mgr = sessions.as_mut()?;
1976
1977 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1978 Ok(plaintext) => {
1979 self.notify(PeatEvent::PeerE2eeMessageReceived {
1981 from_node: msg.sender_node_id,
1982 data: plaintext.clone(),
1983 });
1984 Some(plaintext)
1985 }
1986 Err(e) => {
1987 log::warn!(
1988 "Failed to decrypt E2EE message from {:08X}: {:?}",
1989 msg.sender_node_id.as_u32(),
1990 e
1991 );
1992 None
1993 }
1994 }
1995 }
1996
/// Returns this node's ID as stored in the mesh configuration.
pub fn node_id(&self) -> NodeId {
    self.config.node_id
}
2003
2004 pub fn callsign(&self) -> &str {
2006 &self.config.callsign
2007 }
2008
2009 pub fn mesh_id(&self) -> &str {
2011 &self.config.mesh_id
2012 }
2013
2014 pub fn device_name(&self) -> String {
2016 format!(
2017 "PEAT_{}-{:08X}",
2018 self.config.mesh_id,
2019 self.config.node_id.as_u32()
2020 )
2021 }
2022
2023 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
2028 self.peer_peripherals.read().ok().and_then(|peripherals| {
2029 peripherals
2030 .get(&node_id)
2031 .map(|p| p.callsign_str().to_string())
2032 })
2033 }
2034
2035 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
2040 self.peer_peripherals
2041 .read()
2042 .ok()
2043 .and_then(|peripherals| peripherals.get(&node_id).cloned())
2044 }
2045
/// Returns a shared reference to the document registry held by this mesh.
pub fn document_registry(&self) -> &DocumentRegistry {
    &self.document_registry
}
2063
2064 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2071 let type_id = T::TYPE_ID;
2072 let (source_node, timestamp) = doc.identity();
2073 let key = (type_id, source_node, timestamp);
2074
2075 let mut docs = self.app_documents.write().unwrap();
2076
2077 if let Some(existing) = docs.get_mut(&key) {
2078 self.document_registry
2080 .merge(type_id, existing.as_mut(), &doc)
2081 } else {
2082 docs.insert(key, Box::new(doc));
2084 true
2085 }
2086 }
2087
2088 pub fn store_app_document_boxed(
2095 &self,
2096 type_id: u8,
2097 source_node: u32,
2098 timestamp: u64,
2099 doc: Box<dyn core::any::Any + Send + Sync>,
2100 ) -> bool {
2101 let key = (type_id, source_node, timestamp);
2102
2103 let mut docs = self.app_documents.write().unwrap();
2104
2105 if let Some(existing) = docs.get_mut(&key) {
2106 self.document_registry
2108 .merge(type_id, existing.as_mut(), doc.as_ref())
2109 } else {
2110 docs.insert(key, doc);
2112 true
2113 }
2114 }
2115
2116 pub fn get_app_document<T: crate::registry::DocumentType>(
2120 &self,
2121 source_node: u32,
2122 timestamp: u64,
2123 ) -> Option<T> {
2124 let key = (T::TYPE_ID, source_node, timestamp);
2125
2126 let docs = self.app_documents.read().unwrap();
2127 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2128 }
2129
2130 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2134 let docs = self.app_documents.read().unwrap();
2135 docs.iter()
2136 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2137 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2138 .collect()
2139 }
2140
2141 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2145 let docs = self.app_documents.read().unwrap();
2146 let mut ops = Vec::new();
2147
2148 for ((type_id, _source, _ts), doc) in docs.iter() {
2149 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2150 ops.push(op);
2151 }
2152 }
2153
2154 ops
2155 }
2156
2157 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2161 let docs = self.app_documents.read().unwrap();
2162 docs.keys()
2163 .filter(|(tid, _, _)| *tid == type_id)
2164 .map(|(_, source, ts)| (*source, *ts))
2165 .collect()
2166 }
2167
2168 pub fn app_document_count(&self) -> usize {
2170 self.app_documents.read().unwrap().len()
2171 }
2172
/// Registers `observer` with this mesh's observer collection.
pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
    self.observers.add(observer);
}
2179
/// Asks this mesh's observer collection to remove `observer`.
pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
    self.observers.remove(observer);
}
2184
2185 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2192 let data = self.document_sync.send_emergency(timestamp);
2193 self.notify(PeatEvent::MeshStateChanged {
2194 peer_count: self.peer_manager.peer_count(),
2195 connected_count: self.peer_manager.connected_count(),
2196 });
2197 self.encrypt_document(&data)
2198 }
2199
2200 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2205 let data = self.document_sync.send_ack(timestamp);
2206 self.notify(PeatEvent::MeshStateChanged {
2207 peer_count: self.peer_manager.peer_count(),
2208 connected_count: self.peer_manager.connected_count(),
2209 });
2210 self.encrypt_document(&data)
2211 }
2212
/// Returns `payload` after passing it through `encrypt_document`.
///
/// This method only produces the bytes; it performs no transmission itself.
pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
    self.encrypt_document(payload)
}
2222
/// Clears the current event by delegating to `document_sync.clear_event()`.
pub fn clear_event(&self) {
    self.document_sync.clear_event();
}
2227
/// Returns whether `document_sync` reports an emergency as active.
pub fn is_emergency_active(&self) -> bool {
    self.document_sync.is_emergency_active()
}
2232
/// Returns whether `document_sync` reports an acknowledgement as active.
pub fn is_ack_active(&self) -> bool {
    self.document_sync.is_ack_active()
}
2237
/// Returns the current event type reported by `document_sync`, if any.
pub fn current_event(&self) -> Option<EventType> {
    self.document_sync.current_event()
}
2242
2243 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2252 let data = self.document_sync.start_emergency(timestamp, known_peers);
2253 self.notify(PeatEvent::MeshStateChanged {
2254 peer_count: self.peer_manager.peer_count(),
2255 connected_count: self.peer_manager.connected_count(),
2256 });
2257 self.encrypt_document(&data)
2258 }
2259
2260 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2264 let peers: Vec<u32> = self
2265 .peer_manager
2266 .get_peers()
2267 .iter()
2268 .map(|p| p.node_id.as_u32())
2269 .collect();
2270 self.start_emergency(timestamp, &peers)
2271 }
2272
2273 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2278 let result = self.document_sync.ack_emergency(timestamp);
2279 if result.is_some() {
2280 self.notify(PeatEvent::MeshStateChanged {
2281 peer_count: self.peer_manager.peer_count(),
2282 connected_count: self.peer_manager.connected_count(),
2283 });
2284 }
2285 result.map(|data| self.encrypt_document(&data))
2286 }
2287
/// Clears the emergency by delegating to `document_sync.clear_emergency()`.
pub fn clear_emergency(&self) {
    self.document_sync.clear_emergency();
}
2292
/// Returns whether `document_sync` reports an active emergency.
pub fn has_active_emergency(&self) -> bool {
    self.document_sync.has_active_emergency()
}
2297
/// Returns the emergency status tuple reported by `document_sync`, if any.
///
/// NOTE(review): the meaning of the four tuple fields is defined by
/// `DocumentSync::get_emergency_status` and is not visible here — confirm
/// against that method before relying on field order.
pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
    self.document_sync.get_emergency_status()
}
2304
/// Returns whether `document_sync` reports that `peer_id` has acknowledged.
pub fn has_peer_acked(&self, peer_id: u32) -> bool {
    self.document_sync.has_peer_acked(peer_id)
}
2309
/// Returns whether `document_sync` reports that all peers have acknowledged.
pub fn all_peers_acked(&self) -> bool {
    self.document_sync.all_peers_acked()
}
2314
#[cfg(feature = "legacy-chat")]
/// Adds a chat message to the local document and returns the bytes to send.
///
/// Returns `None` when `document_sync.add_chat_message` reports the message
/// was not added. Otherwise the full document is rebuilt and returned after
/// passing through `encrypt_document`.
pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
    let added = self.document_sync.add_chat_message(sender, text, timestamp);
    if !added {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2332
#[cfg(feature = "legacy-chat")]
/// Adds a chat reply to the local document and returns the bytes to send.
///
/// The replied-to message is identified by `reply_to_node` and
/// `reply_to_timestamp`. Returns `None` when `document_sync.add_chat_reply`
/// reports the reply was not added. Otherwise the full document is rebuilt
/// and returned after passing through `encrypt_document`.
pub fn send_chat_reply(
    &self,
    sender: &str,
    text: &str,
    reply_to_node: u32,
    reply_to_timestamp: u64,
    timestamp: u64,
) -> Option<Vec<u8>> {
    let added = self.document_sync.add_chat_reply(
        sender,
        text,
        reply_to_node,
        reply_to_timestamp,
        timestamp,
    );
    if !added {
        return None;
    }

    let document = self.build_document();
    Some(self.encrypt_document(&document))
}
2361
#[cfg(feature = "legacy-chat")]
/// Returns the chat message count reported by `document_sync`.
pub fn chat_count(&self) -> usize {
    self.document_sync.chat_count()
}
2367
#[cfg(feature = "legacy-chat")]
/// Returns the chat messages `document_sync` reports for `since_timestamp`.
///
/// NOTE(review): the field order of the returned 6-tuples, and whether the
/// timestamp bound is inclusive, are defined by
/// `DocumentSync::chat_messages_since` — confirm there.
pub fn chat_messages_since(
    &self,
    since_timestamp: u64,
) -> Vec<(u32, u64, String, String, u32, u64)> {
    self.document_sync.chat_messages_since(since_timestamp)
}
2378
#[cfg(feature = "legacy-chat")]
/// Returns every chat message reported by `document_sync`.
///
/// NOTE(review): the field order of the returned 6-tuples is defined by
/// `DocumentSync::all_chat_messages` — confirm there.
pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
    self.document_sync.all_chat_messages()
}
2386
2387 pub fn on_ble_discovered(
2393 &self,
2394 identifier: &str,
2395 name: Option<&str>,
2396 rssi: i8,
2397 mesh_id: Option<&str>,
2398 now_ms: u64,
2399 ) -> Option<PeatPeer> {
2400 let (node_id, is_new) = self
2401 .peer_manager
2402 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2403
2404 let peer = self.peer_manager.get_peer(node_id)?;
2405
2406 {
2408 let mut graph = self.connection_graph.lock().unwrap();
2409 graph.on_discovered(
2410 node_id,
2411 identifier.to_string(),
2412 name.map(|s| s.to_string()),
2413 mesh_id.map(|s| s.to_string()),
2414 rssi,
2415 now_ms,
2416 );
2417 }
2418
2419 if is_new {
2420 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2421 self.notify_mesh_state_changed();
2422 }
2423
2424 Some(peer)
2425 }
2426
2427 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2431 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2432 Some(id) => id,
2433 None => {
2434 log::warn!(
2435 "on_ble_connected: identifier {:?} not in peer map — \
2436 use on_incoming_connection() for peripheral connections",
2437 identifier
2438 );
2439 return None;
2440 }
2441 };
2442
2443 {
2445 let mut graph = self.connection_graph.lock().unwrap();
2446 graph.on_connected(node_id, now_ms);
2447 }
2448
2449 self.register_peer_for_delta(&node_id);
2451
2452 self.notify(PeatEvent::PeerConnected { node_id });
2453 self.notify_mesh_state_changed();
2454 Some(node_id)
2455 }
2456
2457 pub fn on_ble_disconnected(
2459 &self,
2460 identifier: &str,
2461 reason: DisconnectReason,
2462 ) -> Option<NodeId> {
2463 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2464
2465 {
2467 let mut graph = self.connection_graph.lock().unwrap();
2468 let platform_reason = match observer_reason {
2469 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2470 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2471 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2472 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2473 DisconnectReason::ConnectionFailed => {
2474 crate::platform::DisconnectReason::ConnectionFailed
2475 }
2476 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2477 };
2478 let now_ms = std::time::SystemTime::now()
2479 .duration_since(std::time::UNIX_EPOCH)
2480 .map(|d| d.as_millis() as u64)
2481 .unwrap_or(0);
2482 graph.on_disconnected(node_id, platform_reason, now_ms);
2483
2484 graph.remove_via_peer(node_id);
2487 }
2488
2489 self.unregister_peer_for_delta(&node_id);
2491
2492 self.notify(PeatEvent::PeerDisconnected {
2493 node_id,
2494 reason: observer_reason,
2495 });
2496 self.notify_mesh_state_changed();
2497 Some(node_id)
2498 }
2499
2500 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2504 if self
2505 .peer_manager
2506 .on_disconnected_by_node_id(node_id, reason)
2507 {
2508 {
2510 let mut graph = self.connection_graph.lock().unwrap();
2511 let platform_reason = match reason {
2512 DisconnectReason::LocalRequest => {
2513 crate::platform::DisconnectReason::LocalRequest
2514 }
2515 DisconnectReason::RemoteRequest => {
2516 crate::platform::DisconnectReason::RemoteRequest
2517 }
2518 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2519 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2520 DisconnectReason::ConnectionFailed => {
2521 crate::platform::DisconnectReason::ConnectionFailed
2522 }
2523 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2524 };
2525 let now_ms = std::time::SystemTime::now()
2526 .duration_since(std::time::UNIX_EPOCH)
2527 .map(|d| d.as_millis() as u64)
2528 .unwrap_or(0);
2529 graph.on_disconnected(node_id, platform_reason, now_ms);
2530
2531 graph.remove_via_peer(node_id);
2533 }
2534
2535 self.unregister_peer_for_delta(&node_id);
2537
2538 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2539 self.notify_mesh_state_changed();
2540 }
2541 }
2542
2543 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2547 let is_new = self
2548 .peer_manager
2549 .on_incoming_connection(identifier, node_id, now_ms);
2550
2551 {
2553 let mut graph = self.connection_graph.lock().unwrap();
2554 if is_new {
2555 graph.on_discovered(
2556 node_id,
2557 identifier.to_string(),
2558 None,
2559 Some(self.config.mesh_id.clone()),
2560 -50, now_ms,
2562 );
2563 }
2564 graph.on_connected(node_id, now_ms);
2565 }
2566
2567 self.register_peer_for_delta(&node_id);
2569
2570 if is_new {
2571 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2572 self.notify(PeatEvent::PeerDiscovered { peer });
2573 }
2574 }
2575
2576 self.notify(PeatEvent::PeerConnected { node_id });
2577 self.notify_mesh_state_changed();
2578
2579 is_new
2580 }
2581
2582 pub fn on_ble_data_received(
2589 &self,
2590 identifier: &str,
2591 data: &[u8],
2592 now_ms: u64,
2593 ) -> Option<DataReceivedResult> {
2594 let node_id = self.peer_manager.get_node_id(identifier)?;
2596
2597 if data.len() >= 2 {
2599 match data[0] {
2600 KEY_EXCHANGE_MARKER => {
2601 let _response = self.handle_key_exchange(data, now_ms);
2603 return None;
2605 }
2606 PEER_E2EE_MARKER => {
2607 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2609 return None;
2611 }
2612 RELAY_ENVELOPE_MARKER => {
2613 return self
2615 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2616 }
2617 _ => {}
2618 }
2619 }
2620
2621 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2623 }
2624
2625 #[allow(clippy::too_many_arguments)]
2627 fn process_document_data_with_identifier(
2628 &self,
2629 source_node: NodeId,
2630 identifier: &str,
2631 data: &[u8],
2632 now_ms: u64,
2633 relay_data: Option<Vec<u8>>,
2634 origin_node: Option<NodeId>,
2635 hop_count: u8,
2636 ) -> Option<DataReceivedResult> {
2637 let decrypted = self.decrypt_document(data, Some(identifier))?;
2639
2640 #[cfg(feature = "peat-lite-frame")]
2650 match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
2651 crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
2652 crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
2653 crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
2654 return Some(DataReceivedResult {
2655 source_node,
2656 peat_lite_document: Some(frame),
2657 ..Default::default()
2658 });
2659 }
2660 }
2661
2662 #[cfg(feature = "translator-codec")]
2669 match self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2670 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2671 TranslatorMarkerOutcome::Handled => return None,
2672 TranslatorMarkerOutcome::Decoded(frame) => {
2673 return Some(DataReceivedResult::translator_frame(source_node, frame));
2674 }
2675 }
2676
2677 if DeltaDocument::is_delta_document(&decrypted) {
2679 return self.process_delta_document_internal(
2680 source_node,
2681 &decrypted,
2682 now_ms,
2683 relay_data,
2684 origin_node,
2685 hop_count,
2686 );
2687 }
2688
2689 let result = self.document_sync.merge_document(&decrypted)?;
2691
2692 if let Some(ref peripheral) = result.peer_peripheral {
2694 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2695 peripherals.insert(result.source_node, peripheral.clone());
2696 }
2697 }
2698
2699 self.peer_manager.record_sync(source_node, now_ms);
2701
2702 if result.is_emergency() {
2704 self.notify(PeatEvent::EmergencyReceived {
2705 from_node: result.source_node,
2706 });
2707 } else if result.is_ack() {
2708 self.notify(PeatEvent::AckReceived {
2709 from_node: result.source_node,
2710 });
2711 }
2712
2713 if result.counter_changed {
2714 self.notify(PeatEvent::DocumentSynced {
2715 from_node: result.source_node,
2716 total_count: result.total_count,
2717 });
2718 }
2719
2720 if relay_data.is_some() {
2722 let relay_targets = self.get_relay_targets(Some(source_node));
2723 self.notify(PeatEvent::MessageRelayed {
2724 origin_node: origin_node.unwrap_or(result.source_node),
2725 relay_count: relay_targets.len(),
2726 hop_count,
2727 });
2728 }
2729
2730 let PeripheralFields {
2731 callsign,
2732 battery_percent,
2733 heart_rate,
2734 event_type,
2735 latitude,
2736 longitude,
2737 altitude,
2738 activity_level,
2739 alerts,
2740 } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2741
2742 Some(DataReceivedResult {
2743 source_node: result.source_node,
2744 is_emergency: result.is_emergency(),
2745 is_ack: result.is_ack(),
2746 counter_changed: result.counter_changed,
2747 emergency_changed: result.emergency_changed,
2748 total_count: result.total_count,
2749 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2750 relay_data,
2751 origin_node,
2752 hop_count,
2753 callsign,
2754 battery_percent,
2755 heart_rate,
2756 event_type,
2757 latitude,
2758 longitude,
2759 altitude,
2760 activity_level,
2761 alerts,
2762 ..Default::default()
2763 })
2764 }
2765
2766 fn handle_relay_envelope_with_identifier(
2768 &self,
2769 source_node: NodeId,
2770 identifier: &str,
2771 data: &[u8],
2772 now_ms: u64,
2773 ) -> Option<DataReceivedResult> {
2774 let envelope = RelayEnvelope::decode(data)?;
2776
2777 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2779 let stats = self
2780 .seen_cache
2781 .lock()
2782 .unwrap()
2783 .get_stats(&envelope.message_id);
2784 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2785
2786 self.notify(PeatEvent::DuplicateMessageDropped {
2787 origin_node: envelope.origin_node,
2788 seen_count,
2789 });
2790 return None;
2791 }
2792
2793 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2795 envelope.relay().map(|e| e.encode())
2796 } else {
2797 if !envelope.can_relay() {
2798 self.notify(PeatEvent::MessageTtlExpired {
2799 origin_node: envelope.origin_node,
2800 hop_count: envelope.hop_count,
2801 });
2802 }
2803 None
2804 };
2805
2806 self.process_document_data_with_identifier(
2808 source_node,
2809 identifier,
2810 &envelope.payload,
2811 now_ms,
2812 relay_data,
2813 Some(envelope.origin_node),
2814 envelope.hop_count,
2815 )
2816 }
2817
2818 pub fn on_ble_data_received_from_node(
2825 &self,
2826 node_id: NodeId,
2827 data: &[u8],
2828 now_ms: u64,
2829 ) -> Option<DataReceivedResult> {
2830 if data.len() >= 2 {
2832 match data[0] {
2833 KEY_EXCHANGE_MARKER => {
2834 let _response = self.handle_key_exchange(data, now_ms);
2835 return None;
2836 }
2837 PEER_E2EE_MARKER => {
2838 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2839 return None;
2840 }
2841 RELAY_ENVELOPE_MARKER => {
2842 return self.handle_relay_envelope(node_id, data, now_ms);
2844 }
2845 _ => {}
2846 }
2847 }
2848
2849 self.process_document_data(node_id, data, now_ms, None, None, 0)
2851 }
2852
2853 pub fn on_ble_data_received_anonymous(
2863 &self,
2864 identifier: &str,
2865 data: &[u8],
2866 now_ms: u64,
2867 ) -> Option<DataReceivedResult> {
2868 log::debug!(
2869 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2870 identifier,
2871 data.len(),
2872 data.first().copied().unwrap_or(0)
2873 );
2874
2875 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2877 Some(d) => d,
2878 None => {
2879 log::warn!(
2880 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2881 data.len(),
2882 identifier
2883 );
2884 return None;
2885 }
2886 };
2887
2888 #[cfg(feature = "peat-lite-frame")]
2894 match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
2895 crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
2896 crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
2897 crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
2898 let source = NodeId::new(frame.source_node_id);
2899 return Some(DataReceivedResult {
2900 source_node: source,
2901 peat_lite_document: Some(frame),
2902 ..Default::default()
2903 });
2904 }
2905 }
2906
2907 #[cfg(feature = "translator-codec")]
2918 match self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2919 TranslatorMarkerOutcome::NotTranslatorMarker => {}
2920 TranslatorMarkerOutcome::Handled => return None,
2921 TranslatorMarkerOutcome::Decoded(frame) => {
2922 return Some(DataReceivedResult::translator_frame(NodeId::new(0), frame));
2927 }
2928 }
2929
2930 if decrypted.len() < 8 {
2933 log::warn!("Decrypted document too short to extract source_node");
2934 return None;
2935 }
2936
2937 let source_node_u32 =
2938 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2939 let source_node = NodeId::new(source_node_u32);
2940
2941 log::info!(
2942 "Anonymous document from {}: source_node={:08X}, len={}",
2943 identifier,
2944 source_node_u32,
2945 decrypted.len()
2946 );
2947
2948 self.peer_manager
2951 .register_identifier(identifier, source_node);
2952
2953 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2955 log::info!(
2956 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2957 is_delta,
2958 decrypted.first().copied().unwrap_or(0),
2959 decrypted.len()
2960 );
2961
2962 if is_delta {
2963 return self.process_delta_document_internal(
2964 source_node,
2965 &decrypted,
2966 now_ms,
2967 None,
2968 None,
2969 0,
2970 );
2971 }
2972
2973 const APP_LAYER_MARKER: u8 = 0xAF;
2977 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2978 log::debug!(
2979 "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2980 source_node.as_u32(),
2981 decrypted.len()
2982 );
2983 return Some(DataReceivedResult {
2984 source_node,
2985 is_emergency: false,
2986 is_ack: false,
2987 counter_changed: false,
2988 emergency_changed: false,
2989 total_count: 0,
2990 event_timestamp: now_ms,
2991 relay_data: Some(decrypted.to_vec()),
2992 origin_node: None,
2993 hop_count: 0,
2994 callsign: None,
2995 battery_percent: None,
2996 heart_rate: None,
2997 event_type: None,
2998 latitude: None,
2999 longitude: None,
3000 altitude: None,
3001 ..Default::default()
3002 });
3003 }
3004
3005 log::info!(
3007 "Processing legacy document from {:08X}",
3008 source_node.as_u32()
3009 );
3010 let result = self.document_sync.merge_document(&decrypted)?;
3011
3012 log::info!(
3014 "Merge result: peer_peripheral={}, counter_changed={}",
3015 result.peer_peripheral.is_some(),
3016 result.counter_changed
3017 );
3018 if let Some(ref p) = result.peer_peripheral {
3019 log::info!("Peripheral callsign: '{}'", p.callsign_str());
3020 }
3021
3022 self.peer_manager.record_sync(source_node, now_ms);
3024
3025 if result.is_emergency() {
3027 self.notify(PeatEvent::EmergencyReceived {
3028 from_node: result.source_node,
3029 });
3030 } else if result.is_ack() {
3031 self.notify(PeatEvent::AckReceived {
3032 from_node: result.source_node,
3033 });
3034 }
3035
3036 if result.counter_changed {
3037 self.notify(PeatEvent::DocumentSynced {
3038 from_node: result.source_node,
3039 total_count: result.total_count,
3040 });
3041 }
3042
3043 let PeripheralFields {
3044 callsign,
3045 battery_percent,
3046 heart_rate,
3047 event_type,
3048 latitude,
3049 longitude,
3050 altitude,
3051 activity_level,
3052 alerts,
3053 } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3054
3055 Some(DataReceivedResult {
3056 source_node: result.source_node,
3057 is_emergency: result.is_emergency(),
3058 is_ack: result.is_ack(),
3059 counter_changed: result.counter_changed,
3060 emergency_changed: result.emergency_changed,
3061 total_count: result.total_count,
3062 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3063 relay_data: None,
3064 origin_node: None,
3065 hop_count: 0,
3066 callsign,
3067 battery_percent,
3068 heart_rate,
3069 event_type,
3070 latitude,
3071 longitude,
3072 altitude,
3073 activity_level,
3074 alerts,
3075 ..Default::default()
3076 })
3077 }
3078
3079 fn process_document_data(
3081 &self,
3082 source_node: NodeId,
3083 data: &[u8],
3084 now_ms: u64,
3085 relay_data: Option<Vec<u8>>,
3086 origin_node: Option<NodeId>,
3087 hop_count: u8,
3088 ) -> Option<DataReceivedResult> {
3089 let source_hint = format!("node:{:08X}", source_node.as_u32());
3091 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
3092
3093 #[cfg(feature = "peat-lite-frame")]
3096 match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
3097 crate::peat_lite_frame::PeatLiteFrameOutcome::NotPeatLiteFrame => {}
3098 crate::peat_lite_frame::PeatLiteFrameOutcome::Handled => return None,
3099 crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(frame) => {
3100 return Some(DataReceivedResult {
3101 source_node,
3102 relay_data,
3103 origin_node,
3104 hop_count,
3105 peat_lite_document: Some(frame),
3106 ..Default::default()
3107 });
3108 }
3109 }
3110
3111 #[cfg(feature = "translator-codec")]
3115 match self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
3116 TranslatorMarkerOutcome::NotTranslatorMarker => {}
3117 TranslatorMarkerOutcome::Handled => return None,
3118 TranslatorMarkerOutcome::Decoded(frame) => {
3119 return Some(DataReceivedResult::translator_frame(source_node, frame));
3120 }
3121 }
3122
3123 if DeltaDocument::is_delta_document(&decrypted) {
3125 return self.process_delta_document_internal(
3126 source_node,
3127 &decrypted,
3128 now_ms,
3129 relay_data,
3130 origin_node,
3131 hop_count,
3132 );
3133 }
3134
3135 let result = self.document_sync.merge_document(&decrypted)?;
3137
3138 if let Some(ref peripheral) = result.peer_peripheral {
3140 if let Ok(mut peripherals) = self.peer_peripherals.write() {
3141 peripherals.insert(result.source_node, peripheral.clone());
3142 }
3143 }
3144
3145 self.peer_manager.record_sync(source_node, now_ms);
3147
3148 if result.is_emergency() {
3150 self.notify(PeatEvent::EmergencyReceived {
3151 from_node: result.source_node,
3152 });
3153 } else if result.is_ack() {
3154 self.notify(PeatEvent::AckReceived {
3155 from_node: result.source_node,
3156 });
3157 }
3158
3159 if result.counter_changed {
3160 self.notify(PeatEvent::DocumentSynced {
3161 from_node: result.source_node,
3162 total_count: result.total_count,
3163 });
3164 }
3165
3166 if relay_data.is_some() {
3168 let relay_targets = self.get_relay_targets(Some(source_node));
3169 self.notify(PeatEvent::MessageRelayed {
3170 origin_node: origin_node.unwrap_or(result.source_node),
3171 relay_count: relay_targets.len(),
3172 hop_count,
3173 });
3174 }
3175
3176 let PeripheralFields {
3177 callsign,
3178 battery_percent,
3179 heart_rate,
3180 event_type,
3181 latitude,
3182 longitude,
3183 altitude,
3184 activity_level,
3185 alerts,
3186 } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3187
3188 Some(DataReceivedResult {
3189 source_node: result.source_node,
3190 is_emergency: result.is_emergency(),
3191 is_ack: result.is_ack(),
3192 counter_changed: result.counter_changed,
3193 emergency_changed: result.emergency_changed,
3194 total_count: result.total_count,
3195 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3196 relay_data,
3197 origin_node,
3198 hop_count,
3199 callsign,
3200 battery_percent,
3201 heart_rate,
3202 event_type,
3203 latitude,
3204 longitude,
3205 altitude,
3206 activity_level,
3207 alerts,
3208 ..Default::default()
3209 })
3210 }
3211
3212 fn handle_relay_envelope(
3214 &self,
3215 source_node: NodeId,
3216 data: &[u8],
3217 now_ms: u64,
3218 ) -> Option<DataReceivedResult> {
3219 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3221
3222 let relay_data = if decision.should_relay {
3224 decision.relay_data()
3225 } else {
3226 None
3227 };
3228
3229 self.process_document_data(
3231 source_node,
3232 &decision.payload,
3233 now_ms,
3234 relay_data,
3235 Some(decision.origin_node),
3236 decision.hop_count,
3237 )
3238 }
3239
3240 pub fn on_ble_data(
3249 &self,
3250 identifier: &str,
3251 data: &[u8],
3252 now_ms: u64,
3253 ) -> Option<DataReceivedResult> {
3254 if data.len() >= 2 {
3256 match data[0] {
3257 KEY_EXCHANGE_MARKER => {
3258 let _response = self.handle_key_exchange(data, now_ms);
3259 return None;
3260 }
3261 PEER_E2EE_MARKER => {
3262 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3263 return None;
3264 }
3265 RELAY_ENVELOPE_MARKER => {
3266 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3268 }
3269 _ => {}
3270 }
3271 }
3272
3273 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3275 }
3276
3277 fn process_incoming_document(
3279 &self,
3280 identifier: &str,
3281 data: &[u8],
3282 now_ms: u64,
3283 relay_data: Option<Vec<u8>>,
3284 origin_node: Option<NodeId>,
3285 hop_count: u8,
3286 ) -> Option<DataReceivedResult> {
3287 let decrypted = self.decrypt_document(data, Some(identifier))?;
3289
3290 let result = self.document_sync.merge_document(&decrypted)?;
3292
3293 self.peer_manager.record_sync(result.source_node, now_ms);
3295
3296 if origin_node.is_none() {
3301 let is_new =
3303 self.peer_manager
3304 .on_incoming_connection(identifier, result.source_node, now_ms);
3305
3306 {
3308 let mut graph = self.connection_graph.lock().unwrap();
3309 if is_new {
3310 graph.on_discovered(
3311 result.source_node,
3312 identifier.to_string(),
3313 None,
3314 Some(self.config.mesh_id.clone()),
3315 -50, now_ms,
3317 );
3318 }
3319 graph.on_connected(result.source_node, now_ms);
3320 }
3321 }
3322
3323 if result.is_emergency() {
3325 self.notify(PeatEvent::EmergencyReceived {
3326 from_node: result.source_node,
3327 });
3328 } else if result.is_ack() {
3329 self.notify(PeatEvent::AckReceived {
3330 from_node: result.source_node,
3331 });
3332 }
3333
3334 if result.counter_changed {
3335 self.notify(PeatEvent::DocumentSynced {
3336 from_node: result.source_node,
3337 total_count: result.total_count,
3338 });
3339 }
3340
3341 if relay_data.is_some() {
3343 let relay_targets = self.get_relay_targets(Some(result.source_node));
3344 self.notify(PeatEvent::MessageRelayed {
3345 origin_node: origin_node.unwrap_or(result.source_node),
3346 relay_count: relay_targets.len(),
3347 hop_count,
3348 });
3349 }
3350
3351 let PeripheralFields {
3352 callsign,
3353 battery_percent,
3354 heart_rate,
3355 event_type,
3356 latitude,
3357 longitude,
3358 altitude,
3359 activity_level,
3360 alerts,
3361 } = DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3362
3363 Some(DataReceivedResult {
3364 source_node: result.source_node,
3365 is_emergency: result.is_emergency(),
3366 is_ack: result.is_ack(),
3367 counter_changed: result.counter_changed,
3368 emergency_changed: result.emergency_changed,
3369 total_count: result.total_count,
3370 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3371 relay_data,
3372 origin_node,
3373 hop_count,
3374 callsign,
3375 battery_percent,
3376 heart_rate,
3377 event_type,
3378 latitude,
3379 longitude,
3380 altitude,
3381 activity_level,
3382 alerts,
3383 ..Default::default()
3384 })
3385 }
3386
3387 fn handle_relay_envelope_with_incoming(
3389 &self,
3390 identifier: &str,
3391 data: &[u8],
3392 now_ms: u64,
3393 ) -> Option<DataReceivedResult> {
3394 let envelope = RelayEnvelope::decode(data)?;
3396
3397 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3400 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3401 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3402 source_peer,
3403 envelope.origin_node,
3404 envelope.hop_count,
3405 now_ms,
3406 );
3407
3408 if is_new {
3409 log::debug!(
3410 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3411 envelope.origin_node.as_u32(),
3412 source_peer.as_u32(),
3413 envelope.hop_count
3414 );
3415 }
3416 }
3417 }
3418
3419 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3421 let stats = self
3423 .seen_cache
3424 .lock()
3425 .unwrap()
3426 .get_stats(&envelope.message_id);
3427 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3428
3429 self.notify(PeatEvent::DuplicateMessageDropped {
3430 origin_node: envelope.origin_node,
3431 seen_count,
3432 });
3433 return None;
3434 }
3435
3436 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3438 let relay_env = envelope.relay();
3439 (true, relay_env.map(|e| e.encode()))
3440 } else {
3441 if !envelope.can_relay() {
3442 self.notify(PeatEvent::MessageTtlExpired {
3443 origin_node: envelope.origin_node,
3444 hop_count: envelope.hop_count,
3445 });
3446 }
3447 (false, None)
3448 };
3449
3450 self.process_incoming_document(
3452 identifier,
3453 &envelope.payload,
3454 now_ms,
3455 if should_relay { relay_data } else { None },
3456 Some(envelope.origin_node),
3457 envelope.hop_count,
3458 )
3459 }
3460
3461 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3471 use std::sync::atomic::Ordering;
3472
3473 let now_ms_32 = now_ms as u32;
3475
3476 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3478 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3479 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3480 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3481 let removed = self.peer_manager.cleanup_stale(now_ms);
3482 for node_id in &removed {
3483 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3484 }
3485 if !removed.is_empty() {
3486 self.notify_mesh_state_changed();
3487 }
3488
3489 {
3491 let mut graph = self.connection_graph.lock().unwrap();
3492 let newly_lost = graph.tick(now_ms);
3493 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3495 drop(graph);
3496
3497 for node_id in newly_lost {
3500 if !removed.contains(&node_id) {
3502 self.notify(PeatEvent::PeerLost { node_id });
3503 }
3504 }
3505 }
3506 }
3507
3508 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3510 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3511 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3512 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3513 if self.peer_manager.connected_count() > 0 {
3515 let doc = self.document_sync.build_document();
3516 return Some(self.encrypt_document(&doc));
3517 }
3518 }
3519
3520 None
3521 }
3522
3523 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3532 use std::sync::atomic::Ordering;
3533 let now_ms_32 = now_ms as u32;
3534
3535 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3537 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3538 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3539 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3540 let removed = self.peer_manager.cleanup_stale(now_ms);
3541 for node_id in &removed {
3542 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3543 }
3544 if !removed.is_empty() {
3545 self.notify_mesh_state_changed();
3546 }
3547
3548 {
3550 let mut graph = self.connection_graph.lock().unwrap();
3551 let newly_lost = graph.tick(now_ms);
3552 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3553 drop(graph);
3554
3555 for node_id in newly_lost {
3556 if !removed.contains(&node_id) {
3557 self.notify(PeatEvent::PeerLost { node_id });
3558 }
3559 }
3560 }
3561 }
3562
3563 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3565 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3566 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3567 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3568
3569 let doc = self.document_sync.build_document();
3571 let encrypted = self.encrypt_document(&doc);
3572 let mut results = Vec::new();
3573 for peer in self.get_connected_peers() {
3574 results.push((peer.node_id, encrypted.clone()));
3575 }
3576 return results;
3577 }
3578
3579 Vec::new()
3580 }
3581
/// Returns the peer list reported by the peer manager's `get_peers`.
pub fn get_peers(&self) -> Vec<PeatPeer> {
    self.peer_manager.get_peers()
}
3588
/// Returns the peers the peer manager reports as connected.
pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
    self.peer_manager.get_connected_peers()
}
3593
/// Looks up a single peer by node id in the peer manager.
///
/// Returns `None` when the peer manager has no entry for `node_id`.
pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
    self.peer_manager.get_peer(node_id)
}
3598
/// Returns the peer manager's `peer_count`.
pub fn peer_count(&self) -> usize {
    self.peer_manager.peer_count()
}
3603
/// Returns the peer manager's `connected_count`.
pub fn connected_count(&self) -> usize {
    self.peer_manager.connected_count()
}
3608
/// Asks the peer manager whether a device advertising `device_mesh_id`
/// matches this mesh; the decision is delegated to
/// `PeerManager::matches_mesh`.
pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
    self.peer_manager.matches_mesh(device_mesh_id)
}
3613
/// Returns an owned snapshot of every entry in the connection graph.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
    self.connection_graph.lock().unwrap().get_all_owned()
}
3640
/// Returns a clone of the connection-graph entry for `node_id`, or `None`
/// when the graph has no such entry.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_peer(node_id)
        .cloned()
}
3649
/// Returns clones of the entries the connection graph's `get_connected`
/// yields.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_connected()
        .into_iter()
        .cloned()
        .collect()
}
3660
/// Returns clones of the entries the connection graph's `get_degraded`
/// yields.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_degraded()
        .into_iter()
        .cloned()
        .collect()
}
3671
/// Returns clones of the entries the connection graph's
/// `get_recently_disconnected` yields for the given window.
///
/// `within_ms` and `now_ms` are forwarded unchanged to the graph.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_recently_disconnected(
    &self,
    within_ms: u64,
    now_ms: u64,
) -> Vec<PeerConnectionState> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_recently_disconnected(within_ms, now_ms)
        .into_iter()
        .cloned()
        .collect()
}
3688
/// Returns clones of the entries the connection graph's `get_lost` yields.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_lost()
        .into_iter()
        .cloned()
        .collect()
}
3699
/// Returns the connection graph's `state_counts` summary.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_connection_state_counts(&self) -> StateCountSummary {
    self.connection_graph.lock().unwrap().state_counts()
}
3704
/// Returns an owned list of the indirect peers held by the connection graph.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
    self.connection_graph
        .lock()
        .unwrap()
        .get_indirect_peers_owned()
}
3718
/// Returns the connection graph's `peer_degree` for `node_id`, or `None`
/// when the graph reports none.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
    self.connection_graph.lock().unwrap().peer_degree(node_id)
}
3728
/// Returns the connection graph's `full_state_counts` summary.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_full_state_counts(&self) -> FullStateCountSummary {
    self.connection_graph.lock().unwrap().full_state_counts()
}
3736
/// Returns the `(NodeId, u8)` pairs the connection graph's `get_paths_to`
/// yields for `node_id`.
///
/// NOTE(review): presumably each pair is (via-peer, hop count); confirm
/// against `ConnectionStateGraph::get_paths_to`.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
    self.connection_graph.lock().unwrap().get_paths_to(node_id)
}
3744
/// Returns whether the connection graph reports `node_id` as known.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn is_peer_known(&self, node_id: NodeId) -> bool {
    self.connection_graph.lock().unwrap().is_known(node_id)
}
3749
/// Returns the connection graph's `indirect_peer_count`.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn indirect_peer_count(&self) -> usize {
    self.connection_graph.lock().unwrap().indirect_peer_count()
}
3754
/// Runs the connection graph's `cleanup_indirect` at `now_ms` and returns
/// the node ids it reports.
///
/// # Panics
///
/// Panics if locking `connection_graph` fails (the lock result is unwrapped).
pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
    self.connection_graph
        .lock()
        .unwrap()
        .cleanup_indirect(now_ms)
}
3765
/// Returns the total count held by the local document
/// (`document_sync.total_count()`).
pub fn total_count(&self) -> u64 {
    self.document_sync.total_count()
}
3770
/// Returns the local document's version (`document_sync.version()`).
///
/// Returns the same value as `version`.
pub fn document_version(&self) -> u32 {
    self.document_sync.version()
}
3775
/// Returns the local document's version (`document_sync.version()`).
///
/// Returns the same value as `document_version`.
pub fn version(&self) -> u32 {
    self.document_sync.version()
}
3780
/// Forwards `battery_percent` to `document_sync.update_health`.
pub fn update_health(&self, battery_percent: u8) {
    self.document_sync.update_health(battery_percent);
}
3785
/// Forwards `activity` to `document_sync.update_activity`.
pub fn update_activity(&self, activity: u8) {
    self.document_sync.update_activity(activity);
}
3794
/// Forwards `battery_percent` and `activity` together to
/// `document_sync.update_health_full`.
pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
    self.document_sync
        .update_health_full(battery_percent, activity);
}
3800
/// Forwards `heart_rate` to `document_sync.update_heart_rate`.
pub fn update_heart_rate(&self, heart_rate: u8) {
    self.document_sync.update_heart_rate(heart_rate);
}
3805
/// Forwards `alerts` to `document_sync.update_alerts`.
///
/// NOTE(review): `alerts` is presumably a bit set; confirm against
/// `DocumentSync::update_alerts`.
pub fn update_alerts(&self, alerts: u8) {
    self.document_sync.update_alerts(alerts);
}
3812
/// Forwards the position to `document_sync.update_location`; `altitude` is
/// optional.
pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
    self.document_sync
        .update_location(latitude, longitude, altitude);
}
3818
/// Calls `document_sync.clear_location`.
pub fn clear_location(&self) {
    self.document_sync.clear_location();
}
3823
/// Forwards `callsign` to `document_sync.update_callsign`.
pub fn update_callsign(&self, callsign: &str) {
    self.document_sync.update_callsign(callsign);
}
3828
/// Forwards `event_type` and `timestamp` to
/// `document_sync.set_peripheral_event`.
pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
    self.document_sync
        .set_peripheral_event(event_type, timestamp);
}
3834
/// Calls `document_sync.clear_peripheral_event`.
pub fn clear_peripheral_event(&self) {
    self.document_sync.clear_peripheral_event();
}
3839
/// Forwards a whole set of peripheral fields in a single call to
/// `document_sync.update_peripheral_state`.
///
/// Every argument is passed through unchanged and in the same order.
// The argument list mirrors the delegate's signature, hence the lint
// exemption.
#[allow(clippy::too_many_arguments)]
pub fn update_peripheral_state(
    &self,
    callsign: &str,
    battery_percent: u8,
    heart_rate: Option<u8>,
    latitude: Option<f32>,
    longitude: Option<f32>,
    altitude: Option<f32>,
    event_type: Option<EventType>,
    timestamp: u64,
) {
    self.document_sync.update_peripheral_state(
        callsign,
        battery_percent,
        heart_rate,
        latitude,
        longitude,
        altitude,
        event_type,
        timestamp,
    );
}
3867
/// Builds the current document and passes it through `encrypt_document`.
///
/// Returns the bytes produced by `encrypt_document`.
pub fn build_document(&self) -> Vec<u8> {
    let doc = self.document_sync.build_document();
    self.encrypt_document(&doc)
}
3875
/// Encodes `value` as a translator frame for `collection` and passes the
/// result through `encrypt_document`.
///
/// Returns `None` when the translator read lock cannot be taken or when
/// `encode_outbound_sync` yields `None`.
#[cfg(feature = "translator-codec")]
pub fn publish_translator_frame(
    &self,
    collection: &str,
    value: &serde_json::Value,
) -> Option<Vec<u8>> {
    // Scope the read guard so it is released before `encrypt_document` runs.
    let framed = {
        let codec = self.ble_translator.read().ok()?;
        codec.encode_outbound_sync(value, collection)?
    };
    Some(self.encrypt_document(&framed))
}
3913
/// Frames a postcard-encoded `peripheral` as a translator frame and passes it
/// through `encrypt_document`.
///
/// Frame layout before encryption: `TRANSLATOR_FRAME_MARKER`, then
/// `COLLECTION_CODE_PLATFORMS`, then the encoded payload.
///
/// Returns `None` when `postcard_encode` yields `None`.
#[cfg(feature = "translator-codec")]
pub fn publish_platform_advertisement(
    &self,
    peripheral: &crate::translator::BlePeripheral,
) -> Option<Vec<u8>> {
    let payload = crate::translator::postcard_encode(peripheral)?;
    let header = [
        crate::document::TRANSLATOR_FRAME_MARKER,
        crate::translator::COLLECTION_CODE_PLATFORMS,
    ];

    let mut framed = Vec::with_capacity(header.len() + payload.len());
    framed.extend_from_slice(&header);
    framed.extend_from_slice(&payload);

    Some(self.encrypt_document(&framed))
}
3944
/// Wraps `envelope_payload` in a peat-lite document frame and passes it
/// through `encrypt_document`.
///
/// Each call takes the next value of the `peat_lite_seq` counter, including
/// calls whose encoding subsequently fails.
///
/// Returns `None` when `encode_peat_lite_document` returns an error.
#[cfg(feature = "peat-lite-frame")]
pub fn publish_peat_lite_document(&self, envelope_payload: &[u8]) -> Option<Vec<u8>> {
    use std::sync::atomic::Ordering;

    let seq_num = self.peat_lite_seq.fetch_add(1, Ordering::Relaxed);
    let local_node = self.node_id().as_u32();

    crate::peat_lite_frame::encode_peat_lite_document(local_node, seq_num, envelope_payload)
        .ok()
        .map(|framed| self.encrypt_document(&framed))
}
3984
/// Returns the peers the peer manager reports as needing a sync at `now_ms`.
pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
    self.peer_manager.peers_needing_sync(now_ms)
}
3989
/// Hands `event` to the registered observers via `observers.notify`.
fn notify(&self, event: PeatEvent) {
    self.observers.notify(event);
}
3995
/// Emits `PeatEvent::MeshStateChanged` carrying the peer manager's current
/// peer and connected counts.
fn notify_mesh_state_changed(&self) {
    self.notify(PeatEvent::MeshStateChanged {
        peer_count: self.peer_manager.peer_count(),
        connected_count: self.peer_manager.connected_count(),
    });
}
4002
4003 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
4023 let mut id_bytes = [0u8; 16];
4026 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
4027 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
4028 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
4029
4030 let seen = self.seen_cache.lock().unwrap();
4032 !seen.has_seen(&message_id)
4033 }
4034
4035 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
4040 let now = std::time::SystemTime::now()
4041 .duration_since(std::time::UNIX_EPOCH)
4042 .map(|d| d.as_millis() as u64)
4043 .unwrap_or(0);
4044
4045 let mut id_bytes = [0u8; 16];
4047 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
4048 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
4049 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
4050 let origin = NodeId::new(source_node);
4051
4052 let mut seen = self.seen_cache.lock().unwrap();
4053 seen.mark_seen(message_id, origin, now);
4054 }
4055
/// Returns the identifier strings the peer manager reports for connected
/// peers (`peer_manager.get_connected_identifiers()`).
pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
    self.peer_manager.get_connected_identifiers()
}
4063}
4064
/// Outcome of processing one inbound payload.
///
/// Producers fill in only the fields relevant to the payload kind and leave
/// the rest at their `Default` values (see the `..Default::default()`
/// constructions in the receive paths).
#[derive(Debug, Clone, Default)]
pub struct DataReceivedResult {
    /// Node the payload is attributed to.
    pub source_node: NodeId,

    /// Value of `is_emergency()` on the merge result.
    pub is_emergency: bool,

    /// Value of `is_ack()` on the merge result.
    pub is_ack: bool,

    /// Copied from the merge result's `counter_changed`.
    pub counter_changed: bool,

    /// Copied from the merge result's `emergency_changed`.
    pub emergency_changed: bool,

    /// Copied from the merge result's `total_count`.
    pub total_count: u64,

    /// Timestamp of the event attached to the merged document, or `0` when
    /// there is none.
    pub event_timestamp: u64,

    /// Encoded relay envelope to forward, when the message should be relayed.
    pub relay_data: Option<Vec<u8>>,

    /// Origin of a relayed message; `None` for payloads that did not arrive
    /// inside a relay envelope.
    pub origin_node: Option<NodeId>,

    /// Hop count taken from the relay envelope; `0` for direct payloads.
    pub hop_count: u8,

    /// Peer callsign; `None` when empty or when no peripheral data is present.
    pub callsign: Option<String>,

    /// Peer battery percentage; `None` when the reported value is `0`.
    pub battery_percent: Option<u8>,

    /// Peer heart rate, if reported.
    pub heart_rate: Option<u8>,

    /// Event type of the peer's last event, as its `u8` discriminant.
    pub event_type: Option<u8>,

    /// Peer latitude, when a location is present.
    pub latitude: Option<f32>,

    /// Peer longitude, when a location is present.
    pub longitude: Option<f32>,

    /// Peer altitude, when the location carries one.
    pub altitude: Option<f32>,

    /// Peer activity value, when peripheral data is present.
    pub activity_level: Option<u8>,

    /// Peer alerts value, when peripheral data is present.
    pub alerts: Option<u8>,

    /// Decoded translator frame, set when the payload was a translator frame.
    pub decoded_translator_frame: Option<DecodedTranslatorFrame>,

    /// Decoded peat-lite frame, set when the payload was a peat-lite document.
    pub peat_lite_document: Option<PeatLiteDocumentFrame>,
}
4162
/// A decoded peat-lite document frame as surfaced to callers through
/// `DataReceivedResult::peat_lite_document`.
#[derive(Debug, Clone)]
pub struct PeatLiteDocumentFrame {
    /// Raw node id of the sender.
    pub source_node_id: u32,
    /// Sequence number carried by the frame.
    pub seq_num: u32,
    /// Flag bits; bit 0 is the one tested by [`Self::is_tombstone`].
    pub flags: u8,
    /// Collection name carried by the frame.
    pub collection: String,
    /// Document id carried by the frame.
    pub doc_id: String,
    /// Timestamp in milliseconds (signed).
    pub timestamp_ms: i64,
    /// Body bytes carried by the frame.
    pub body: Vec<u8>,
}

impl PeatLiteDocumentFrame {
    /// Returns `true` when the lowest bit of `flags` is set.
    #[inline]
    pub fn is_tombstone(&self) -> bool {
        const TOMBSTONE_BIT: u8 = 0x01;
        (self.flags & TOMBSTONE_BIT) == TOMBSTONE_BIT
    }
}
4211
/// A translator frame after decoding, surfaced to callers through
/// `DataReceivedResult::decoded_translator_frame`.
#[derive(Debug, Clone)]
pub struct DecodedTranslatorFrame {
    /// Collection the frame belongs to.
    pub collection: String,
    /// NOTE(review): presumably the decoded document serialized as JSON text;
    /// confirm against the translator decode path.
    pub doc_json: String,
    /// NOTE(review): presumably an identifier of the sending peer, when known;
    /// confirm against the translator decode path.
    pub peer: Option<String>,
}
4232
/// Result of checking a decrypted payload for a translator marker.
///
/// In `process_document_data`, `NotTranslatorMarker` lets processing continue,
/// `Handled` makes it return `None`, and `Decoded` makes it return a
/// translator-frame result.
#[cfg(feature = "translator-codec")]
#[derive(Debug, Clone)]
enum TranslatorMarkerOutcome {
    /// The payload is not a translator frame; the caller keeps processing it.
    NotTranslatorMarker,
    /// The payload was a translator frame and was decoded.
    Decoded(DecodedTranslatorFrame),
    /// The payload was consumed; nothing is surfaced to the caller.
    Handled,
}
4250
4251impl DataReceivedResult {
4252 #[cfg(feature = "translator-codec")]
4260 pub(crate) fn translator_frame(source_node: NodeId, frame: DecodedTranslatorFrame) -> Self {
4261 Self {
4262 source_node,
4263 decoded_translator_frame: Some(frame),
4264 ..Default::default()
4265 }
4266 }
4267
4268 fn peripheral_fields(peripheral: &Option<crate::sync::crdt::Peripheral>) -> PeripheralFields {
4276 match peripheral {
4277 Some(p) => {
4278 let callsign = {
4279 let s = p.callsign_str();
4280 if s.is_empty() {
4281 None
4282 } else {
4283 Some(s.to_string())
4284 }
4285 };
4286 let battery_percent = if p.health.battery_percent > 0 {
4287 Some(p.health.battery_percent)
4288 } else {
4289 None
4290 };
4291 let (latitude, longitude, altitude) = match &p.location {
4292 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
4293 None => (None, None, None),
4294 };
4295 PeripheralFields {
4296 callsign,
4297 battery_percent,
4298 heart_rate: p.health.heart_rate,
4299 event_type: p.last_event.as_ref().map(|e| e.event_type as u8),
4300 latitude,
4301 longitude,
4302 altitude,
4303 activity_level: Some(p.health.activity),
4304 alerts: Some(p.health.alerts),
4305 }
4306 }
4307 None => PeripheralFields::default(),
4308 }
4309 }
4310}
4311
/// Optional per-peer fields extracted from a peripheral record by
/// `DataReceivedResult::peripheral_fields`.
///
/// The `Default` value (all `None`) is what is produced when no peripheral
/// data is available.
#[derive(Debug, Clone, Default)]
pub struct PeripheralFields {
    /// Callsign; `None` when the callsign string is empty.
    pub callsign: Option<String>,
    /// Battery percentage; `None` when the reported value is `0`.
    pub battery_percent: Option<u8>,
    /// Heart rate, copied as-is from the peripheral's health data.
    pub heart_rate: Option<u8>,
    /// Event type of the last event, cast to `u8`.
    pub event_type: Option<u8>,
    /// Latitude; set only when a location is present.
    pub latitude: Option<f32>,
    /// Longitude; set only when a location is present.
    pub longitude: Option<f32>,
    /// Altitude; set only when the location carries one.
    pub altitude: Option<f32>,
    /// Activity value from the peripheral's health data.
    pub activity_level: Option<u8>,
    /// Alerts value from the peripheral's health data.
    pub alerts: Option<u8>,
}
4343
/// Outcome of evaluating a relay envelope: the payload to process locally plus
/// what is needed to forward it.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Payload to process locally.
    pub payload: Vec<u8>,

    /// Node the message originated from.
    pub origin_node: NodeId,

    /// Hop count associated with the message.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward, if any; `relay_data` encodes it.
    pub relay_envelope: Option<RelayEnvelope>,
}
4364
4365impl RelayDecision {
4366 pub fn relay_data(&self) -> Option<Vec<u8>> {
4370 self.relay_envelope.as_ref().map(|e| e.encode())
4371 }
4372}
4373
4374#[cfg(all(test, feature = "std"))]
4375mod tests {
4376 use super::*;
4377 use crate::observer::CollectingObserver;
4378
4379 const TEST_TIMESTAMP: u64 = 1705276800000;
4381
4382 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4383 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
4384 PeatMesh::new(config)
4385 }
4386
4387 #[test]
4388 fn test_mesh_creation() {
4389 let mesh = create_mesh(0x12345678, "ALPHA-1");
4390
4391 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
4392 assert_eq!(mesh.callsign(), "ALPHA-1");
4393 assert_eq!(mesh.mesh_id(), "TEST");
4394 assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
4395 }
4396
4397 #[test]
4398 fn test_peer_discovery() {
4399 let mesh = create_mesh(0x11111111, "ALPHA-1");
4400 let observer = Arc::new(CollectingObserver::new());
4401 mesh.add_observer(observer.clone());
4402
4403 let peer = mesh.on_ble_discovered(
4405 "device-uuid",
4406 Some("PEAT_TEST-22222222"),
4407 -65,
4408 Some("TEST"),
4409 1000,
4410 );
4411
4412 assert!(peer.is_some());
4413 let peer = peer.unwrap();
4414 assert_eq!(peer.node_id.as_u32(), 0x22222222);
4415
4416 let events = observer.events();
4418 assert!(events
4419 .iter()
4420 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4421 assert!(events
4422 .iter()
4423 .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
4424 }
4425
4426 #[test]
4427 fn test_connection_lifecycle() {
4428 let mesh = create_mesh(0x11111111, "ALPHA-1");
4429 let observer = Arc::new(CollectingObserver::new());
4430 mesh.add_observer(observer.clone());
4431
4432 mesh.on_ble_discovered(
4434 "device-uuid",
4435 Some("PEAT_TEST-22222222"),
4436 -65,
4437 Some("TEST"),
4438 1000,
4439 );
4440
4441 let node_id = mesh.on_ble_connected("device-uuid", 2000);
4442 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4443 assert_eq!(mesh.connected_count(), 1);
4444
4445 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
4447 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
4448 assert_eq!(mesh.connected_count(), 0);
4449
4450 let events = observer.events();
4452 assert!(events
4453 .iter()
4454 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4455 assert!(events
4456 .iter()
4457 .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
4458 }
4459
4460 #[test]
4461 fn test_emergency_flow() {
4462 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4463 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4464
4465 let observer2 = Arc::new(CollectingObserver::new());
4466 mesh2.add_observer(observer2.clone());
4467
4468 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4470 assert!(mesh1.is_emergency_active());
4471
4472 let result =
4474 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4475
4476 assert!(result.is_some());
4477 let result = result.unwrap();
4478 assert!(result.is_emergency);
4479 assert_eq!(result.source_node.as_u32(), 0x11111111);
4480
4481 let events = observer2.events();
4483 assert!(events
4484 .iter()
4485 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4486 }
4487
4488 #[test]
4489 fn test_ack_flow() {
4490 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4491 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4492
4493 let observer2 = Arc::new(CollectingObserver::new());
4494 mesh2.add_observer(observer2.clone());
4495
4496 let doc = mesh1.send_ack(TEST_TIMESTAMP);
4498 assert!(mesh1.is_ack_active());
4499
4500 let result =
4502 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4503
4504 assert!(result.is_some());
4505 let result = result.unwrap();
4506 assert!(result.is_ack);
4507
4508 let events = observer2.events();
4510 assert!(events
4511 .iter()
4512 .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4513 }
4514
4515 #[test]
4516 fn test_tick_cleanup() {
4517 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4518 .with_peer_timeout(10_000);
4519 let mesh = PeatMesh::new(config);
4520
4521 let observer = Arc::new(CollectingObserver::new());
4522 mesh.add_observer(observer.clone());
4523
4524 mesh.on_ble_discovered(
4526 "device-uuid",
4527 Some("PEAT_TEST-22222222"),
4528 -65,
4529 Some("TEST"),
4530 1000,
4531 );
4532 assert_eq!(mesh.peer_count(), 1);
4533
4534 mesh.tick(5000);
4536 assert_eq!(mesh.peer_count(), 1);
4537
4538 mesh.tick(20000);
4540 assert_eq!(mesh.peer_count(), 0);
4541
4542 let events = observer.events();
4544 assert!(events
4545 .iter()
4546 .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4547 }
4548
4549 #[test]
4550 fn test_tick_sync_broadcast() {
4551 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4552 .with_sync_interval(5000);
4553 let mesh = PeatMesh::new(config);
4554
4555 mesh.on_ble_discovered(
4557 "device-uuid",
4558 Some("PEAT_TEST-22222222"),
4559 -65,
4560 Some("TEST"),
4561 1000,
4562 );
4563 mesh.on_ble_connected("device-uuid", 1000);
4564
4565 let _result = mesh.tick(0);
4567 let result = mesh.tick(3000);
4571 assert!(result.is_none());
4572
4573 let result = mesh.tick(6000);
4575 assert!(result.is_some());
4576
4577 let result = mesh.tick(6100);
4579 assert!(result.is_none());
4580
4581 let result = mesh.tick(12000);
4583 assert!(result.is_some());
4584 }
4585
4586 #[test]
4587 fn test_incoming_connection() {
4588 let mesh = create_mesh(0x11111111, "ALPHA-1");
4589 let observer = Arc::new(CollectingObserver::new());
4590 mesh.add_observer(observer.clone());
4591
4592 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4594
4595 assert!(is_new);
4596 assert_eq!(mesh.peer_count(), 1);
4597 assert_eq!(mesh.connected_count(), 1);
4598
4599 let events = observer.events();
4601 assert!(events
4602 .iter()
4603 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4604 assert!(events
4605 .iter()
4606 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4607 }
4608
4609 #[test]
4610 fn test_mesh_filtering() {
4611 let mesh = create_mesh(0x11111111, "ALPHA-1");
4612
4613 let peer = mesh.on_ble_discovered(
4615 "device-uuid-1",
4616 Some("PEAT_OTHER-22222222"),
4617 -65,
4618 Some("OTHER"),
4619 1000,
4620 );
4621 assert!(peer.is_none());
4622 assert_eq!(mesh.peer_count(), 0);
4623
4624 let peer = mesh.on_ble_discovered(
4626 "device-uuid-2",
4627 Some("PEAT_TEST-33333333"),
4628 -65,
4629 Some("TEST"),
4630 1000,
4631 );
4632 assert!(peer.is_some());
4633 assert_eq!(mesh.peer_count(), 1);
4634 }
4635
4636 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4639 let config =
4640 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4641 PeatMesh::new(config)
4642 }
4643
4644 #[test]
4645 fn test_encryption_enabled() {
4646 let secret = [0x42u8; 32];
4647 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4648
4649 assert!(mesh.is_encryption_enabled());
4650 }
4651
4652 #[test]
4653 fn test_encryption_disabled_by_default() {
4654 let mesh = create_mesh(0x11111111, "ALPHA-1");
4655
4656 assert!(!mesh.is_encryption_enabled());
4657 }
4658
4659 #[test]
4660 fn test_encrypted_document_exchange() {
4661 let secret = [0x42u8; 32];
4662 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4663 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4664
4665 let doc = mesh1.build_document();
4667
4668 assert!(doc.len() >= 2);
4670 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4671
4672 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4674
4675 assert!(result.is_some());
4676 let result = result.unwrap();
4677 assert_eq!(result.source_node.as_u32(), 0x11111111);
4678 }
4679
4680 #[test]
4681 fn test_encrypted_emergency_exchange() {
4682 let secret = [0x42u8; 32];
4683 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4684 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4685
4686 let observer = Arc::new(CollectingObserver::new());
4687 mesh2.add_observer(observer.clone());
4688
4689 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4691
4692 let result =
4694 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4695
4696 assert!(result.is_some());
4697 let result = result.unwrap();
4698 assert!(result.is_emergency);
4699
4700 let events = observer.events();
4702 assert!(events
4703 .iter()
4704 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4705 }
4706
4707 #[test]
4708 fn test_wrong_key_fails_decrypt() {
4709 let secret1 = [0x42u8; 32];
4710 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4712 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4713
4714 let doc = mesh1.build_document();
4716
4717 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4719
4720 assert!(result.is_none());
4721 }
4722
4723 #[test]
4724 fn test_unencrypted_mesh_can_read_unencrypted() {
4725 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4726 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4727
4728 let doc = mesh1.build_document();
4730
4731 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4733
4734 assert!(result.is_some());
4735 }
4736
4737 #[test]
4738 fn test_encrypted_mesh_can_receive_unencrypted() {
4739 let secret = [0x42u8; 32];
4741 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4746
4747 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4749
4750 assert!(result.is_some());
4751 }
4752
4753 #[test]
4754 fn test_unencrypted_mesh_cannot_receive_encrypted() {
4755 let secret = [0x42u8; 32];
4756 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
4761
4762 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4764
4765 assert!(result.is_none());
4766 }
4767
4768 #[test]
4769 fn test_enable_disable_encryption() {
4770 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4771
4772 assert!(!mesh.is_encryption_enabled());
4773
4774 let secret = [0x42u8; 32];
4776 mesh.enable_encryption(&secret);
4777 assert!(mesh.is_encryption_enabled());
4778
4779 let doc = mesh.build_document();
4781 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4782
4783 mesh.disable_encryption();
4785 assert!(!mesh.is_encryption_enabled());
4786
4787 let doc = mesh.build_document();
4789 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4790 }
4791
4792 #[test]
4793 fn test_encryption_overhead() {
4794 let secret = [0x42u8; 32];
4795 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4796 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4797
4798 let doc_encrypted = mesh_encrypted.build_document();
4799 let doc_unencrypted = mesh_unencrypted.build_document();
4800
4801 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4807 assert_eq!(overhead, 30); }
4809
4810 #[test]
4813 fn test_peer_e2ee_enable_disable() {
4814 let mesh = create_mesh(0x11111111, "ALPHA-1");
4815
4816 assert!(!mesh.is_peer_e2ee_enabled());
4817 assert!(mesh.peer_e2ee_public_key().is_none());
4818
4819 mesh.enable_peer_e2ee();
4820 assert!(mesh.is_peer_e2ee_enabled());
4821 assert!(mesh.peer_e2ee_public_key().is_some());
4822
4823 mesh.disable_peer_e2ee();
4824 assert!(!mesh.is_peer_e2ee_enabled());
4825 }
4826
4827 #[test]
4828 fn test_peer_e2ee_initiate_session() {
4829 let mesh = create_mesh(0x11111111, "ALPHA-1");
4830 mesh.enable_peer_e2ee();
4831
4832 let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4833 assert!(key_exchange.is_some());
4834
4835 let key_exchange = key_exchange.unwrap();
4836 assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4838
4839 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4841 assert_eq!(mesh.peer_e2ee_established_count(), 0);
4842 }
4843
4844 #[test]
4845 fn test_peer_e2ee_full_handshake() {
4846 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4847 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4848
4849 mesh1.enable_peer_e2ee();
4850 mesh2.enable_peer_e2ee();
4851
4852 let observer1 = Arc::new(CollectingObserver::new());
4853 let observer2 = Arc::new(CollectingObserver::new());
4854 mesh1.add_observer(observer1.clone());
4855 mesh2.add_observer(observer2.clone());
4856
4857 let key_exchange1 = mesh1
4859 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4860 .unwrap();
4861
4862 let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4864 assert!(response.is_some());
4865
4866 assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4868
4869 let key_exchange2 = response.unwrap();
4871 let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4872
4873 assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4875
4876 let events1 = observer1.events();
4878 assert!(events1
4879 .iter()
4880 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4881
4882 let events2 = observer2.events();
4883 assert!(events2
4884 .iter()
4885 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4886 }
4887
4888 #[test]
4889 fn test_peer_e2ee_encrypt_decrypt() {
4890 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4891 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4892
4893 mesh1.enable_peer_e2ee();
4894 mesh2.enable_peer_e2ee();
4895
4896 let key_exchange1 = mesh1
4898 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4899 .unwrap();
4900 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4901 mesh1.handle_key_exchange(&key_exchange2, 1000);
4902
4903 let plaintext = b"Secret message from mesh1";
4905 let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4906 assert!(encrypted.is_some());
4907
4908 let encrypted = encrypted.unwrap();
4909 assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4911
4912 let observer2 = Arc::new(CollectingObserver::new());
4914 mesh2.add_observer(observer2.clone());
4915
4916 let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4917 assert!(decrypted.is_some());
4918 assert_eq!(decrypted.unwrap(), plaintext);
4919
4920 let events = observer2.events();
4922 assert!(events.iter().any(|e| matches!(
4923 e,
4924 PeatEvent::PeerE2eeMessageReceived { from_node, data }
4925 if from_node.as_u32() == 0x11111111 && data == plaintext
4926 )));
4927 }
4928
4929 #[test]
4930 fn test_peer_e2ee_bidirectional() {
4931 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4932 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4933
4934 mesh1.enable_peer_e2ee();
4935 mesh2.enable_peer_e2ee();
4936
4937 let key_exchange1 = mesh1
4939 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4940 .unwrap();
4941 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4942 mesh1.handle_key_exchange(&key_exchange2, 1000);
4943
4944 let msg1 = mesh1
4946 .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4947 .unwrap();
4948 let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4949 assert_eq!(dec1, b"Hello from mesh1");
4950
4951 let msg2 = mesh2
4953 .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4954 .unwrap();
4955 let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4956 assert_eq!(dec2, b"Hello from mesh2");
4957 }
4958
4959 #[test]
4960 fn test_peer_e2ee_close_session() {
4961 let mesh = create_mesh(0x11111111, "ALPHA-1");
4962 mesh.enable_peer_e2ee();
4963
4964 let observer = Arc::new(CollectingObserver::new());
4965 mesh.add_observer(observer.clone());
4966
4967 mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4969 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4970
4971 mesh.close_peer_e2ee(NodeId::new(0x22222222));
4973
4974 let events = observer.events();
4976 assert!(events
4977 .iter()
4978 .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4979 }
4980
4981 #[test]
4982 fn test_peer_e2ee_without_enabling() {
4983 let mesh = create_mesh(0x11111111, "ALPHA-1");
4984
4985 let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4987 assert!(result.is_none());
4988
4989 let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4990 assert!(result.is_none());
4991
4992 assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4993 }
4994
4995 #[test]
4996 fn test_peer_e2ee_overhead() {
4997 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4998 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4999
5000 mesh1.enable_peer_e2ee();
5001 mesh2.enable_peer_e2ee();
5002
5003 let key_exchange1 = mesh1
5005 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
5006 .unwrap();
5007 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
5008 mesh1.handle_key_exchange(&key_exchange2, 1000);
5009
5010 let plaintext = b"Test message";
5012 let encrypted = mesh1
5013 .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
5014 .unwrap();
5015
5016 let overhead = encrypted.len() - plaintext.len();
5025 assert_eq!(overhead, 46);
5026 }
5027
5028 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
5031 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
5032 .with_encryption(secret)
5033 .with_strict_encryption();
5034 PeatMesh::new(config)
5035 }
5036
5037 #[test]
5038 fn test_strict_encryption_enabled() {
5039 let secret = [0x42u8; 32];
5040 let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5041
5042 assert!(mesh.is_encryption_enabled());
5043 assert!(mesh.is_strict_encryption_enabled());
5044 }
5045
5046 #[test]
5047 fn test_strict_encryption_disabled_by_default() {
5048 let secret = [0x42u8; 32];
5049 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5050
5051 assert!(mesh.is_encryption_enabled());
5052 assert!(!mesh.is_strict_encryption_enabled());
5053 }
5054
5055 #[test]
5056 fn test_strict_encryption_requires_encryption_enabled() {
5057 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5059 .with_strict_encryption(); let mesh = PeatMesh::new(config);
5061
5062 assert!(!mesh.is_encryption_enabled());
5063 assert!(!mesh.is_strict_encryption_enabled());
5064 }
5065
5066 #[test]
5067 fn test_strict_mode_accepts_encrypted_documents() {
5068 let secret = [0x42u8; 32];
5069 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
5070 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
5071
5072 let doc = mesh1.build_document();
5074 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
5075
5076 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5078 assert!(result.is_some());
5079 }
5080
5081 #[test]
5082 fn test_strict_mode_rejects_unencrypted_documents() {
5083 let secret = [0x42u8; 32];
5084 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); let observer = Arc::new(CollectingObserver::new());
5088 mesh2.add_observer(observer.clone());
5089
5090 let doc = mesh1.build_document();
5092 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
5093
5094 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5096 assert!(result.is_none());
5097
5098 let events = observer.events();
5100 assert!(events.iter().any(|e| matches!(
5101 e,
5102 PeatEvent::SecurityViolation {
5103 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
5104 ..
5105 }
5106 )));
5107 }
5108
5109 #[test]
5110 fn test_non_strict_mode_accepts_unencrypted_documents() {
5111 let secret = [0x42u8; 32];
5112 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
5117
5118 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5120 assert!(result.is_some());
5121 }
5122
5123 #[test]
5124 fn test_strict_mode_security_violation_event_includes_source() {
5125 let secret = [0x42u8; 32];
5126 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
5127 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
5128
5129 let observer = Arc::new(CollectingObserver::new());
5130 mesh2.add_observer(observer.clone());
5131
5132 let doc = mesh1.build_document();
5133
5134 mesh2.on_ble_discovered(
5136 "test-device-uuid",
5137 Some("PEAT_TEST-11111111"),
5138 -65,
5139 Some("TEST"),
5140 500,
5141 );
5142 mesh2.on_ble_connected("test-device-uuid", 600);
5143
5144 let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
5145
5146 let events = observer.events();
5148 let violation = events.iter().find(|e| {
5149 matches!(
5150 e,
5151 PeatEvent::SecurityViolation {
5152 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
5153 ..
5154 }
5155 )
5156 });
5157 assert!(violation.is_some());
5158
5159 if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
5160 assert!(source.is_some());
5161 assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
5162 }
5163 }
5164
5165 #[test]
5166 fn test_decryption_failure_emits_security_violation() {
5167 let secret1 = [0x42u8; 32];
5168 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
5170 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
5171
5172 let observer = Arc::new(CollectingObserver::new());
5173 mesh2.add_observer(observer.clone());
5174
5175 let doc = mesh1.build_document();
5177
5178 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
5180 assert!(result.is_none());
5181
5182 let events = observer.events();
5184 assert!(events.iter().any(|e| matches!(
5185 e,
5186 PeatEvent::SecurityViolation {
5187 kind: crate::observer::SecurityViolationKind::DecryptionFailed,
5188 ..
5189 }
5190 )));
5191 }
5192
5193 #[test]
5194 fn test_strict_mode_builder_chain() {
5195 let secret = [0x42u8; 32];
5196 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5197 .with_encryption(secret)
5198 .with_strict_encryption()
5199 .with_sync_interval(10_000)
5200 .with_peer_timeout(60_000);
5201
5202 let mesh = PeatMesh::new(config);
5203
5204 assert!(mesh.is_encryption_enabled());
5205 assert!(mesh.is_strict_encryption_enabled());
5206 }
5207
5208 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
5211 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
5212 PeatMesh::new(config)
5213 }
5214
5215 #[test]
5216 fn test_relay_disabled_by_default() {
5217 let mesh = create_mesh(0x11111111, "ALPHA-1");
5218 assert!(!mesh.is_relay_enabled());
5219 }
5220
5221 #[test]
5222 fn test_relay_enabled() {
5223 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5224 assert!(mesh.is_relay_enabled());
5225 }
5226
5227 #[test]
5228 fn test_relay_config_builder() {
5229 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
5230 .with_relay()
5231 .with_max_relay_hops(5)
5232 .with_relay_fanout(3)
5233 .with_seen_cache_ttl(60_000);
5234
5235 assert!(config.enable_relay);
5236 assert_eq!(config.max_relay_hops, 5);
5237 assert_eq!(config.relay_fanout, 3);
5238 assert_eq!(config.seen_cache_ttl_ms, 60_000);
5239 }
5240
5241 #[test]
5242 fn test_seen_message_deduplication() {
5243 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5244 let origin = NodeId::new(0x22222222);
5245 let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
5246
5247 assert!(mesh.mark_message_seen(msg_id, origin, 1000));
5249
5250 assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
5252
5253 assert_eq!(mesh.seen_cache_size(), 1);
5254 }
5255
5256 #[test]
5257 fn test_wrap_for_relay() {
5258 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5259
5260 let payload = vec![1, 2, 3, 4, 5];
5261 let wrapped = mesh.wrap_for_relay(payload.clone());
5262
5263 assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
5265
5266 let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
5268 assert_eq!(envelope.payload, payload);
5269 assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
5270 assert_eq!(envelope.hop_count, 0);
5271 }
5272
5273 #[test]
5274 fn test_process_relay_envelope_new_message() {
5275 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5276 let observer = Arc::new(CollectingObserver::new());
5277 mesh.add_observer(observer.clone());
5278
5279 let payload = vec![1, 2, 3, 4, 5];
5281 let envelope =
5282 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
5283 .with_max_hops(7);
5284 let data = envelope.encode();
5285
5286 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5288
5289 assert!(decision.is_some());
5290 let decision = decision.unwrap();
5291 assert_eq!(decision.payload, payload);
5292 assert_eq!(decision.origin_node.as_u32(), 0x22222222);
5293 assert_eq!(decision.hop_count, 0);
5294 assert!(decision.should_relay);
5295 assert!(decision.relay_envelope.is_some());
5296
5297 let relay_env = decision.relay_envelope.unwrap();
5299 assert_eq!(relay_env.hop_count, 1);
5300 }
5301
5302 #[test]
5303 fn test_process_relay_envelope_duplicate() {
5304 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5305 let observer = Arc::new(CollectingObserver::new());
5306 mesh.add_observer(observer.clone());
5307
5308 let payload = vec![1, 2, 3, 4, 5];
5309 let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
5310 let data = envelope.encode();
5311
5312 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5314 assert!(decision.is_some());
5315
5316 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
5318 assert!(decision.is_none());
5319
5320 let events = observer.events();
5322 assert!(events
5323 .iter()
5324 .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
5325 }
5326
5327 #[test]
5328 fn test_process_relay_envelope_ttl_expired() {
5329 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5330 let observer = Arc::new(CollectingObserver::new());
5331 mesh.add_observer(observer.clone());
5332
5333 let payload = vec![1, 2, 3, 4, 5];
5335 let mut envelope =
5336 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
5337 .with_max_hops(3);
5338
5339 envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); let data = envelope.encode();
5345
5346 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
5348
5349 assert!(decision.is_some());
5350 let decision = decision.unwrap();
5351 assert_eq!(decision.payload, payload);
5352 assert!(!decision.should_relay); assert!(decision.relay_envelope.is_none());
5354
5355 let events = observer.events();
5357 assert!(events
5358 .iter()
5359 .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
5360 }
5361
5362 #[test]
5363 fn test_build_relay_document() {
5364 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5365
5366 let relay_doc = mesh.build_relay_document();
5367
5368 assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
5370
5371 let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
5373 assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
5374
5375 let doc = crate::document::PeatDocument::decode(&envelope.payload);
5377 assert!(doc.is_some());
5378 }
5379
5380 #[test]
5381 fn test_relay_targets_excludes_source() {
5382 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5383
5384 mesh.on_ble_discovered(
5386 "peer-1",
5387 Some("PEAT_TEST-22222222"),
5388 -60,
5389 Some("TEST"),
5390 1000,
5391 );
5392 mesh.on_ble_connected("peer-1", 1000);
5393
5394 mesh.on_ble_discovered(
5395 "peer-2",
5396 Some("PEAT_TEST-33333333"),
5397 -65,
5398 Some("TEST"),
5399 1000,
5400 );
5401 mesh.on_ble_connected("peer-2", 1000);
5402
5403 mesh.on_ble_discovered(
5404 "peer-3",
5405 Some("PEAT_TEST-44444444"),
5406 -70,
5407 Some("TEST"),
5408 1000,
5409 );
5410 mesh.on_ble_connected("peer-3", 1000);
5411
5412 let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
5414
5415 assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
5417 }
5418
5419 #[test]
5420 fn test_clear_seen_cache() {
5421 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
5422 let origin = NodeId::new(0x22222222);
5423
5424 mesh.mark_message_seen(
5426 crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
5427 origin,
5428 1000,
5429 );
5430 mesh.mark_message_seen(
5431 crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
5432 origin,
5433 2000,
5434 );
5435
5436 assert_eq!(mesh.seen_cache_size(), 2);
5437
5438 mesh.clear_seen_cache();
5440 assert_eq!(mesh.seen_cache_size(), 0);
5441 }
5442
5443 #[cfg(feature = "peat-lite-frame")]
5453 #[test]
5454 fn publish_peat_lite_document_emits_well_formed_frame() {
5455 use peat_lite::protocol::document as pl_doc;
5456
5457 let mesh = create_mesh(0xCAFEBABE, "ALPHA-1");
5458
5459 let mut env = vec![0u8; 256];
5463 let env_n = pl_doc::encode(
5464 0,
5465 "markers",
5466 "test-id",
5467 1_700_000_000_000,
5468 b"body",
5469 &mut env,
5470 )
5471 .expect("envelope encode");
5472 env.truncate(env_n);
5473
5474 let wire = mesh
5478 .publish_peat_lite_document(&env)
5479 .expect("publish should succeed");
5480
5481 let decrypted = mesh
5484 .decrypt_document(&wire, None)
5485 .expect("decrypt own outbound");
5486
5487 assert!(
5489 crate::peat_lite_frame::is_peat_lite_frame(&decrypted),
5490 "decrypted outbound must start with peat-lite MAGIC"
5491 );
5492
5493 match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted) {
5494 crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(doc) => {
5495 assert_eq!(doc.source_node_id, 0xCAFEBABE);
5496 assert_eq!(doc.seq_num, 0, "first publish must use seq=0");
5497 assert_eq!(doc.collection, "markers");
5498 assert_eq!(doc.doc_id, "test-id");
5499 assert_eq!(doc.body.as_slice(), b"body");
5500 }
5501 other => panic!("decoded outbound frame mis-routed: {:?}", other),
5502 }
5503
5504 let wire2 = mesh
5509 .publish_peat_lite_document(&env)
5510 .expect("second publish should succeed");
5511 let decrypted2 = mesh
5512 .decrypt_document(&wire2, None)
5513 .expect("decrypt second outbound");
5514 match crate::peat_lite_frame::try_handle_peat_lite_frame(&decrypted2) {
5515 crate::peat_lite_frame::PeatLiteFrameOutcome::Decoded(doc) => {
5516 assert_eq!(
5517 doc.seq_num, 1,
5518 "second publish must increment seq, not collide with first"
5519 );
5520 }
5521 other => panic!("second outbound frame mis-routed: {:?}", other),
5522 }
5523 }
5524
5525 #[cfg(feature = "peat-lite-frame")]
5532 #[test]
5533 fn peat_lite_frames_do_not_collide_with_translator_routing() {
5534 use peat_lite::protocol::document as pl_doc;
5535
5536 let mesh = create_mesh(0x11111111, "ALPHA-1");
5537 let mut env = vec![0u8; 64];
5538 let n = pl_doc::encode(0, "tracks", "id", 0, b"x", &mut env).expect("envelope encode");
5539 env.truncate(n);
5540
5541 let wire = mesh.publish_peat_lite_document(&env).expect("publish");
5542 let decrypted = mesh.decrypt_document(&wire, None).expect("decrypt");
5543
5544 assert_eq!(
5547 decrypted[0], 0x50,
5548 "peat-lite frame's first byte must be 0x50, not in translator range"
5549 );
5550 assert!(
5551 !(0xB6..=0xBF).contains(&decrypted[0]),
5552 "peat-lite frame must not collide with translator marker range"
5553 );
5554 }
5555
5556 #[test]
5563 fn data_received_result_default_has_peat_lite_field() {
5564 let result = DataReceivedResult::default();
5565 assert!(
5566 result.peat_lite_document.is_none(),
5567 "default DataReceivedResult must have peat_lite_document = None"
5568 );
5569 }
5570}