1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64#[cfg(feature = "mesh-translator")]
65use crate::document::{
66 TRANSLATOR_FRAME_MARKER, TRANSLATOR_RESERVED_MARKER_END, TRANSLATOR_RESERVED_MARKER_START,
67};
68use crate::document_sync::DocumentSync;
69use crate::gossip::{GossipStrategy, RandomFanout};
70use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
71use crate::peer::{
72 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
73 PeerDegree, PeerManagerConfig, StateCountSummary,
74};
75use crate::peer_manager::PeerManager;
76use crate::relay::{
77 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
78 RELAY_ENVELOPE_MARKER,
79};
80use crate::security::{
81 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
82 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
83};
84use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
85use crate::sync::delta::{DeltaEncoder, DeltaStats};
86use crate::sync::delta_document::{DeltaDocument, Operation};
87use crate::NodeId;
88
89#[cfg(feature = "std")]
90use crate::observer::ObserverManager;
91
92use crate::registry::DocumentRegistry;
93
94#[derive(Debug, Clone)]
96pub struct PeatMeshConfig {
97 pub node_id: NodeId,
99
100 pub callsign: String,
102
103 pub mesh_id: String,
105
106 pub peripheral_type: PeripheralType,
108
109 pub peer_config: PeerManagerConfig,
111
112 pub sync_interval_ms: u64,
114
115 pub auto_broadcast_events: bool,
117
118 pub encryption_secret: Option<[u8; 32]>,
124
125 pub strict_encryption: bool,
133
134 pub enable_relay: bool,
141
142 pub max_relay_hops: u8,
147
148 pub relay_fanout: usize,
154
155 pub seen_cache_ttl_ms: u64,
160}
161
162impl PeatMeshConfig {
163 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
165 Self {
166 node_id,
167 callsign: callsign.into(),
168 mesh_id: mesh_id.into(),
169 peripheral_type: PeripheralType::SoldierSensor,
170 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
171 sync_interval_ms: 5000,
172 auto_broadcast_events: true,
173 encryption_secret: None,
174 strict_encryption: false,
175 enable_relay: false,
176 max_relay_hops: DEFAULT_MAX_HOPS,
177 relay_fanout: 2,
178 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
179 }
180 }
181
182 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
187 self.encryption_secret = Some(secret);
188 self
189 }
190
191 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
193 self.peripheral_type = ptype;
194 self
195 }
196
197 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
199 self.sync_interval_ms = interval_ms;
200 self
201 }
202
203 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
205 self.peer_config.peer_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_max_peers(mut self, max: usize) -> Self {
211 self.peer_config.max_peers = max;
212 self
213 }
214
215 pub fn with_strict_encryption(mut self) -> Self {
223 self.strict_encryption = true;
224 self
225 }
226
227 pub fn with_relay(mut self) -> Self {
232 self.enable_relay = true;
233 self
234 }
235
236 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
240 self.max_relay_hops = max_hops;
241 self
242 }
243
244 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
248 self.relay_fanout = fanout.max(1);
249 self
250 }
251
252 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
256 self.seen_cache_ttl_ms = ttl_ms;
257 self
258 }
259}
260
261#[cfg(feature = "std")]
263type AppDocumentStore =
264 std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
265
266#[cfg(feature = "std")]
271pub struct PeatMesh {
272 config: PeatMeshConfig,
274
275 peer_manager: PeerManager,
277
278 document_sync: DocumentSync,
280
281 observers: ObserverManager,
283
284 last_sync_ms: std::sync::atomic::AtomicU32,
286
287 last_cleanup_ms: std::sync::atomic::AtomicU32,
289
290 encryption_key: Option<MeshEncryptionKey>,
292
293 peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,
295
296 connection_graph: std::sync::Mutex<ConnectionStateGraph>,
298
299 seen_cache: std::sync::Mutex<SeenMessageCache>,
301
302 gossip_strategy: Box<dyn GossipStrategy>,
304
305 delta_encoder: std::sync::Mutex<DeltaEncoder>,
310
311 identity: Option<DeviceIdentity>,
316
317 identity_registry: std::sync::Mutex<IdentityRegistry>,
321
322 peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,
327
328 document_registry: DocumentRegistry,
333
334 app_documents: AppDocumentStore,
340
341 #[cfg(feature = "mesh-translator")]
350 ble_translator: std::sync::RwLock<crate::translator::BleTranslator>,
351
352 #[cfg(feature = "mesh-translator")]
366 decoded_document_callback: std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentCallback>>>,
367
368 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
376 decoded_document_json_callback:
377 std::sync::RwLock<Option<Arc<dyn crate::DecodedDocumentJsonCallback>>>,
378}
379
380#[cfg(feature = "std")]
381impl PeatMesh {
382 pub fn new(config: PeatMeshConfig) -> Self {
384 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
385 let document_sync = DocumentSync::with_peripheral_type(
386 config.node_id,
387 &config.callsign,
388 config.peripheral_type,
389 );
390
391 let encryption_key = config
393 .encryption_secret
394 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
395
396 let connection_graph = ConnectionStateGraph::with_config(
398 config.peer_config.rssi_degraded_threshold,
399 config.peer_config.lost_timeout_ms,
400 );
401
402 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
404
405 let gossip_strategy: Box<dyn GossipStrategy> =
407 Box::new(RandomFanout::new(config.relay_fanout));
408
409 let delta_encoder = DeltaEncoder::new(config.node_id);
411
412 let document_registry = DocumentRegistry::new();
413
414 Self {
415 config,
416 peer_manager,
417 document_sync,
418 observers: ObserverManager::new(),
419 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
420 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
421 encryption_key,
422 peer_sessions: std::sync::Mutex::new(None),
423 connection_graph: std::sync::Mutex::new(connection_graph),
424 seen_cache: std::sync::Mutex::new(seen_cache),
425 gossip_strategy,
426 delta_encoder: std::sync::Mutex::new(delta_encoder),
427 identity: None,
428 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
429 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
430 document_registry,
431 app_documents: std::sync::RwLock::new(HashMap::new()),
432 #[cfg(feature = "mesh-translator")]
433 ble_translator: std::sync::RwLock::new(
434 crate::translator::BleTranslator::with_defaults(),
435 ),
436 #[cfg(feature = "mesh-translator")]
437 decoded_document_callback: std::sync::RwLock::new(None),
438 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
439 decoded_document_json_callback: std::sync::RwLock::new(None),
440 }
441 }
442
443 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
449 let mut config = config;
451 config.node_id = identity.node_id();
452
453 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
454 let document_sync = DocumentSync::with_peripheral_type(
455 config.node_id,
456 &config.callsign,
457 config.peripheral_type,
458 );
459
460 let encryption_key = config
461 .encryption_secret
462 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
463
464 let connection_graph = ConnectionStateGraph::with_config(
465 config.peer_config.rssi_degraded_threshold,
466 config.peer_config.lost_timeout_ms,
467 );
468
469 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
470 let gossip_strategy: Box<dyn GossipStrategy> =
471 Box::new(RandomFanout::new(config.relay_fanout));
472 let delta_encoder = DeltaEncoder::new(config.node_id);
473
474 let document_registry = DocumentRegistry::new();
475
476 Self {
477 config,
478 peer_manager,
479 document_sync,
480 observers: ObserverManager::new(),
481 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
482 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
483 encryption_key,
484 peer_sessions: std::sync::Mutex::new(None),
485 connection_graph: std::sync::Mutex::new(connection_graph),
486 seen_cache: std::sync::Mutex::new(seen_cache),
487 gossip_strategy,
488 delta_encoder: std::sync::Mutex::new(delta_encoder),
489 identity: Some(identity),
490 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
491 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
492 document_registry,
493 app_documents: std::sync::RwLock::new(HashMap::new()),
494 #[cfg(feature = "mesh-translator")]
495 ble_translator: std::sync::RwLock::new(
496 crate::translator::BleTranslator::with_defaults(),
497 ),
498 #[cfg(feature = "mesh-translator")]
499 decoded_document_callback: std::sync::RwLock::new(None),
500 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
501 decoded_document_json_callback: std::sync::RwLock::new(None),
502 }
503 }
504
505 pub fn from_genesis(
513 genesis: &crate::security::MeshGenesis,
514 identity: DeviceIdentity,
515 callsign: &str,
516 ) -> Self {
517 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
518 .with_encryption(genesis.encryption_secret());
519
520 Self::with_identity(config, identity)
521 }
522
523 #[cfg(feature = "std")]
549 pub fn from_persisted(
550 state: crate::security::PersistedState,
551 callsign: &str,
552 ) -> Result<Self, crate::security::PersistenceError> {
553 let identity = state.restore_identity()?;
555
556 let genesis = state.restore_genesis();
558
559 let mesh = if let Some(ref gen) = genesis {
561 Self::from_genesis(gen, identity, callsign)
562 } else {
563 let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
564 Self::with_identity(config, identity)
565 };
566
567 let restored_registry = state.restore_registry();
569 if let Ok(mut registry) = mesh.identity_registry.lock() {
570 *registry = restored_registry;
571 }
572
573 log::info!(
574 "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
575 mesh.config.node_id.as_u32(),
576 mesh.known_identity_count()
577 );
578
579 Ok(mesh)
580 }
581
582 #[cfg(feature = "std")]
595 pub fn to_persisted_state(
596 &self,
597 genesis: Option<&crate::security::MeshGenesis>,
598 ) -> Option<crate::security::PersistedState> {
599 let identity = self.identity.as_ref()?;
600 let registry = self.identity_registry.lock().ok()?;
601
602 Some(crate::security::PersistedState::with_registry(
603 identity, genesis, ®istry,
604 ))
605 }
606
607 #[cfg(feature = "mesh-translator")]
621 pub fn set_decoded_document_callback(&self, cb: Arc<dyn crate::DecodedDocumentCallback>) {
622 if let Ok(mut slot) = self.decoded_document_callback.write() {
623 *slot = Some(cb);
624 }
625 }
626
627 #[cfg(all(feature = "mesh-translator", feature = "uniffi"))]
639 pub fn set_decoded_document_json_callback(
640 &self,
641 cb: Box<dyn crate::DecodedDocumentJsonCallback>,
642 ) {
643 let cb_arc: Arc<dyn crate::DecodedDocumentJsonCallback> = Arc::from(cb);
649 if let Ok(mut slot) = self.decoded_document_json_callback.write() {
650 *slot = Some(cb_arc);
651 }
652 }
653
654 #[cfg(feature = "mesh-translator")]
658 pub fn set_translator_config(&self, config: crate::translator::TranslationConfig) {
659 if let Ok(mut t) = self.ble_translator.write() {
660 *t = crate::translator::BleTranslator::new(config);
661 }
662 }
663
664 #[cfg(feature = "mesh-translator")]
677 fn try_handle_translator_marker(
678 &self,
679 decrypted: &[u8],
680 peer: Option<&str>,
681 source_node: Option<NodeId>,
682 ) -> bool {
683 if decrypted.is_empty() {
684 return false;
685 }
686 let marker = decrypted[0];
687
688 if (TRANSLATOR_RESERVED_MARKER_START..=TRANSLATOR_RESERVED_MARKER_END).contains(&marker) {
694 log::warn!(
695 "ble: dropping reserved translator-marker frame (marker=0x{marker:02X}, len={})",
696 decrypted.len()
697 );
698 return true;
699 }
700
701 if marker != TRANSLATOR_FRAME_MARKER {
702 return false;
703 }
704
705 if decrypted.len() < 2 {
707 log::warn!(
708 "ble: dropping truncated translator frame (len={}, missing collection code)",
709 decrypted.len()
710 );
711 return true;
712 }
713
714 let code = decrypted[1];
715 let payload = &decrypted[2..];
716
717 let (collection, decode_result) = {
720 let translator = match self.ble_translator.read() {
721 Ok(g) => g,
722 Err(_) => {
723 log::warn!("ble: translator RwLock poisoned; dropping frame");
724 return true;
725 }
726 };
727 let collection = match translator.code_to_collection(code) {
728 Some(c) => c.to_string(),
729 None => {
730 log::warn!(
731 "ble: dropping translator frame with unknown collection code 0x{code:02X}"
732 );
733 return true;
734 }
735 };
736
737 let mut ctx =
738 peat_mesh::transport::TranslationContext::inbound(peer.unwrap_or("unknown"))
739 .with_collection(collection.clone());
740 if let Some(node) = source_node {
741 ctx = ctx.with_local_wire_id(format!("{:08X}", node.as_u32()));
742 }
743
744 let result = translator.decode_inbound_sync(payload, &ctx);
745 (collection, result)
746 };
747
748 match decode_result {
749 Ok(Some(doc)) => {
750 let rust_cb = self
756 .decoded_document_callback
757 .read()
758 .ok()
759 .and_then(|g| g.as_ref().cloned());
760
761 #[cfg(feature = "uniffi")]
762 let json_cb = self
763 .decoded_document_json_callback
764 .read()
765 .ok()
766 .and_then(|g| g.as_ref().cloned());
767 #[cfg(not(feature = "uniffi"))]
768 let json_cb: Option<()> = None;
769
770 let any_callback = rust_cb.is_some() || json_cb.is_some();
771
772 if let Some(cb) = &rust_cb {
775 cb.on_document(&collection, doc.clone(), peer);
776 }
777
778 #[cfg(feature = "uniffi")]
790 if let Some(json_cb) = json_cb {
791 match serde_json::to_string(&doc) {
792 Ok(doc_json) => {
793 json_cb.on_document(
794 collection.clone(),
795 doc_json,
796 peer.map(str::to_string),
797 );
798 }
799 Err(e) => {
800 log::warn!(
801 "ble: failed to serialize decoded {} doc to JSON \
802 for UniFFI callback: {}",
803 collection,
804 e
805 );
806 }
807 }
808 }
809
810 if !any_callback {
811 log::debug!(
821 "ble: decoded {} frame but no DecodedDocument*Callback installed",
822 collection
823 );
824 self.notify(crate::observer::PeatEvent::TranslatorNoCallback {
825 collection: collection.clone(),
826 peer: peer.map(str::to_string),
827 });
828 }
829 }
830 Ok(None) => {
831 log::debug!(
832 "ble: codec declined translator frame for collection {}",
833 collection
834 );
835 }
836 Err(e) => {
837 log::warn!(
838 "ble: translator frame decode error (collection={}): {:#}",
839 collection,
840 e
841 );
842 }
843 }
844
845 true
846 }
847
848 pub fn is_encryption_enabled(&self) -> bool {
852 self.encryption_key.is_some()
853 }
854
855 pub fn is_strict_encryption_enabled(&self) -> bool {
859 self.config.strict_encryption && self.encryption_key.is_some()
860 }
861
862 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
867 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
868 &self.config.mesh_id,
869 secret,
870 ));
871 }
872
873 pub fn disable_encryption(&mut self) {
875 self.encryption_key = None;
876 }
877
878 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
883 match &self.encryption_key {
884 Some(key) => {
885 match key.encrypt_to_bytes(plaintext) {
887 Ok(ciphertext) => {
888 let mut buf = Vec::with_capacity(2 + ciphertext.len());
889 buf.push(ENCRYPTED_MARKER);
890 buf.push(0x00); buf.extend_from_slice(&ciphertext);
892 buf
893 }
894 Err(e) => {
895 log::error!("Encryption failed: {}", e);
896 plaintext.to_vec()
898 }
899 }
900 }
901 None => plaintext.to_vec(),
902 }
903 }
904
905 fn decrypt_document<'a>(
913 &self,
914 data: &'a [u8],
915 source_hint: Option<&str>,
916 ) -> Option<std::borrow::Cow<'a, [u8]>> {
917 log::debug!(
918 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
919 data.len(),
920 data.first().copied().unwrap_or(0),
921 source_hint
922 );
923
924 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
926 let _reserved = data[1];
928 let encrypted_payload = &data[2..];
929
930 log::debug!(
931 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
932 encrypted_payload.len()
933 );
934
935 match &self.encryption_key {
936 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
937 Ok(plaintext) => {
938 log::debug!(
939 "decrypt_document: SUCCESS, plaintext len={}",
940 plaintext.len()
941 );
942 Some(std::borrow::Cow::Owned(plaintext))
943 }
944 Err(e) => {
945 log::warn!(
946 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
947 e,
948 encrypted_payload.len(),
949 source_hint
950 );
951 self.notify(PeatEvent::SecurityViolation {
952 kind: SecurityViolationKind::DecryptionFailed,
953 source: source_hint.map(String::from),
954 });
955 None
956 }
957 },
958 None => {
959 log::warn!(
960 "decrypt_document: encryption not enabled but received encrypted doc"
961 );
962 None
963 }
964 }
965 } else {
966 if self.config.strict_encryption && self.encryption_key.is_some() {
969 log::warn!(
970 "Rejected unencrypted document in strict encryption mode (source: {:?})",
971 source_hint
972 );
973 self.notify(PeatEvent::SecurityViolation {
974 kind: SecurityViolationKind::UnencryptedInStrictMode,
975 source: source_hint.map(String::from),
976 });
977 None
978 } else {
979 Some(std::borrow::Cow::Borrowed(data))
981 }
982 }
983 }
984
985 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
999 self.decrypt_document(data, None)
1000 .map(|cow| cow.into_owned())
1001 }
1002
1003 pub fn has_identity(&self) -> bool {
1007 self.identity.is_some()
1008 }
1009
1010 pub fn public_key(&self) -> Option<[u8; 32]> {
1012 self.identity.as_ref().map(|id| id.public_key())
1013 }
1014
1015 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
1019 self.identity
1020 .as_ref()
1021 .map(|id| id.create_attestation(now_ms))
1022 }
1023
1024 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
1033 self.identity_registry
1034 .lock()
1035 .unwrap()
1036 .verify_or_register(attestation)
1037 }
1038
1039 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
1041 self.identity_registry.lock().unwrap().is_known(node_id)
1042 }
1043
1044 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
1046 self.identity_registry
1047 .lock()
1048 .unwrap()
1049 .get_public_key(node_id)
1050 .copied()
1051 }
1052
1053 pub fn known_identity_count(&self) -> usize {
1055 self.identity_registry.lock().unwrap().len()
1056 }
1057
1058 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
1063 self.identity_registry
1064 .lock()
1065 .unwrap()
1066 .pre_register(node_id, public_key, now_ms);
1067 }
1068
1069 pub fn forget_peer_identity(&self, node_id: NodeId) {
1073 self.identity_registry.lock().unwrap().remove(node_id);
1074 }
1075
1076 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
1080 self.identity.as_ref().map(|id| id.sign(data))
1081 }
1082
1083 pub fn verify_peer_signature(
1088 &self,
1089 node_id: NodeId,
1090 data: &[u8],
1091 signature: &[u8; 64],
1092 ) -> bool {
1093 if let Some(public_key) = self.peer_public_key(node_id) {
1094 crate::security::verify_signature(&public_key, data, signature)
1095 } else {
1096 false
1097 }
1098 }
1099
1100 pub fn is_relay_enabled(&self) -> bool {
1104 self.config.enable_relay
1105 }
1106
1107 pub fn enable_relay(&mut self) {
1109 self.config.enable_relay = true;
1110 }
1111
1112 pub fn disable_relay(&mut self) {
1114 self.config.enable_relay = false;
1115 }
1116
1117 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
1121 self.seen_cache.lock().unwrap().has_seen(message_id)
1122 }
1123
1124 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
1128 self.seen_cache
1129 .lock()
1130 .unwrap()
1131 .check_and_mark(message_id, origin, now_ms)
1132 }
1133
1134 pub fn seen_cache_size(&self) -> usize {
1136 self.seen_cache.lock().unwrap().len()
1137 }
1138
1139 pub fn clear_seen_cache(&self) {
1141 self.seen_cache.lock().unwrap().clear();
1142 }
1143
1144 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
1149 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
1150 .with_max_hops(self.config.max_relay_hops);
1151 envelope.encode()
1152 }
1153
1154 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
1159 let connected = self.peer_manager.get_connected_peers();
1160 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
1161 connected
1162 .into_iter()
1163 .filter(|p| p.node_id != exclude)
1164 .collect()
1165 } else {
1166 connected
1167 };
1168
1169 self.gossip_strategy
1170 .select_peers(&filtered)
1171 .into_iter()
1172 .cloned()
1173 .collect()
1174 }
1175
1176 pub fn process_relay_envelope(
1186 &self,
1187 data: &[u8],
1188 source_peer: NodeId,
1189 now_ms: u64,
1190 ) -> Option<RelayDecision> {
1191 let envelope = RelayEnvelope::decode(data)?;
1193
1194 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
1197 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
1198 source_peer,
1199 envelope.origin_node,
1200 envelope.hop_count,
1201 now_ms,
1202 );
1203
1204 if is_new {
1205 log::debug!(
1206 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
1207 envelope.origin_node.as_u32(),
1208 source_peer.as_u32(),
1209 envelope.hop_count
1210 );
1211 }
1212 }
1213
1214 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1216 let stats = self
1218 .seen_cache
1219 .lock()
1220 .unwrap()
1221 .get_stats(&envelope.message_id);
1222 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1223
1224 self.notify(PeatEvent::DuplicateMessageDropped {
1225 origin_node: envelope.origin_node,
1226 seen_count,
1227 });
1228
1229 log::debug!(
1230 "Dropped duplicate message {} from {:08X} (seen {} times)",
1231 envelope.message_id,
1232 envelope.origin_node.as_u32(),
1233 seen_count
1234 );
1235 return None;
1236 }
1237
1238 if !envelope.can_relay() {
1240 self.notify(PeatEvent::MessageTtlExpired {
1241 origin_node: envelope.origin_node,
1242 hop_count: envelope.hop_count,
1243 });
1244
1245 log::debug!(
1246 "Message {} from {:08X} TTL expired at hop {}",
1247 envelope.message_id,
1248 envelope.origin_node.as_u32(),
1249 envelope.hop_count
1250 );
1251
1252 return Some(RelayDecision {
1254 payload: envelope.payload,
1255 origin_node: envelope.origin_node,
1256 hop_count: envelope.hop_count,
1257 should_relay: false,
1258 relay_envelope: None,
1259 });
1260 }
1261
1262 let should_relay = self.config.enable_relay;
1264 let relay_envelope = if should_relay {
1265 envelope.relay() } else {
1267 None
1268 };
1269
1270 Some(RelayDecision {
1271 payload: envelope.payload,
1272 origin_node: envelope.origin_node,
1273 hop_count: envelope.hop_count,
1274 should_relay,
1275 relay_envelope,
1276 })
1277 }
1278
1279 pub fn build_relay_document(&self) -> Vec<u8> {
1284 let doc = self.build_document(); self.wrap_for_relay(doc)
1286 }
1287
1288 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1295 let mut encoder = self.delta_encoder.lock().unwrap();
1296 encoder.add_peer(peer_id);
1297 log::debug!(
1298 "Registered peer {:08X} for delta sync tracking",
1299 peer_id.as_u32()
1300 );
1301 }
1302
1303 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1307 let mut encoder = self.delta_encoder.lock().unwrap();
1308 encoder.remove_peer(peer_id);
1309 log::debug!(
1310 "Unregistered peer {:08X} from delta sync tracking",
1311 peer_id.as_u32()
1312 );
1313 }
1314
1315 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1320 let mut encoder = self.delta_encoder.lock().unwrap();
1321 encoder.reset_peer(peer_id);
1322 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1323 }
1324
1325 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1327 let mut encoder = self.delta_encoder.lock().unwrap();
1328 encoder.record_sent(peer_id, bytes);
1329 }
1330
1331 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1333 let mut encoder = self.delta_encoder.lock().unwrap();
1334 encoder.record_received(peer_id, bytes, timestamp);
1335 }
1336
1337 pub fn delta_stats(&self) -> DeltaStats {
1342 self.delta_encoder.lock().unwrap().stats()
1343 }
1344
1345 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1349 let encoder = self.delta_encoder.lock().unwrap();
1350 encoder
1351 .get_peer_state(peer_id)
1352 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1353 }
1354
1355 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1363 let mut all_operations: Vec<Operation> = Vec::new();
1365
1366 for (node_id_u32, count) in self.document_sync.counter_entries() {
1369 all_operations.push(Operation::IncrementCounter {
1370 counter_id: 0, node_id: NodeId::new(node_id_u32),
1372 amount: count,
1373 timestamp: count, });
1375 }
1376
1377 let peripheral = self.document_sync.peripheral_snapshot();
1380 let peripheral_timestamp = peripheral
1381 .last_event
1382 .as_ref()
1383 .map(|e| e.timestamp)
1384 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1386 peripheral,
1387 timestamp: peripheral_timestamp,
1388 });
1389
1390 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1392 let source_node = NodeId::new(emergency.source_node());
1393 let timestamp = emergency.timestamp();
1394
1395 all_operations.push(Operation::SetEmergency {
1397 source_node,
1398 timestamp,
1399 known_peers: emergency.all_nodes(),
1400 });
1401
1402 for acked_node in emergency.acked_nodes() {
1404 all_operations.push(Operation::AckEmergency {
1405 node_id: NodeId::new(acked_node),
1406 emergency_timestamp: timestamp,
1407 });
1408 }
1409 }
1410
1411 for app_op in self.app_document_delta_ops() {
1413 all_operations.push(Operation::App(app_op));
1414 }
1415
1416 let filtered_operations: Vec<Operation> = {
1418 let encoder = self.delta_encoder.lock().unwrap();
1419 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1420 all_operations
1421 .into_iter()
1422 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1423 .collect()
1424 } else {
1425 all_operations
1427 }
1428 };
1429
1430 if filtered_operations.is_empty() {
1432 return None;
1433 }
1434
1435 {
1437 let mut encoder = self.delta_encoder.lock().unwrap();
1438 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1439 for op in &filtered_operations {
1440 peer_state.mark_sent(&op.key(), op.timestamp());
1441 }
1442 }
1443 }
1444
1445 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1447 for op in filtered_operations {
1448 delta.add_operation(op);
1449 }
1450
1451 let encoded = delta.encode();
1453 let result = self.encrypt_document(&encoded);
1454
1455 {
1457 let mut encoder = self.delta_encoder.lock().unwrap();
1458 encoder.record_sent(peer_id, result.len());
1459 }
1460
1461 Some(result)
1462 }
1463
1464 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1469 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1470
1471 for (node_id_u32, count) in self.document_sync.counter_entries() {
1473 delta.add_operation(Operation::IncrementCounter {
1474 counter_id: 0,
1475 node_id: NodeId::new(node_id_u32),
1476 amount: count,
1477 timestamp: now_ms,
1478 });
1479 }
1480
1481 let peripheral = self.document_sync.peripheral_snapshot();
1483 let peripheral_timestamp = peripheral
1484 .last_event
1485 .as_ref()
1486 .map(|e| e.timestamp)
1487 .unwrap_or(now_ms);
1488 delta.add_operation(Operation::UpdatePeripheral {
1489 peripheral,
1490 timestamp: peripheral_timestamp,
1491 });
1492
1493 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1495 let source_node = NodeId::new(emergency.source_node());
1496 let timestamp = emergency.timestamp();
1497
1498 delta.add_operation(Operation::SetEmergency {
1499 source_node,
1500 timestamp,
1501 known_peers: emergency.all_nodes(),
1502 });
1503
1504 for acked_node in emergency.acked_nodes() {
1505 delta.add_operation(Operation::AckEmergency {
1506 node_id: NodeId::new(acked_node),
1507 emergency_timestamp: timestamp,
1508 });
1509 }
1510 }
1511
1512 for app_op in self.app_document_delta_ops() {
1514 delta.add_operation(Operation::App(app_op));
1515 }
1516
1517 let encoded = delta.encode();
1518 self.encrypt_document(&encoded)
1519 }
1520
1521 fn process_delta_document_internal(
1525 &self,
1526 source_node: NodeId,
1527 data: &[u8],
1528 now_ms: u64,
1529 relay_data: Option<Vec<u8>>,
1530 origin_node: Option<NodeId>,
1531 hop_count: u8,
1532 ) -> Option<DataReceivedResult> {
1533 let delta = DeltaDocument::decode(data)?;
1535
1536 if delta.origin_node == self.config.node_id {
1538 return None;
1539 }
1540
1541 let mut counter_changed = false;
1543 let mut emergency_changed = false;
1544 let mut is_emergency = false;
1545 let mut is_ack = false;
1546 let mut event_timestamp = 0u64;
1547 let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1548
1549 log::info!(
1550 "Delta document from {:08X}: {} operations, data_len={}",
1551 delta.origin_node.as_u32(),
1552 delta.operations.len(),
1553 data.len()
1554 );
1555 for op in &delta.operations {
1556 log::info!(" Operation: {}", op.key());
1557 match op {
1558 Operation::IncrementCounter {
1559 node_id, amount, ..
1560 } => {
1561 let current = self.document_sync.counter_entries();
1563 let current_value = current
1564 .iter()
1565 .find(|(id, _)| *id == node_id.as_u32())
1566 .map(|(_, v)| *v)
1567 .unwrap_or(0);
1568
1569 if *amount > current_value {
1570 counter_changed = true;
1573 }
1574 }
1575 Operation::UpdatePeripheral {
1576 peripheral,
1577 timestamp,
1578 } => {
1579 if let Ok(mut peripherals) = self.peer_peripherals.write() {
1581 peripherals.insert(delta.origin_node, peripheral.clone());
1582 }
1583 peer_peripheral = Some(peripheral.clone());
1585 if *timestamp > event_timestamp {
1587 event_timestamp = *timestamp;
1588 }
1589 }
1590 Operation::SetEmergency { timestamp, .. } => {
1591 is_emergency = true;
1592 emergency_changed = true;
1593 event_timestamp = *timestamp;
1594 }
1595 Operation::AckEmergency {
1596 emergency_timestamp,
1597 ..
1598 } => {
1599 is_ack = true;
1600 emergency_changed = true;
1601 if *emergency_timestamp > event_timestamp {
1602 event_timestamp = *emergency_timestamp;
1603 }
1604 }
1605 Operation::ClearEmergency {
1606 emergency_timestamp,
1607 } => {
1608 emergency_changed = true;
1609 if *emergency_timestamp > event_timestamp {
1610 event_timestamp = *emergency_timestamp;
1611 }
1612 }
1613 Operation::App(app_op) => {
1614 let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1620
1621 log::info!(
1622 "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1623 app_op.type_id,
1624 app_op.op_code,
1625 app_op.source_node,
1626 doc_timestamp,
1627 app_op.payload.len()
1628 );
1629
1630 let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1632 let changed = {
1633 let mut docs = self.app_documents.write().unwrap();
1634
1635 if let Some(existing) = docs.get_mut(&doc_key) {
1636 self.document_registry.apply_delta_op(
1638 app_op.type_id,
1639 existing.as_mut(),
1640 app_op,
1641 )
1642 } else {
1643 if let Some(decoded) = self
1645 .document_registry
1646 .decode(app_op.type_id, &app_op.payload)
1647 {
1648 docs.insert(doc_key, decoded);
1649 true
1650 } else {
1651 log::debug!(
1654 "Received delta for unknown doc {:?}, waiting for full state",
1655 doc_key
1656 );
1657 false
1658 }
1659 }
1660 };
1661
1662 self.observers.notify(PeatEvent::app_document_received(
1664 app_op.type_id,
1665 NodeId::new(app_op.source_node),
1666 doc_timestamp,
1667 changed,
1668 ));
1669 }
1670 }
1671 }
1672
1673 self.peer_manager.record_sync(source_node, now_ms);
1675
1676 {
1678 let mut encoder = self.delta_encoder.lock().unwrap();
1679 encoder.record_received(&source_node, data.len(), now_ms);
1680 }
1681
1682 if is_emergency {
1684 self.notify(PeatEvent::EmergencyReceived {
1685 from_node: delta.origin_node,
1686 });
1687 } else if is_ack {
1688 self.notify(PeatEvent::AckReceived {
1689 from_node: delta.origin_node,
1690 });
1691 }
1692
1693 if counter_changed {
1694 let total_count = self.document_sync.total_count();
1695 self.notify(PeatEvent::DocumentSynced {
1696 from_node: delta.origin_node,
1697 total_count,
1698 });
1699 }
1700
1701 if relay_data.is_some() {
1703 let relay_targets = self.get_relay_targets(Some(source_node));
1704 self.notify(PeatEvent::MessageRelayed {
1705 origin_node: origin_node.unwrap_or(delta.origin_node),
1706 relay_count: relay_targets.len(),
1707 hop_count,
1708 });
1709 }
1710
1711 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1712 DataReceivedResult::peripheral_fields(&peer_peripheral);
1713
1714 Some(DataReceivedResult {
1715 source_node: delta.origin_node,
1716 is_emergency,
1717 is_ack,
1718 counter_changed,
1719 emergency_changed,
1720 total_count: self.document_sync.total_count(),
1721 event_timestamp,
1722 relay_data,
1723 origin_node,
1724 hop_count,
1725 callsign,
1726 battery_percent,
1727 heart_rate,
1728 event_type,
1729 latitude,
1730 longitude,
1731 altitude,
1732 })
1733 }
1734
1735 pub fn enable_peer_e2ee(&self) {
1743 let mut sessions = self.peer_sessions.lock().unwrap();
1744 if sessions.is_none() {
1745 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1746 log::info!(
1747 "Per-peer E2EE enabled for node {:08X}",
1748 self.config.node_id.as_u32()
1749 );
1750 }
1751 }
1752
1753 pub fn disable_peer_e2ee(&self) {
1757 let mut sessions = self.peer_sessions.lock().unwrap();
1758 *sessions = None;
1759 log::info!("Per-peer E2EE disabled");
1760 }
1761
1762 pub fn is_peer_e2ee_enabled(&self) -> bool {
1764 self.peer_sessions.lock().unwrap().is_some()
1765 }
1766
1767 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1771 self.peer_sessions
1772 .lock()
1773 .unwrap()
1774 .as_ref()
1775 .map(|s| s.our_public_key())
1776 }
1777
1778 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1784 let mut sessions = self.peer_sessions.lock().unwrap();
1785 let session_mgr = sessions.as_mut()?;
1786
1787 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1788 let mut buf = Vec::with_capacity(2 + 37);
1789 buf.push(KEY_EXCHANGE_MARKER);
1790 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1792
1793 log::info!(
1794 "Initiated E2EE session with peer {:08X}",
1795 peer_node_id.as_u32()
1796 );
1797 Some(buf)
1798 }
1799
1800 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1802 self.peer_sessions
1803 .lock()
1804 .unwrap()
1805 .as_ref()
1806 .is_some_and(|s| s.has_session(peer_node_id))
1807 }
1808
1809 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1811 self.peer_sessions
1812 .lock()
1813 .unwrap()
1814 .as_ref()
1815 .and_then(|s| s.session_state(peer_node_id))
1816 }
1817
1818 pub fn send_peer_e2ee(
1823 &self,
1824 peer_node_id: NodeId,
1825 plaintext: &[u8],
1826 now_ms: u64,
1827 ) -> Option<Vec<u8>> {
1828 let mut sessions = self.peer_sessions.lock().unwrap();
1829 let session_mgr = sessions.as_mut()?;
1830
1831 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1832 Ok(encrypted) => {
1833 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1834 buf.push(PEER_E2EE_MARKER);
1835 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1837 Some(buf)
1838 }
1839 Err(e) => {
1840 log::warn!(
1841 "Failed to encrypt for peer {:08X}: {:?}",
1842 peer_node_id.as_u32(),
1843 e
1844 );
1845 None
1846 }
1847 }
1848 }
1849
1850 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1852 let mut sessions = self.peer_sessions.lock().unwrap();
1853 if let Some(session_mgr) = sessions.as_mut() {
1854 session_mgr.close_session(peer_node_id);
1855 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1856 log::info!(
1857 "Closed E2EE session with peer {:08X}",
1858 peer_node_id.as_u32()
1859 );
1860 }
1861 }
1862
1863 pub fn peer_e2ee_session_count(&self) -> usize {
1865 self.peer_sessions
1866 .lock()
1867 .unwrap()
1868 .as_ref()
1869 .map(|s| s.session_count())
1870 .unwrap_or(0)
1871 }
1872
1873 pub fn peer_e2ee_established_count(&self) -> usize {
1875 self.peer_sessions
1876 .lock()
1877 .unwrap()
1878 .as_ref()
1879 .map(|s| s.established_count())
1880 .unwrap_or(0)
1881 }
1882
1883 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1888 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1889 return None;
1890 }
1891
1892 let payload = &data[2..];
1893 let msg = KeyExchangeMessage::decode(payload)?;
1894
1895 let mut sessions = self.peer_sessions.lock().unwrap();
1896 let session_mgr = sessions.as_mut()?;
1897
1898 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1899
1900 if established {
1901 self.notify(PeatEvent::PeerE2eeEstablished {
1902 peer_node_id: msg.sender_node_id,
1903 });
1904 log::info!(
1905 "E2EE session established with peer {:08X}",
1906 msg.sender_node_id.as_u32()
1907 );
1908 }
1909
1910 let mut buf = Vec::with_capacity(2 + 37);
1912 buf.push(KEY_EXCHANGE_MARKER);
1913 buf.push(0x00);
1914 buf.extend_from_slice(&response.encode());
1915 Some(buf)
1916 }
1917
1918 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1923 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1924 return None;
1925 }
1926
1927 let payload = &data[2..];
1928 let msg = PeerEncryptedMessage::decode(payload)?;
1929
1930 let mut sessions = self.peer_sessions.lock().unwrap();
1931 let session_mgr = sessions.as_mut()?;
1932
1933 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1934 Ok(plaintext) => {
1935 self.notify(PeatEvent::PeerE2eeMessageReceived {
1937 from_node: msg.sender_node_id,
1938 data: plaintext.clone(),
1939 });
1940 Some(plaintext)
1941 }
1942 Err(e) => {
1943 log::warn!(
1944 "Failed to decrypt E2EE message from {:08X}: {:?}",
1945 msg.sender_node_id.as_u32(),
1946 e
1947 );
1948 None
1949 }
1950 }
1951 }
1952
1953 pub fn node_id(&self) -> NodeId {
1957 self.config.node_id
1958 }
1959
1960 pub fn callsign(&self) -> &str {
1962 &self.config.callsign
1963 }
1964
1965 pub fn mesh_id(&self) -> &str {
1967 &self.config.mesh_id
1968 }
1969
1970 pub fn device_name(&self) -> String {
1972 format!(
1973 "PEAT_{}-{:08X}",
1974 self.config.mesh_id,
1975 self.config.node_id.as_u32()
1976 )
1977 }
1978
1979 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1984 self.peer_peripherals.read().ok().and_then(|peripherals| {
1985 peripherals
1986 .get(&node_id)
1987 .map(|p| p.callsign_str().to_string())
1988 })
1989 }
1990
1991 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1996 self.peer_peripherals
1997 .read()
1998 .ok()
1999 .and_then(|peripherals| peripherals.get(&node_id).cloned())
2000 }
2001
2002 pub fn document_registry(&self) -> &DocumentRegistry {
2017 &self.document_registry
2018 }
2019
2020 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
2027 let type_id = T::TYPE_ID;
2028 let (source_node, timestamp) = doc.identity();
2029 let key = (type_id, source_node, timestamp);
2030
2031 let mut docs = self.app_documents.write().unwrap();
2032
2033 if let Some(existing) = docs.get_mut(&key) {
2034 self.document_registry
2036 .merge(type_id, existing.as_mut(), &doc)
2037 } else {
2038 docs.insert(key, Box::new(doc));
2040 true
2041 }
2042 }
2043
2044 pub fn store_app_document_boxed(
2051 &self,
2052 type_id: u8,
2053 source_node: u32,
2054 timestamp: u64,
2055 doc: Box<dyn core::any::Any + Send + Sync>,
2056 ) -> bool {
2057 let key = (type_id, source_node, timestamp);
2058
2059 let mut docs = self.app_documents.write().unwrap();
2060
2061 if let Some(existing) = docs.get_mut(&key) {
2062 self.document_registry
2064 .merge(type_id, existing.as_mut(), doc.as_ref())
2065 } else {
2066 docs.insert(key, doc);
2068 true
2069 }
2070 }
2071
2072 pub fn get_app_document<T: crate::registry::DocumentType>(
2076 &self,
2077 source_node: u32,
2078 timestamp: u64,
2079 ) -> Option<T> {
2080 let key = (T::TYPE_ID, source_node, timestamp);
2081
2082 let docs = self.app_documents.read().unwrap();
2083 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
2084 }
2085
2086 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
2090 let docs = self.app_documents.read().unwrap();
2091 docs.iter()
2092 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
2093 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
2094 .collect()
2095 }
2096
2097 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
2101 let docs = self.app_documents.read().unwrap();
2102 let mut ops = Vec::new();
2103
2104 for ((type_id, _source, _ts), doc) in docs.iter() {
2105 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
2106 ops.push(op);
2107 }
2108 }
2109
2110 ops
2111 }
2112
2113 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
2117 let docs = self.app_documents.read().unwrap();
2118 docs.keys()
2119 .filter(|(tid, _, _)| *tid == type_id)
2120 .map(|(_, source, ts)| (*source, *ts))
2121 .collect()
2122 }
2123
2124 pub fn app_document_count(&self) -> usize {
2126 self.app_documents.read().unwrap().len()
2127 }
2128
2129 pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
2133 self.observers.add(observer);
2134 }
2135
2136 pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
2138 self.observers.remove(observer);
2139 }
2140
2141 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
2148 let data = self.document_sync.send_emergency(timestamp);
2149 self.notify(PeatEvent::MeshStateChanged {
2150 peer_count: self.peer_manager.peer_count(),
2151 connected_count: self.peer_manager.connected_count(),
2152 });
2153 self.encrypt_document(&data)
2154 }
2155
2156 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
2161 let data = self.document_sync.send_ack(timestamp);
2162 self.notify(PeatEvent::MeshStateChanged {
2163 peer_count: self.peer_manager.peer_count(),
2164 connected_count: self.peer_manager.connected_count(),
2165 });
2166 self.encrypt_document(&data)
2167 }
2168
2169 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
2176 self.encrypt_document(payload)
2177 }
2178
2179 pub fn clear_event(&self) {
2181 self.document_sync.clear_event();
2182 }
2183
2184 pub fn is_emergency_active(&self) -> bool {
2186 self.document_sync.is_emergency_active()
2187 }
2188
2189 pub fn is_ack_active(&self) -> bool {
2191 self.document_sync.is_ack_active()
2192 }
2193
2194 pub fn current_event(&self) -> Option<EventType> {
2196 self.document_sync.current_event()
2197 }
2198
2199 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
2208 let data = self.document_sync.start_emergency(timestamp, known_peers);
2209 self.notify(PeatEvent::MeshStateChanged {
2210 peer_count: self.peer_manager.peer_count(),
2211 connected_count: self.peer_manager.connected_count(),
2212 });
2213 self.encrypt_document(&data)
2214 }
2215
2216 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
2220 let peers: Vec<u32> = self
2221 .peer_manager
2222 .get_peers()
2223 .iter()
2224 .map(|p| p.node_id.as_u32())
2225 .collect();
2226 self.start_emergency(timestamp, &peers)
2227 }
2228
2229 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
2234 let result = self.document_sync.ack_emergency(timestamp);
2235 if result.is_some() {
2236 self.notify(PeatEvent::MeshStateChanged {
2237 peer_count: self.peer_manager.peer_count(),
2238 connected_count: self.peer_manager.connected_count(),
2239 });
2240 }
2241 result.map(|data| self.encrypt_document(&data))
2242 }
2243
2244 pub fn clear_emergency(&self) {
2246 self.document_sync.clear_emergency();
2247 }
2248
2249 pub fn has_active_emergency(&self) -> bool {
2251 self.document_sync.has_active_emergency()
2252 }
2253
2254 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
2258 self.document_sync.get_emergency_status()
2259 }
2260
2261 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
2263 self.document_sync.has_peer_acked(peer_id)
2264 }
2265
2266 pub fn all_peers_acked(&self) -> bool {
2268 self.document_sync.all_peers_acked()
2269 }
2270
2271 #[cfg(feature = "legacy-chat")]
2281 pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
2282 if self.document_sync.add_chat_message(sender, text, timestamp) {
2283 Some(self.encrypt_document(&self.build_document()))
2284 } else {
2285 None
2286 }
2287 }
2288
2289 #[cfg(feature = "legacy-chat")]
2297 pub fn send_chat_reply(
2298 &self,
2299 sender: &str,
2300 text: &str,
2301 reply_to_node: u32,
2302 reply_to_timestamp: u64,
2303 timestamp: u64,
2304 ) -> Option<Vec<u8>> {
2305 if self.document_sync.add_chat_reply(
2306 sender,
2307 text,
2308 reply_to_node,
2309 reply_to_timestamp,
2310 timestamp,
2311 ) {
2312 Some(self.encrypt_document(&self.build_document()))
2313 } else {
2314 None
2315 }
2316 }
2317
2318 #[cfg(feature = "legacy-chat")]
2320 pub fn chat_count(&self) -> usize {
2321 self.document_sync.chat_count()
2322 }
2323
2324 #[cfg(feature = "legacy-chat")]
2328 pub fn chat_messages_since(
2329 &self,
2330 since_timestamp: u64,
2331 ) -> Vec<(u32, u64, String, String, u32, u64)> {
2332 self.document_sync.chat_messages_since(since_timestamp)
2333 }
2334
2335 #[cfg(feature = "legacy-chat")]
2339 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2340 self.document_sync.all_chat_messages()
2341 }
2342
2343 pub fn on_ble_discovered(
2349 &self,
2350 identifier: &str,
2351 name: Option<&str>,
2352 rssi: i8,
2353 mesh_id: Option<&str>,
2354 now_ms: u64,
2355 ) -> Option<PeatPeer> {
2356 let (node_id, is_new) = self
2357 .peer_manager
2358 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2359
2360 let peer = self.peer_manager.get_peer(node_id)?;
2361
2362 {
2364 let mut graph = self.connection_graph.lock().unwrap();
2365 graph.on_discovered(
2366 node_id,
2367 identifier.to_string(),
2368 name.map(|s| s.to_string()),
2369 mesh_id.map(|s| s.to_string()),
2370 rssi,
2371 now_ms,
2372 );
2373 }
2374
2375 if is_new {
2376 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2377 self.notify_mesh_state_changed();
2378 }
2379
2380 Some(peer)
2381 }
2382
2383 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2387 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2388 Some(id) => id,
2389 None => {
2390 log::warn!(
2391 "on_ble_connected: identifier {:?} not in peer map — \
2392 use on_incoming_connection() for peripheral connections",
2393 identifier
2394 );
2395 return None;
2396 }
2397 };
2398
2399 {
2401 let mut graph = self.connection_graph.lock().unwrap();
2402 graph.on_connected(node_id, now_ms);
2403 }
2404
2405 self.register_peer_for_delta(&node_id);
2407
2408 self.notify(PeatEvent::PeerConnected { node_id });
2409 self.notify_mesh_state_changed();
2410 Some(node_id)
2411 }
2412
2413 pub fn on_ble_disconnected(
2415 &self,
2416 identifier: &str,
2417 reason: DisconnectReason,
2418 ) -> Option<NodeId> {
2419 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2420
2421 {
2423 let mut graph = self.connection_graph.lock().unwrap();
2424 let platform_reason = match observer_reason {
2425 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2426 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2427 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2428 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2429 DisconnectReason::ConnectionFailed => {
2430 crate::platform::DisconnectReason::ConnectionFailed
2431 }
2432 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2433 };
2434 let now_ms = std::time::SystemTime::now()
2435 .duration_since(std::time::UNIX_EPOCH)
2436 .map(|d| d.as_millis() as u64)
2437 .unwrap_or(0);
2438 graph.on_disconnected(node_id, platform_reason, now_ms);
2439
2440 graph.remove_via_peer(node_id);
2443 }
2444
2445 self.unregister_peer_for_delta(&node_id);
2447
2448 self.notify(PeatEvent::PeerDisconnected {
2449 node_id,
2450 reason: observer_reason,
2451 });
2452 self.notify_mesh_state_changed();
2453 Some(node_id)
2454 }
2455
2456 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2460 if self
2461 .peer_manager
2462 .on_disconnected_by_node_id(node_id, reason)
2463 {
2464 {
2466 let mut graph = self.connection_graph.lock().unwrap();
2467 let platform_reason = match reason {
2468 DisconnectReason::LocalRequest => {
2469 crate::platform::DisconnectReason::LocalRequest
2470 }
2471 DisconnectReason::RemoteRequest => {
2472 crate::platform::DisconnectReason::RemoteRequest
2473 }
2474 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2475 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2476 DisconnectReason::ConnectionFailed => {
2477 crate::platform::DisconnectReason::ConnectionFailed
2478 }
2479 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2480 };
2481 let now_ms = std::time::SystemTime::now()
2482 .duration_since(std::time::UNIX_EPOCH)
2483 .map(|d| d.as_millis() as u64)
2484 .unwrap_or(0);
2485 graph.on_disconnected(node_id, platform_reason, now_ms);
2486
2487 graph.remove_via_peer(node_id);
2489 }
2490
2491 self.unregister_peer_for_delta(&node_id);
2493
2494 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2495 self.notify_mesh_state_changed();
2496 }
2497 }
2498
2499 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2503 let is_new = self
2504 .peer_manager
2505 .on_incoming_connection(identifier, node_id, now_ms);
2506
2507 {
2509 let mut graph = self.connection_graph.lock().unwrap();
2510 if is_new {
2511 graph.on_discovered(
2512 node_id,
2513 identifier.to_string(),
2514 None,
2515 Some(self.config.mesh_id.clone()),
2516 -50, now_ms,
2518 );
2519 }
2520 graph.on_connected(node_id, now_ms);
2521 }
2522
2523 self.register_peer_for_delta(&node_id);
2525
2526 if is_new {
2527 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2528 self.notify(PeatEvent::PeerDiscovered { peer });
2529 }
2530 }
2531
2532 self.notify(PeatEvent::PeerConnected { node_id });
2533 self.notify_mesh_state_changed();
2534
2535 is_new
2536 }
2537
2538 pub fn on_ble_data_received(
2545 &self,
2546 identifier: &str,
2547 data: &[u8],
2548 now_ms: u64,
2549 ) -> Option<DataReceivedResult> {
2550 let node_id = self.peer_manager.get_node_id(identifier)?;
2552
2553 if data.len() >= 2 {
2555 match data[0] {
2556 KEY_EXCHANGE_MARKER => {
2557 let _response = self.handle_key_exchange(data, now_ms);
2559 return None;
2561 }
2562 PEER_E2EE_MARKER => {
2563 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2565 return None;
2567 }
2568 RELAY_ENVELOPE_MARKER => {
2569 return self
2571 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2572 }
2573 _ => {}
2574 }
2575 }
2576
2577 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2579 }
2580
2581 #[allow(clippy::too_many_arguments)]
2583 fn process_document_data_with_identifier(
2584 &self,
2585 source_node: NodeId,
2586 identifier: &str,
2587 data: &[u8],
2588 now_ms: u64,
2589 relay_data: Option<Vec<u8>>,
2590 origin_node: Option<NodeId>,
2591 hop_count: u8,
2592 ) -> Option<DataReceivedResult> {
2593 let decrypted = self.decrypt_document(data, Some(identifier))?;
2595
2596 #[cfg(feature = "mesh-translator")]
2599 if self.try_handle_translator_marker(&decrypted, Some(identifier), Some(source_node)) {
2600 return None;
2601 }
2602
2603 if DeltaDocument::is_delta_document(&decrypted) {
2605 return self.process_delta_document_internal(
2606 source_node,
2607 &decrypted,
2608 now_ms,
2609 relay_data,
2610 origin_node,
2611 hop_count,
2612 );
2613 }
2614
2615 let result = self.document_sync.merge_document(&decrypted)?;
2617
2618 if let Some(ref peripheral) = result.peer_peripheral {
2620 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2621 peripherals.insert(result.source_node, peripheral.clone());
2622 }
2623 }
2624
2625 self.peer_manager.record_sync(source_node, now_ms);
2627
2628 if result.is_emergency() {
2630 self.notify(PeatEvent::EmergencyReceived {
2631 from_node: result.source_node,
2632 });
2633 } else if result.is_ack() {
2634 self.notify(PeatEvent::AckReceived {
2635 from_node: result.source_node,
2636 });
2637 }
2638
2639 if result.counter_changed {
2640 self.notify(PeatEvent::DocumentSynced {
2641 from_node: result.source_node,
2642 total_count: result.total_count,
2643 });
2644 }
2645
2646 if relay_data.is_some() {
2648 let relay_targets = self.get_relay_targets(Some(source_node));
2649 self.notify(PeatEvent::MessageRelayed {
2650 origin_node: origin_node.unwrap_or(result.source_node),
2651 relay_count: relay_targets.len(),
2652 hop_count,
2653 });
2654 }
2655
2656 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2657 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2658
2659 Some(DataReceivedResult {
2660 source_node: result.source_node,
2661 is_emergency: result.is_emergency(),
2662 is_ack: result.is_ack(),
2663 counter_changed: result.counter_changed,
2664 emergency_changed: result.emergency_changed,
2665 total_count: result.total_count,
2666 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2667 relay_data,
2668 origin_node,
2669 hop_count,
2670 callsign,
2671 battery_percent,
2672 heart_rate,
2673 event_type,
2674 latitude,
2675 longitude,
2676 altitude,
2677 })
2678 }
2679
2680 fn handle_relay_envelope_with_identifier(
2682 &self,
2683 source_node: NodeId,
2684 identifier: &str,
2685 data: &[u8],
2686 now_ms: u64,
2687 ) -> Option<DataReceivedResult> {
2688 let envelope = RelayEnvelope::decode(data)?;
2690
2691 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2693 let stats = self
2694 .seen_cache
2695 .lock()
2696 .unwrap()
2697 .get_stats(&envelope.message_id);
2698 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2699
2700 self.notify(PeatEvent::DuplicateMessageDropped {
2701 origin_node: envelope.origin_node,
2702 seen_count,
2703 });
2704 return None;
2705 }
2706
2707 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2709 envelope.relay().map(|e| e.encode())
2710 } else {
2711 if !envelope.can_relay() {
2712 self.notify(PeatEvent::MessageTtlExpired {
2713 origin_node: envelope.origin_node,
2714 hop_count: envelope.hop_count,
2715 });
2716 }
2717 None
2718 };
2719
2720 self.process_document_data_with_identifier(
2722 source_node,
2723 identifier,
2724 &envelope.payload,
2725 now_ms,
2726 relay_data,
2727 Some(envelope.origin_node),
2728 envelope.hop_count,
2729 )
2730 }
2731
2732 pub fn on_ble_data_received_from_node(
2739 &self,
2740 node_id: NodeId,
2741 data: &[u8],
2742 now_ms: u64,
2743 ) -> Option<DataReceivedResult> {
2744 if data.len() >= 2 {
2746 match data[0] {
2747 KEY_EXCHANGE_MARKER => {
2748 let _response = self.handle_key_exchange(data, now_ms);
2749 return None;
2750 }
2751 PEER_E2EE_MARKER => {
2752 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2753 return None;
2754 }
2755 RELAY_ENVELOPE_MARKER => {
2756 return self.handle_relay_envelope(node_id, data, now_ms);
2758 }
2759 _ => {}
2760 }
2761 }
2762
2763 self.process_document_data(node_id, data, now_ms, None, None, 0)
2765 }
2766
2767 pub fn on_ble_data_received_anonymous(
2777 &self,
2778 identifier: &str,
2779 data: &[u8],
2780 now_ms: u64,
2781 ) -> Option<DataReceivedResult> {
2782 log::debug!(
2783 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2784 identifier,
2785 data.len(),
2786 data.first().copied().unwrap_or(0)
2787 );
2788
2789 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2791 Some(d) => d,
2792 None => {
2793 log::warn!(
2794 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2795 data.len(),
2796 identifier
2797 );
2798 return None;
2799 }
2800 };
2801
2802 #[cfg(feature = "mesh-translator")]
2813 if self.try_handle_translator_marker(&decrypted, Some(identifier), None) {
2814 return None;
2815 }
2816
2817 if decrypted.len() < 8 {
2820 log::warn!("Decrypted document too short to extract source_node");
2821 return None;
2822 }
2823
2824 let source_node_u32 =
2825 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2826 let source_node = NodeId::new(source_node_u32);
2827
2828 log::info!(
2829 "Anonymous document from {}: source_node={:08X}, len={}",
2830 identifier,
2831 source_node_u32,
2832 decrypted.len()
2833 );
2834
2835 self.peer_manager
2838 .register_identifier(identifier, source_node);
2839
2840 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2842 log::info!(
2843 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2844 is_delta,
2845 decrypted.first().copied().unwrap_or(0),
2846 decrypted.len()
2847 );
2848
2849 if is_delta {
2850 return self.process_delta_document_internal(
2851 source_node,
2852 &decrypted,
2853 now_ms,
2854 None,
2855 None,
2856 0,
2857 );
2858 }
2859
2860 const APP_LAYER_MARKER: u8 = 0xAF;
2864 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2865 log::debug!(
2866 "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
2867 source_node.as_u32(),
2868 decrypted.len()
2869 );
2870 return Some(DataReceivedResult {
2871 source_node,
2872 is_emergency: false,
2873 is_ack: false,
2874 counter_changed: false,
2875 emergency_changed: false,
2876 total_count: 0,
2877 event_timestamp: now_ms,
2878 relay_data: Some(decrypted.to_vec()),
2879 origin_node: None,
2880 hop_count: 0,
2881 callsign: None,
2882 battery_percent: None,
2883 heart_rate: None,
2884 event_type: None,
2885 latitude: None,
2886 longitude: None,
2887 altitude: None,
2888 });
2889 }
2890
2891 log::info!(
2893 "Processing legacy document from {:08X}",
2894 source_node.as_u32()
2895 );
2896 let result = self.document_sync.merge_document(&decrypted)?;
2897
2898 log::info!(
2900 "Merge result: peer_peripheral={}, counter_changed={}",
2901 result.peer_peripheral.is_some(),
2902 result.counter_changed
2903 );
2904 if let Some(ref p) = result.peer_peripheral {
2905 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2906 }
2907
2908 self.peer_manager.record_sync(source_node, now_ms);
2910
2911 if result.is_emergency() {
2913 self.notify(PeatEvent::EmergencyReceived {
2914 from_node: result.source_node,
2915 });
2916 } else if result.is_ack() {
2917 self.notify(PeatEvent::AckReceived {
2918 from_node: result.source_node,
2919 });
2920 }
2921
2922 if result.counter_changed {
2923 self.notify(PeatEvent::DocumentSynced {
2924 from_node: result.source_node,
2925 total_count: result.total_count,
2926 });
2927 }
2928
2929 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2930 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2931
2932 Some(DataReceivedResult {
2933 source_node: result.source_node,
2934 is_emergency: result.is_emergency(),
2935 is_ack: result.is_ack(),
2936 counter_changed: result.counter_changed,
2937 emergency_changed: result.emergency_changed,
2938 total_count: result.total_count,
2939 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2940 relay_data: None,
2941 origin_node: None,
2942 hop_count: 0,
2943 callsign,
2944 battery_percent,
2945 heart_rate,
2946 event_type,
2947 latitude,
2948 longitude,
2949 altitude,
2950 })
2951 }
2952
2953 fn process_document_data(
2955 &self,
2956 source_node: NodeId,
2957 data: &[u8],
2958 now_ms: u64,
2959 relay_data: Option<Vec<u8>>,
2960 origin_node: Option<NodeId>,
2961 hop_count: u8,
2962 ) -> Option<DataReceivedResult> {
2963 let source_hint = format!("node:{:08X}", source_node.as_u32());
2965 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2966
2967 #[cfg(feature = "mesh-translator")]
2970 if self.try_handle_translator_marker(&decrypted, None, Some(source_node)) {
2971 return None;
2972 }
2973
2974 if DeltaDocument::is_delta_document(&decrypted) {
2976 return self.process_delta_document_internal(
2977 source_node,
2978 &decrypted,
2979 now_ms,
2980 relay_data,
2981 origin_node,
2982 hop_count,
2983 );
2984 }
2985
2986 let result = self.document_sync.merge_document(&decrypted)?;
2988
2989 if let Some(ref peripheral) = result.peer_peripheral {
2991 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2992 peripherals.insert(result.source_node, peripheral.clone());
2993 }
2994 }
2995
2996 self.peer_manager.record_sync(source_node, now_ms);
2998
2999 if result.is_emergency() {
3001 self.notify(PeatEvent::EmergencyReceived {
3002 from_node: result.source_node,
3003 });
3004 } else if result.is_ack() {
3005 self.notify(PeatEvent::AckReceived {
3006 from_node: result.source_node,
3007 });
3008 }
3009
3010 if result.counter_changed {
3011 self.notify(PeatEvent::DocumentSynced {
3012 from_node: result.source_node,
3013 total_count: result.total_count,
3014 });
3015 }
3016
3017 if relay_data.is_some() {
3019 let relay_targets = self.get_relay_targets(Some(source_node));
3020 self.notify(PeatEvent::MessageRelayed {
3021 origin_node: origin_node.unwrap_or(result.source_node),
3022 relay_count: relay_targets.len(),
3023 hop_count,
3024 });
3025 }
3026
3027 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3028 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3029
3030 Some(DataReceivedResult {
3031 source_node: result.source_node,
3032 is_emergency: result.is_emergency(),
3033 is_ack: result.is_ack(),
3034 counter_changed: result.counter_changed,
3035 emergency_changed: result.emergency_changed,
3036 total_count: result.total_count,
3037 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3038 relay_data,
3039 origin_node,
3040 hop_count,
3041 callsign,
3042 battery_percent,
3043 heart_rate,
3044 event_type,
3045 latitude,
3046 longitude,
3047 altitude,
3048 })
3049 }
3050
3051 fn handle_relay_envelope(
3053 &self,
3054 source_node: NodeId,
3055 data: &[u8],
3056 now_ms: u64,
3057 ) -> Option<DataReceivedResult> {
3058 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
3060
3061 let relay_data = if decision.should_relay {
3063 decision.relay_data()
3064 } else {
3065 None
3066 };
3067
3068 self.process_document_data(
3070 source_node,
3071 &decision.payload,
3072 now_ms,
3073 relay_data,
3074 Some(decision.origin_node),
3075 decision.hop_count,
3076 )
3077 }
3078
3079 pub fn on_ble_data(
3088 &self,
3089 identifier: &str,
3090 data: &[u8],
3091 now_ms: u64,
3092 ) -> Option<DataReceivedResult> {
3093 if data.len() >= 2 {
3095 match data[0] {
3096 KEY_EXCHANGE_MARKER => {
3097 let _response = self.handle_key_exchange(data, now_ms);
3098 return None;
3099 }
3100 PEER_E2EE_MARKER => {
3101 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
3102 return None;
3103 }
3104 RELAY_ENVELOPE_MARKER => {
3105 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
3107 }
3108 _ => {}
3109 }
3110 }
3111
3112 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
3114 }
3115
3116 fn process_incoming_document(
3118 &self,
3119 identifier: &str,
3120 data: &[u8],
3121 now_ms: u64,
3122 relay_data: Option<Vec<u8>>,
3123 origin_node: Option<NodeId>,
3124 hop_count: u8,
3125 ) -> Option<DataReceivedResult> {
3126 let decrypted = self.decrypt_document(data, Some(identifier))?;
3128
3129 let result = self.document_sync.merge_document(&decrypted)?;
3131
3132 self.peer_manager.record_sync(result.source_node, now_ms);
3134
3135 if origin_node.is_none() {
3140 let is_new =
3142 self.peer_manager
3143 .on_incoming_connection(identifier, result.source_node, now_ms);
3144
3145 {
3147 let mut graph = self.connection_graph.lock().unwrap();
3148 if is_new {
3149 graph.on_discovered(
3150 result.source_node,
3151 identifier.to_string(),
3152 None,
3153 Some(self.config.mesh_id.clone()),
3154 -50, now_ms,
3156 );
3157 }
3158 graph.on_connected(result.source_node, now_ms);
3159 }
3160 }
3161
3162 if result.is_emergency() {
3164 self.notify(PeatEvent::EmergencyReceived {
3165 from_node: result.source_node,
3166 });
3167 } else if result.is_ack() {
3168 self.notify(PeatEvent::AckReceived {
3169 from_node: result.source_node,
3170 });
3171 }
3172
3173 if result.counter_changed {
3174 self.notify(PeatEvent::DocumentSynced {
3175 from_node: result.source_node,
3176 total_count: result.total_count,
3177 });
3178 }
3179
3180 if relay_data.is_some() {
3182 let relay_targets = self.get_relay_targets(Some(result.source_node));
3183 self.notify(PeatEvent::MessageRelayed {
3184 origin_node: origin_node.unwrap_or(result.source_node),
3185 relay_count: relay_targets.len(),
3186 hop_count,
3187 });
3188 }
3189
3190 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
3191 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
3192
3193 Some(DataReceivedResult {
3194 source_node: result.source_node,
3195 is_emergency: result.is_emergency(),
3196 is_ack: result.is_ack(),
3197 counter_changed: result.counter_changed,
3198 emergency_changed: result.emergency_changed,
3199 total_count: result.total_count,
3200 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
3201 relay_data,
3202 origin_node,
3203 hop_count,
3204 callsign,
3205 battery_percent,
3206 heart_rate,
3207 event_type,
3208 latitude,
3209 longitude,
3210 altitude,
3211 })
3212 }
3213
3214 fn handle_relay_envelope_with_incoming(
3216 &self,
3217 identifier: &str,
3218 data: &[u8],
3219 now_ms: u64,
3220 ) -> Option<DataReceivedResult> {
3221 let envelope = RelayEnvelope::decode(data)?;
3223
3224 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
3227 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
3228 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
3229 source_peer,
3230 envelope.origin_node,
3231 envelope.hop_count,
3232 now_ms,
3233 );
3234
3235 if is_new {
3236 log::debug!(
3237 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
3238 envelope.origin_node.as_u32(),
3239 source_peer.as_u32(),
3240 envelope.hop_count
3241 );
3242 }
3243 }
3244 }
3245
3246 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
3248 let stats = self
3250 .seen_cache
3251 .lock()
3252 .unwrap()
3253 .get_stats(&envelope.message_id);
3254 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
3255
3256 self.notify(PeatEvent::DuplicateMessageDropped {
3257 origin_node: envelope.origin_node,
3258 seen_count,
3259 });
3260 return None;
3261 }
3262
3263 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
3265 let relay_env = envelope.relay();
3266 (true, relay_env.map(|e| e.encode()))
3267 } else {
3268 if !envelope.can_relay() {
3269 self.notify(PeatEvent::MessageTtlExpired {
3270 origin_node: envelope.origin_node,
3271 hop_count: envelope.hop_count,
3272 });
3273 }
3274 (false, None)
3275 };
3276
3277 self.process_incoming_document(
3279 identifier,
3280 &envelope.payload,
3281 now_ms,
3282 if should_relay { relay_data } else { None },
3283 Some(envelope.origin_node),
3284 envelope.hop_count,
3285 )
3286 }
3287
3288 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3298 use std::sync::atomic::Ordering;
3299
3300 let now_ms_32 = now_ms as u32;
3302
3303 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3305 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3306 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3307 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3308 let removed = self.peer_manager.cleanup_stale(now_ms);
3309 for node_id in &removed {
3310 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3311 }
3312 if !removed.is_empty() {
3313 self.notify_mesh_state_changed();
3314 }
3315
3316 {
3318 let mut graph = self.connection_graph.lock().unwrap();
3319 let newly_lost = graph.tick(now_ms);
3320 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3322 drop(graph);
3323
3324 for node_id in newly_lost {
3327 if !removed.contains(&node_id) {
3329 self.notify(PeatEvent::PeerLost { node_id });
3330 }
3331 }
3332 }
3333 }
3334
3335 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3337 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3338 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3339 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3340 if self.peer_manager.connected_count() > 0 {
3342 let doc = self.document_sync.build_document();
3343 return Some(self.encrypt_document(&doc));
3344 }
3345 }
3346
3347 None
3348 }
3349
3350 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3359 use std::sync::atomic::Ordering;
3360 let now_ms_32 = now_ms as u32;
3361
3362 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3364 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3365 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3366 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3367 let removed = self.peer_manager.cleanup_stale(now_ms);
3368 for node_id in &removed {
3369 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3370 }
3371 if !removed.is_empty() {
3372 self.notify_mesh_state_changed();
3373 }
3374
3375 {
3377 let mut graph = self.connection_graph.lock().unwrap();
3378 let newly_lost = graph.tick(now_ms);
3379 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3380 drop(graph);
3381
3382 for node_id in newly_lost {
3383 if !removed.contains(&node_id) {
3384 self.notify(PeatEvent::PeerLost { node_id });
3385 }
3386 }
3387 }
3388 }
3389
3390 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3392 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3393 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3394 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3395
3396 let doc = self.document_sync.build_document();
3398 let encrypted = self.encrypt_document(&doc);
3399 let mut results = Vec::new();
3400 for peer in self.get_connected_peers() {
3401 results.push((peer.node_id, encrypted.clone()));
3402 }
3403 return results;
3404 }
3405
3406 Vec::new()
3407 }
3408
3409 pub fn get_peers(&self) -> Vec<PeatPeer> {
3413 self.peer_manager.get_peers()
3414 }
3415
3416 pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3418 self.peer_manager.get_connected_peers()
3419 }
3420
3421 pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3423 self.peer_manager.get_peer(node_id)
3424 }
3425
3426 pub fn peer_count(&self) -> usize {
3428 self.peer_manager.peer_count()
3429 }
3430
3431 pub fn connected_count(&self) -> usize {
3433 self.peer_manager.connected_count()
3434 }
3435
3436 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3438 self.peer_manager.matches_mesh(device_mesh_id)
3439 }
3440
3441 pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3465 self.connection_graph.lock().unwrap().get_all_owned()
3466 }
3467
3468 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3470 self.connection_graph
3471 .lock()
3472 .unwrap()
3473 .get_peer(node_id)
3474 .cloned()
3475 }
3476
3477 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3479 self.connection_graph
3480 .lock()
3481 .unwrap()
3482 .get_connected()
3483 .into_iter()
3484 .cloned()
3485 .collect()
3486 }
3487
3488 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3490 self.connection_graph
3491 .lock()
3492 .unwrap()
3493 .get_degraded()
3494 .into_iter()
3495 .cloned()
3496 .collect()
3497 }
3498
3499 pub fn get_recently_disconnected(
3503 &self,
3504 within_ms: u64,
3505 now_ms: u64,
3506 ) -> Vec<PeerConnectionState> {
3507 self.connection_graph
3508 .lock()
3509 .unwrap()
3510 .get_recently_disconnected(within_ms, now_ms)
3511 .into_iter()
3512 .cloned()
3513 .collect()
3514 }
3515
3516 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3518 self.connection_graph
3519 .lock()
3520 .unwrap()
3521 .get_lost()
3522 .into_iter()
3523 .cloned()
3524 .collect()
3525 }
3526
3527 pub fn get_connection_state_counts(&self) -> StateCountSummary {
3529 self.connection_graph.lock().unwrap().state_counts()
3530 }
3531
3532 pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3540 self.connection_graph
3541 .lock()
3542 .unwrap()
3543 .get_indirect_peers_owned()
3544 }
3545
3546 pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3553 self.connection_graph.lock().unwrap().peer_degree(node_id)
3554 }
3555
3556 pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3561 self.connection_graph.lock().unwrap().full_state_counts()
3562 }
3563
3564 pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3569 self.connection_graph.lock().unwrap().get_paths_to(node_id)
3570 }
3571
3572 pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3574 self.connection_graph.lock().unwrap().is_known(node_id)
3575 }
3576
3577 pub fn indirect_peer_count(&self) -> usize {
3579 self.connection_graph.lock().unwrap().indirect_peer_count()
3580 }
3581
3582 pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3587 self.connection_graph
3588 .lock()
3589 .unwrap()
3590 .cleanup_indirect(now_ms)
3591 }
3592
3593 pub fn total_count(&self) -> u64 {
3595 self.document_sync.total_count()
3596 }
3597
3598 pub fn document_version(&self) -> u32 {
3600 self.document_sync.version()
3601 }
3602
3603 pub fn version(&self) -> u32 {
3605 self.document_sync.version()
3606 }
3607
3608 pub fn update_health(&self, battery_percent: u8) {
3610 self.document_sync.update_health(battery_percent);
3611 }
3612
3613 pub fn update_activity(&self, activity: u8) {
3615 self.document_sync.update_activity(activity);
3616 }
3617
3618 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3620 self.document_sync
3621 .update_health_full(battery_percent, activity);
3622 }
3623
3624 pub fn update_heart_rate(&self, heart_rate: u8) {
3626 self.document_sync.update_heart_rate(heart_rate);
3627 }
3628
3629 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3631 self.document_sync
3632 .update_location(latitude, longitude, altitude);
3633 }
3634
3635 pub fn clear_location(&self) {
3637 self.document_sync.clear_location();
3638 }
3639
3640 pub fn update_callsign(&self, callsign: &str) {
3642 self.document_sync.update_callsign(callsign);
3643 }
3644
3645 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3647 self.document_sync
3648 .set_peripheral_event(event_type, timestamp);
3649 }
3650
3651 pub fn clear_peripheral_event(&self) {
3653 self.document_sync.clear_peripheral_event();
3654 }
3655
3656 #[allow(clippy::too_many_arguments)]
3661 pub fn update_peripheral_state(
3662 &self,
3663 callsign: &str,
3664 battery_percent: u8,
3665 heart_rate: Option<u8>,
3666 latitude: Option<f32>,
3667 longitude: Option<f32>,
3668 altitude: Option<f32>,
3669 event_type: Option<EventType>,
3670 timestamp: u64,
3671 ) {
3672 self.document_sync.update_peripheral_state(
3673 callsign,
3674 battery_percent,
3675 heart_rate,
3676 latitude,
3677 longitude,
3678 altitude,
3679 event_type,
3680 timestamp,
3681 );
3682 }
3683
3684 pub fn build_document(&self) -> Vec<u8> {
3688 let doc = self.document_sync.build_document();
3689 self.encrypt_document(&doc)
3690 }
3691
3692 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3694 self.peer_manager.peers_needing_sync(now_ms)
3695 }
3696
3697 fn notify(&self, event: PeatEvent) {
3700 self.observers.notify(event);
3701 }
3702
3703 fn notify_mesh_state_changed(&self) {
3704 self.notify(PeatEvent::MeshStateChanged {
3705 peer_count: self.peer_manager.peer_count(),
3706 connected_count: self.peer_manager.connected_count(),
3707 });
3708 }
3709
3710 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3730 let mut id_bytes = [0u8; 16];
3733 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3734 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3735 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3736
3737 let seen = self.seen_cache.lock().unwrap();
3739 !seen.has_seen(&message_id)
3740 }
3741
3742 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3747 let now = std::time::SystemTime::now()
3748 .duration_since(std::time::UNIX_EPOCH)
3749 .map(|d| d.as_millis() as u64)
3750 .unwrap_or(0);
3751
3752 let mut id_bytes = [0u8; 16];
3754 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3755 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3756 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3757 let origin = NodeId::new(source_node);
3758
3759 let mut seen = self.seen_cache.lock().unwrap();
3760 seen.mark_seen(message_id, origin, now);
3761 }
3762
3763 pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3768 self.peer_manager.get_connected_identifiers()
3769 }
3770}
3771
3772#[derive(Debug, Clone)]
3774pub struct DataReceivedResult {
3775 pub source_node: NodeId,
3777
3778 pub is_emergency: bool,
3780
3781 pub is_ack: bool,
3783
3784 pub counter_changed: bool,
3786
3787 pub emergency_changed: bool,
3789
3790 pub total_count: u64,
3792
3793 pub event_timestamp: u64,
3795
3796 pub relay_data: Option<Vec<u8>>,
3801
3802 pub origin_node: Option<NodeId>,
3804
3805 pub hop_count: u8,
3807
3808 pub callsign: Option<String>,
3811
3812 pub battery_percent: Option<u8>,
3814
3815 pub heart_rate: Option<u8>,
3817
3818 pub event_type: Option<u8>,
3820
3821 pub latitude: Option<f32>,
3823
3824 pub longitude: Option<f32>,
3826
3827 pub altitude: Option<f32>,
3829}
3830
3831impl DataReceivedResult {
3832 #[allow(clippy::type_complexity)]
3834 fn peripheral_fields(
3835 peripheral: &Option<crate::sync::crdt::Peripheral>,
3836 ) -> (
3837 Option<String>,
3838 Option<u8>,
3839 Option<u8>,
3840 Option<u8>,
3841 Option<f32>,
3842 Option<f32>,
3843 Option<f32>,
3844 ) {
3845 match peripheral {
3846 Some(p) => {
3847 let callsign = {
3848 let s = p.callsign_str();
3849 if s.is_empty() {
3850 None
3851 } else {
3852 Some(s.to_string())
3853 }
3854 };
3855 let battery = if p.health.battery_percent > 0 {
3856 Some(p.health.battery_percent)
3857 } else {
3858 None
3859 };
3860 let heart_rate = p.health.heart_rate;
3861 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3862 let (lat, lon, alt) = match &p.location {
3863 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3864 None => (None, None, None),
3865 };
3866 (callsign, battery, heart_rate, event_type, lat, lon, alt)
3867 }
3868 None => (None, None, None, None, None, None, None),
3869 }
3870 }
3871}
3872
3873#[derive(Debug, Clone)]
3875pub struct RelayDecision {
3876 pub payload: Vec<u8>,
3878
3879 pub origin_node: NodeId,
3881
3882 pub hop_count: u8,
3884
3885 pub should_relay: bool,
3887
3888 pub relay_envelope: Option<RelayEnvelope>,
3892}
3893
3894impl RelayDecision {
3895 pub fn relay_data(&self) -> Option<Vec<u8>> {
3899 self.relay_envelope.as_ref().map(|e| e.encode())
3900 }
3901}
3902
3903#[cfg(all(test, feature = "std"))]
3904mod tests {
3905 use super::*;
3906 use crate::observer::CollectingObserver;
3907
3908 const TEST_TIMESTAMP: u64 = 1705276800000;
3910
3911 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3912 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3913 PeatMesh::new(config)
3914 }
3915
3916 #[test]
3917 fn test_mesh_creation() {
3918 let mesh = create_mesh(0x12345678, "ALPHA-1");
3919
3920 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3921 assert_eq!(mesh.callsign(), "ALPHA-1");
3922 assert_eq!(mesh.mesh_id(), "TEST");
3923 assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
3924 }
3925
3926 #[test]
3927 fn test_peer_discovery() {
3928 let mesh = create_mesh(0x11111111, "ALPHA-1");
3929 let observer = Arc::new(CollectingObserver::new());
3930 mesh.add_observer(observer.clone());
3931
3932 let peer = mesh.on_ble_discovered(
3934 "device-uuid",
3935 Some("PEAT_TEST-22222222"),
3936 -65,
3937 Some("TEST"),
3938 1000,
3939 );
3940
3941 assert!(peer.is_some());
3942 let peer = peer.unwrap();
3943 assert_eq!(peer.node_id.as_u32(), 0x22222222);
3944
3945 let events = observer.events();
3947 assert!(events
3948 .iter()
3949 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3950 assert!(events
3951 .iter()
3952 .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
3953 }
3954
3955 #[test]
3956 fn test_connection_lifecycle() {
3957 let mesh = create_mesh(0x11111111, "ALPHA-1");
3958 let observer = Arc::new(CollectingObserver::new());
3959 mesh.add_observer(observer.clone());
3960
3961 mesh.on_ble_discovered(
3963 "device-uuid",
3964 Some("PEAT_TEST-22222222"),
3965 -65,
3966 Some("TEST"),
3967 1000,
3968 );
3969
3970 let node_id = mesh.on_ble_connected("device-uuid", 2000);
3971 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3972 assert_eq!(mesh.connected_count(), 1);
3973
3974 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3976 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3977 assert_eq!(mesh.connected_count(), 0);
3978
3979 let events = observer.events();
3981 assert!(events
3982 .iter()
3983 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3984 assert!(events
3985 .iter()
3986 .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
3987 }
3988
3989 #[test]
3990 fn test_emergency_flow() {
3991 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3992 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3993
3994 let observer2 = Arc::new(CollectingObserver::new());
3995 mesh2.add_observer(observer2.clone());
3996
3997 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3999 assert!(mesh1.is_emergency_active());
4000
4001 let result =
4003 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4004
4005 assert!(result.is_some());
4006 let result = result.unwrap();
4007 assert!(result.is_emergency);
4008 assert_eq!(result.source_node.as_u32(), 0x11111111);
4009
4010 let events = observer2.events();
4012 assert!(events
4013 .iter()
4014 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4015 }
4016
4017 #[test]
4018 fn test_ack_flow() {
4019 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4020 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4021
4022 let observer2 = Arc::new(CollectingObserver::new());
4023 mesh2.add_observer(observer2.clone());
4024
4025 let doc = mesh1.send_ack(TEST_TIMESTAMP);
4027 assert!(mesh1.is_ack_active());
4028
4029 let result =
4031 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4032
4033 assert!(result.is_some());
4034 let result = result.unwrap();
4035 assert!(result.is_ack);
4036
4037 let events = observer2.events();
4039 assert!(events
4040 .iter()
4041 .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
4042 }
4043
4044 #[test]
4045 fn test_tick_cleanup() {
4046 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4047 .with_peer_timeout(10_000);
4048 let mesh = PeatMesh::new(config);
4049
4050 let observer = Arc::new(CollectingObserver::new());
4051 mesh.add_observer(observer.clone());
4052
4053 mesh.on_ble_discovered(
4055 "device-uuid",
4056 Some("PEAT_TEST-22222222"),
4057 -65,
4058 Some("TEST"),
4059 1000,
4060 );
4061 assert_eq!(mesh.peer_count(), 1);
4062
4063 mesh.tick(5000);
4065 assert_eq!(mesh.peer_count(), 1);
4066
4067 mesh.tick(20000);
4069 assert_eq!(mesh.peer_count(), 0);
4070
4071 let events = observer.events();
4073 assert!(events
4074 .iter()
4075 .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
4076 }
4077
4078 #[test]
4079 fn test_tick_sync_broadcast() {
4080 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4081 .with_sync_interval(5000);
4082 let mesh = PeatMesh::new(config);
4083
4084 mesh.on_ble_discovered(
4086 "device-uuid",
4087 Some("PEAT_TEST-22222222"),
4088 -65,
4089 Some("TEST"),
4090 1000,
4091 );
4092 mesh.on_ble_connected("device-uuid", 1000);
4093
4094 let _result = mesh.tick(0);
4096 let result = mesh.tick(3000);
4100 assert!(result.is_none());
4101
4102 let result = mesh.tick(6000);
4104 assert!(result.is_some());
4105
4106 let result = mesh.tick(6100);
4108 assert!(result.is_none());
4109
4110 let result = mesh.tick(12000);
4112 assert!(result.is_some());
4113 }
4114
4115 #[test]
4116 fn test_incoming_connection() {
4117 let mesh = create_mesh(0x11111111, "ALPHA-1");
4118 let observer = Arc::new(CollectingObserver::new());
4119 mesh.add_observer(observer.clone());
4120
4121 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
4123
4124 assert!(is_new);
4125 assert_eq!(mesh.peer_count(), 1);
4126 assert_eq!(mesh.connected_count(), 1);
4127
4128 let events = observer.events();
4130 assert!(events
4131 .iter()
4132 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
4133 assert!(events
4134 .iter()
4135 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
4136 }
4137
4138 #[test]
4139 fn test_mesh_filtering() {
4140 let mesh = create_mesh(0x11111111, "ALPHA-1");
4141
4142 let peer = mesh.on_ble_discovered(
4144 "device-uuid-1",
4145 Some("PEAT_OTHER-22222222"),
4146 -65,
4147 Some("OTHER"),
4148 1000,
4149 );
4150 assert!(peer.is_none());
4151 assert_eq!(mesh.peer_count(), 0);
4152
4153 let peer = mesh.on_ble_discovered(
4155 "device-uuid-2",
4156 Some("PEAT_TEST-33333333"),
4157 -65,
4158 Some("TEST"),
4159 1000,
4160 );
4161 assert!(peer.is_some());
4162 assert_eq!(mesh.peer_count(), 1);
4163 }
4164
4165 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4168 let config =
4169 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
4170 PeatMesh::new(config)
4171 }
4172
4173 #[test]
4174 fn test_encryption_enabled() {
4175 let secret = [0x42u8; 32];
4176 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4177
4178 assert!(mesh.is_encryption_enabled());
4179 }
4180
4181 #[test]
4182 fn test_encryption_disabled_by_default() {
4183 let mesh = create_mesh(0x11111111, "ALPHA-1");
4184
4185 assert!(!mesh.is_encryption_enabled());
4186 }
4187
4188 #[test]
4189 fn test_encrypted_document_exchange() {
4190 let secret = [0x42u8; 32];
4191 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4192 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4193
4194 let doc = mesh1.build_document();
4196
4197 assert!(doc.len() >= 2);
4199 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4200
4201 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4203
4204 assert!(result.is_some());
4205 let result = result.unwrap();
4206 assert_eq!(result.source_node.as_u32(), 0x11111111);
4207 }
4208
4209 #[test]
4210 fn test_encrypted_emergency_exchange() {
4211 let secret = [0x42u8; 32];
4212 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4213 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4214
4215 let observer = Arc::new(CollectingObserver::new());
4216 mesh2.add_observer(observer.clone());
4217
4218 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
4220
4221 let result =
4223 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
4224
4225 assert!(result.is_some());
4226 let result = result.unwrap();
4227 assert!(result.is_emergency);
4228
4229 let events = observer.events();
4231 assert!(events
4232 .iter()
4233 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
4234 }
4235
4236 #[test]
4237 fn test_wrong_key_fails_decrypt() {
4238 let secret1 = [0x42u8; 32];
4239 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4241 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4242
4243 let doc = mesh1.build_document();
4245
4246 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4248
4249 assert!(result.is_none());
4250 }
4251
4252 #[test]
4253 fn test_unencrypted_mesh_can_read_unencrypted() {
4254 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4255 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4256
4257 let doc = mesh1.build_document();
4259
4260 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4262
4263 assert!(result.is_some());
4264 }
4265
4266 #[test]
4267 fn test_encrypted_mesh_can_receive_unencrypted() {
4268 let secret = [0x42u8; 32];
4270 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4275
4276 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4278
4279 assert!(result.is_some());
4280 }
4281
4282 #[test]
4283 fn test_unencrypted_mesh_cannot_receive_encrypted() {
4284 let secret = [0x42u8; 32];
4285 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
4290
4291 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4293
4294 assert!(result.is_none());
4295 }
4296
4297 #[test]
4298 fn test_enable_disable_encryption() {
4299 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
4300
4301 assert!(!mesh.is_encryption_enabled());
4302
4303 let secret = [0x42u8; 32];
4305 mesh.enable_encryption(&secret);
4306 assert!(mesh.is_encryption_enabled());
4307
4308 let doc = mesh.build_document();
4310 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4311
4312 mesh.disable_encryption();
4314 assert!(!mesh.is_encryption_enabled());
4315
4316 let doc = mesh.build_document();
4318 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4319 }
4320
4321 #[test]
4322 fn test_encryption_overhead() {
4323 let secret = [0x42u8; 32];
4324 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4325 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4326
4327 let doc_encrypted = mesh_encrypted.build_document();
4328 let doc_unencrypted = mesh_unencrypted.build_document();
4329
4330 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4336 assert_eq!(overhead, 30); }
4338
4339 #[test]
4342 fn test_peer_e2ee_enable_disable() {
4343 let mesh = create_mesh(0x11111111, "ALPHA-1");
4344
4345 assert!(!mesh.is_peer_e2ee_enabled());
4346 assert!(mesh.peer_e2ee_public_key().is_none());
4347
4348 mesh.enable_peer_e2ee();
4349 assert!(mesh.is_peer_e2ee_enabled());
4350 assert!(mesh.peer_e2ee_public_key().is_some());
4351
4352 mesh.disable_peer_e2ee();
4353 assert!(!mesh.is_peer_e2ee_enabled());
4354 }
4355
4356 #[test]
4357 fn test_peer_e2ee_initiate_session() {
4358 let mesh = create_mesh(0x11111111, "ALPHA-1");
4359 mesh.enable_peer_e2ee();
4360
4361 let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4362 assert!(key_exchange.is_some());
4363
4364 let key_exchange = key_exchange.unwrap();
4365 assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4367
4368 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4370 assert_eq!(mesh.peer_e2ee_established_count(), 0);
4371 }
4372
4373 #[test]
4374 fn test_peer_e2ee_full_handshake() {
4375 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4376 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4377
4378 mesh1.enable_peer_e2ee();
4379 mesh2.enable_peer_e2ee();
4380
4381 let observer1 = Arc::new(CollectingObserver::new());
4382 let observer2 = Arc::new(CollectingObserver::new());
4383 mesh1.add_observer(observer1.clone());
4384 mesh2.add_observer(observer2.clone());
4385
4386 let key_exchange1 = mesh1
4388 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4389 .unwrap();
4390
4391 let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4393 assert!(response.is_some());
4394
4395 assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4397
4398 let key_exchange2 = response.unwrap();
4400 let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4401
4402 assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4404
4405 let events1 = observer1.events();
4407 assert!(events1
4408 .iter()
4409 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4410
4411 let events2 = observer2.events();
4412 assert!(events2
4413 .iter()
4414 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4415 }
4416
4417 #[test]
4418 fn test_peer_e2ee_encrypt_decrypt() {
4419 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4420 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4421
4422 mesh1.enable_peer_e2ee();
4423 mesh2.enable_peer_e2ee();
4424
4425 let key_exchange1 = mesh1
4427 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4428 .unwrap();
4429 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4430 mesh1.handle_key_exchange(&key_exchange2, 1000);
4431
4432 let plaintext = b"Secret message from mesh1";
4434 let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4435 assert!(encrypted.is_some());
4436
4437 let encrypted = encrypted.unwrap();
4438 assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4440
4441 let observer2 = Arc::new(CollectingObserver::new());
4443 mesh2.add_observer(observer2.clone());
4444
4445 let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4446 assert!(decrypted.is_some());
4447 assert_eq!(decrypted.unwrap(), plaintext);
4448
4449 let events = observer2.events();
4451 assert!(events.iter().any(|e| matches!(
4452 e,
4453 PeatEvent::PeerE2eeMessageReceived { from_node, data }
4454 if from_node.as_u32() == 0x11111111 && data == plaintext
4455 )));
4456 }
4457
4458 #[test]
4459 fn test_peer_e2ee_bidirectional() {
4460 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4461 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4462
4463 mesh1.enable_peer_e2ee();
4464 mesh2.enable_peer_e2ee();
4465
4466 let key_exchange1 = mesh1
4468 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4469 .unwrap();
4470 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4471 mesh1.handle_key_exchange(&key_exchange2, 1000);
4472
4473 let msg1 = mesh1
4475 .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4476 .unwrap();
4477 let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4478 assert_eq!(dec1, b"Hello from mesh1");
4479
4480 let msg2 = mesh2
4482 .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4483 .unwrap();
4484 let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4485 assert_eq!(dec2, b"Hello from mesh2");
4486 }
4487
4488 #[test]
4489 fn test_peer_e2ee_close_session() {
4490 let mesh = create_mesh(0x11111111, "ALPHA-1");
4491 mesh.enable_peer_e2ee();
4492
4493 let observer = Arc::new(CollectingObserver::new());
4494 mesh.add_observer(observer.clone());
4495
4496 mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4498 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4499
4500 mesh.close_peer_e2ee(NodeId::new(0x22222222));
4502
4503 let events = observer.events();
4505 assert!(events
4506 .iter()
4507 .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4508 }
4509
4510 #[test]
4511 fn test_peer_e2ee_without_enabling() {
4512 let mesh = create_mesh(0x11111111, "ALPHA-1");
4513
4514 let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4516 assert!(result.is_none());
4517
4518 let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4519 assert!(result.is_none());
4520
4521 assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4522 }
4523
4524 #[test]
4525 fn test_peer_e2ee_overhead() {
4526 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4527 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4528
4529 mesh1.enable_peer_e2ee();
4530 mesh2.enable_peer_e2ee();
4531
4532 let key_exchange1 = mesh1
4534 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4535 .unwrap();
4536 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4537 mesh1.handle_key_exchange(&key_exchange2, 1000);
4538
4539 let plaintext = b"Test message";
4541 let encrypted = mesh1
4542 .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4543 .unwrap();
4544
4545 let overhead = encrypted.len() - plaintext.len();
4554 assert_eq!(overhead, 46);
4555 }
4556
4557 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4560 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4561 .with_encryption(secret)
4562 .with_strict_encryption();
4563 PeatMesh::new(config)
4564 }
4565
4566 #[test]
4567 fn test_strict_encryption_enabled() {
4568 let secret = [0x42u8; 32];
4569 let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4570
4571 assert!(mesh.is_encryption_enabled());
4572 assert!(mesh.is_strict_encryption_enabled());
4573 }
4574
4575 #[test]
4576 fn test_strict_encryption_disabled_by_default() {
4577 let secret = [0x42u8; 32];
4578 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4579
4580 assert!(mesh.is_encryption_enabled());
4581 assert!(!mesh.is_strict_encryption_enabled());
4582 }
4583
4584 #[test]
4585 fn test_strict_encryption_requires_encryption_enabled() {
4586 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4588 .with_strict_encryption(); let mesh = PeatMesh::new(config);
4590
4591 assert!(!mesh.is_encryption_enabled());
4592 assert!(!mesh.is_strict_encryption_enabled());
4593 }
4594
4595 #[test]
4596 fn test_strict_mode_accepts_encrypted_documents() {
4597 let secret = [0x42u8; 32];
4598 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4599 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4600
4601 let doc = mesh1.build_document();
4603 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4604
4605 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4607 assert!(result.is_some());
4608 }
4609
4610 #[test]
4611 fn test_strict_mode_rejects_unencrypted_documents() {
4612 let secret = [0x42u8; 32];
4613 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); let observer = Arc::new(CollectingObserver::new());
4617 mesh2.add_observer(observer.clone());
4618
4619 let doc = mesh1.build_document();
4621 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4622
4623 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4625 assert!(result.is_none());
4626
4627 let events = observer.events();
4629 assert!(events.iter().any(|e| matches!(
4630 e,
4631 PeatEvent::SecurityViolation {
4632 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4633 ..
4634 }
4635 )));
4636 }
4637
4638 #[test]
4639 fn test_non_strict_mode_accepts_unencrypted_documents() {
4640 let secret = [0x42u8; 32];
4641 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4646
4647 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4649 assert!(result.is_some());
4650 }
4651
4652 #[test]
4653 fn test_strict_mode_security_violation_event_includes_source() {
4654 let secret = [0x42u8; 32];
4655 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4656 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4657
4658 let observer = Arc::new(CollectingObserver::new());
4659 mesh2.add_observer(observer.clone());
4660
4661 let doc = mesh1.build_document();
4662
4663 mesh2.on_ble_discovered(
4665 "test-device-uuid",
4666 Some("PEAT_TEST-11111111"),
4667 -65,
4668 Some("TEST"),
4669 500,
4670 );
4671 mesh2.on_ble_connected("test-device-uuid", 600);
4672
4673 let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4674
4675 let events = observer.events();
4677 let violation = events.iter().find(|e| {
4678 matches!(
4679 e,
4680 PeatEvent::SecurityViolation {
4681 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4682 ..
4683 }
4684 )
4685 });
4686 assert!(violation.is_some());
4687
4688 if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4689 assert!(source.is_some());
4690 assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4691 }
4692 }
4693
4694 #[test]
4695 fn test_decryption_failure_emits_security_violation() {
4696 let secret1 = [0x42u8; 32];
4697 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
4699 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
4700
4701 let observer = Arc::new(CollectingObserver::new());
4702 mesh2.add_observer(observer.clone());
4703
4704 let doc = mesh1.build_document();
4706
4707 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4709 assert!(result.is_none());
4710
4711 let events = observer.events();
4713 assert!(events.iter().any(|e| matches!(
4714 e,
4715 PeatEvent::SecurityViolation {
4716 kind: crate::observer::SecurityViolationKind::DecryptionFailed,
4717 ..
4718 }
4719 )));
4720 }
4721
4722 #[test]
4723 fn test_strict_mode_builder_chain() {
4724 let secret = [0x42u8; 32];
4725 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4726 .with_encryption(secret)
4727 .with_strict_encryption()
4728 .with_sync_interval(10_000)
4729 .with_peer_timeout(60_000);
4730
4731 let mesh = PeatMesh::new(config);
4732
4733 assert!(mesh.is_encryption_enabled());
4734 assert!(mesh.is_strict_encryption_enabled());
4735 }
4736
4737 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4740 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4741 PeatMesh::new(config)
4742 }
4743
4744 #[test]
4745 fn test_relay_disabled_by_default() {
4746 let mesh = create_mesh(0x11111111, "ALPHA-1");
4747 assert!(!mesh.is_relay_enabled());
4748 }
4749
4750 #[test]
4751 fn test_relay_enabled() {
4752 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4753 assert!(mesh.is_relay_enabled());
4754 }
4755
4756 #[test]
4757 fn test_relay_config_builder() {
4758 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4759 .with_relay()
4760 .with_max_relay_hops(5)
4761 .with_relay_fanout(3)
4762 .with_seen_cache_ttl(60_000);
4763
4764 assert!(config.enable_relay);
4765 assert_eq!(config.max_relay_hops, 5);
4766 assert_eq!(config.relay_fanout, 3);
4767 assert_eq!(config.seen_cache_ttl_ms, 60_000);
4768 }
4769
4770 #[test]
4771 fn test_seen_message_deduplication() {
4772 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4773 let origin = NodeId::new(0x22222222);
4774 let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
4775
4776 assert!(mesh.mark_message_seen(msg_id, origin, 1000));
4778
4779 assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
4781
4782 assert_eq!(mesh.seen_cache_size(), 1);
4783 }
4784
4785 #[test]
4786 fn test_wrap_for_relay() {
4787 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4788
4789 let payload = vec![1, 2, 3, 4, 5];
4790 let wrapped = mesh.wrap_for_relay(payload.clone());
4791
4792 assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
4794
4795 let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
4797 assert_eq!(envelope.payload, payload);
4798 assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
4799 assert_eq!(envelope.hop_count, 0);
4800 }
4801
4802 #[test]
4803 fn test_process_relay_envelope_new_message() {
4804 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4805 let observer = Arc::new(CollectingObserver::new());
4806 mesh.add_observer(observer.clone());
4807
4808 let payload = vec![1, 2, 3, 4, 5];
4810 let envelope =
4811 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4812 .with_max_hops(7);
4813 let data = envelope.encode();
4814
4815 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4817
4818 assert!(decision.is_some());
4819 let decision = decision.unwrap();
4820 assert_eq!(decision.payload, payload);
4821 assert_eq!(decision.origin_node.as_u32(), 0x22222222);
4822 assert_eq!(decision.hop_count, 0);
4823 assert!(decision.should_relay);
4824 assert!(decision.relay_envelope.is_some());
4825
4826 let relay_env = decision.relay_envelope.unwrap();
4828 assert_eq!(relay_env.hop_count, 1);
4829 }
4830
4831 #[test]
4832 fn test_process_relay_envelope_duplicate() {
4833 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4834 let observer = Arc::new(CollectingObserver::new());
4835 mesh.add_observer(observer.clone());
4836
4837 let payload = vec![1, 2, 3, 4, 5];
4838 let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
4839 let data = envelope.encode();
4840
4841 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4843 assert!(decision.is_some());
4844
4845 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
4847 assert!(decision.is_none());
4848
4849 let events = observer.events();
4851 assert!(events
4852 .iter()
4853 .any(|e| matches!(e, PeatEvent::DuplicateMessageDropped { .. })));
4854 }
4855
4856 #[test]
4857 fn test_process_relay_envelope_ttl_expired() {
4858 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4859 let observer = Arc::new(CollectingObserver::new());
4860 mesh.add_observer(observer.clone());
4861
4862 let payload = vec![1, 2, 3, 4, 5];
4864 let mut envelope =
4865 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
4866 .with_max_hops(3);
4867
4868 envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); let data = envelope.encode();
4874
4875 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
4877
4878 assert!(decision.is_some());
4879 let decision = decision.unwrap();
4880 assert_eq!(decision.payload, payload);
4881 assert!(!decision.should_relay); assert!(decision.relay_envelope.is_none());
4883
4884 let events = observer.events();
4886 assert!(events
4887 .iter()
4888 .any(|e| matches!(e, PeatEvent::MessageTtlExpired { .. })));
4889 }
4890
4891 #[test]
4892 fn test_build_relay_document() {
4893 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4894
4895 let relay_doc = mesh.build_relay_document();
4896
4897 assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
4899
4900 let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
4902 assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
4903
4904 let doc = crate::document::PeatDocument::decode(&envelope.payload);
4906 assert!(doc.is_some());
4907 }
4908
4909 #[test]
4910 fn test_relay_targets_excludes_source() {
4911 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4912
4913 mesh.on_ble_discovered(
4915 "peer-1",
4916 Some("PEAT_TEST-22222222"),
4917 -60,
4918 Some("TEST"),
4919 1000,
4920 );
4921 mesh.on_ble_connected("peer-1", 1000);
4922
4923 mesh.on_ble_discovered(
4924 "peer-2",
4925 Some("PEAT_TEST-33333333"),
4926 -65,
4927 Some("TEST"),
4928 1000,
4929 );
4930 mesh.on_ble_connected("peer-2", 1000);
4931
4932 mesh.on_ble_discovered(
4933 "peer-3",
4934 Some("PEAT_TEST-44444444"),
4935 -70,
4936 Some("TEST"),
4937 1000,
4938 );
4939 mesh.on_ble_connected("peer-3", 1000);
4940
4941 let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
4943
4944 assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
4946 }
4947
4948 #[test]
4949 fn test_clear_seen_cache() {
4950 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
4951 let origin = NodeId::new(0x22222222);
4952
4953 mesh.mark_message_seen(
4955 crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
4956 origin,
4957 1000,
4958 );
4959 mesh.mark_message_seen(
4960 crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
4961 origin,
4962 2000,
4963 );
4964
4965 assert_eq!(mesh.seen_cache_size(), 2);
4966
4967 mesh.clear_seen_cache();
4969 assert_eq!(mesh.seen_cache_size(), 0);
4970 }
4971}