1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, PeatEvent, PeatObserver, SecurityViolationKind};
67use crate::peer::{
68 ConnectionStateGraph, FullStateCountSummary, IndirectPeer, PeatPeer, PeerConnectionState,
69 PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74 RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
/// Configuration for a mesh node.
///
/// Create one with `PeatMeshConfig::new` and refine it through the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct PeatMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node.
    pub callsign: String,

    /// Identifier of the mesh; also fed into the encryption key derivation.
    pub mesh_id: String,

    /// Peripheral type reported by the local node.
    pub peripheral_type: PeripheralType,

    /// Settings handed to the peer manager.
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds.
    pub sync_interval_ms: u64,

    /// NOTE(review): presumably toggles automatic broadcasting of events; the
    /// flag is not read in this part of the file — confirm against its users.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret; when `Some`, a mesh encryption key is
    /// derived from it at construction time.
    pub encryption_secret: Option<[u8; 32]>,

    /// When `true` and an encryption key is present, unencrypted incoming
    /// documents are rejected.
    pub strict_encryption: bool,

    /// Whether received relay envelopes are marked for forwarding.
    pub enable_relay: bool,

    /// Maximum hop count applied to relay envelopes created by this node.
    pub max_relay_hops: u8,

    /// Fan-out passed to the `RandomFanout` gossip strategy.
    pub relay_fanout: usize,

    /// TTL in milliseconds passed to the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
157
158impl PeatMeshConfig {
159 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161 Self {
162 node_id,
163 callsign: callsign.into(),
164 mesh_id: mesh_id.into(),
165 peripheral_type: PeripheralType::SoldierSensor,
166 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167 sync_interval_ms: 5000,
168 auto_broadcast_events: true,
169 encryption_secret: None,
170 strict_encryption: false,
171 enable_relay: false,
172 max_relay_hops: DEFAULT_MAX_HOPS,
173 relay_fanout: 2,
174 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175 }
176 }
177
178 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183 self.encryption_secret = Some(secret);
184 self
185 }
186
187 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189 self.peripheral_type = ptype;
190 self
191 }
192
193 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195 self.sync_interval_ms = interval_ms;
196 self
197 }
198
199 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201 self.peer_config.peer_timeout_ms = timeout_ms;
202 self
203 }
204
205 pub fn with_max_peers(mut self, max: usize) -> Self {
207 self.peer_config.max_peers = max;
208 self
209 }
210
211 pub fn with_strict_encryption(mut self) -> Self {
219 self.strict_encryption = true;
220 self
221 }
222
223 pub fn with_relay(mut self) -> Self {
228 self.enable_relay = true;
229 self
230 }
231
232 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236 self.max_relay_hops = max_hops;
237 self
238 }
239
240 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244 self.relay_fanout = fanout.max(1);
245 self
246 }
247
248 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252 self.seen_cache_ttl_ms = ttl_ms;
253 self
254 }
255}
256
/// Lock-protected store of decoded application documents.
///
/// Entries are keyed by a `(type_id, source_node, timestamp)` tuple and hold
/// the decoded document as a type-erased value.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn std::any::Any + Send + Sync>>>;
261
/// A mesh node: owns the configuration, peer tracking, document sync state,
/// encryption material, relay bookkeeping and application document storage.
#[cfg(feature = "std")]
pub struct PeatMesh {
    /// Configuration this node was created with (relay and encryption setters mutate it).
    config: PeatMeshConfig,

    /// Peer tracking, created from the node id and `config.peer_config`.
    peer_manager: PeerManager,

    /// Local document state (counters, peripheral and emergency snapshots).
    document_sync: DocumentSync,

    /// Observers notified about mesh events.
    observers: ObserverManager,

    /// Initialised to 0. NOTE(review): presumably the time of the last sync;
    /// the readers/writers are outside this part of the file — confirm.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0. NOTE(review): presumably the time of the last cleanup
    /// pass; the readers/writers are outside this part of the file — confirm.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` means documents are sent unencrypted.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer E2EE session manager; `None` while per-peer E2EE is disabled.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph; updated when a relayed message reveals an indirect peer.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of message ids already seen, used to drop duplicate relay envelopes.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks which connected peers receive relayed messages.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta sync bookkeeping.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity used for signing and attestations; `None` when created via `new`.
    identity: Option<DeviceIdentity>,

    /// Registry of peer identities (node id to public key).
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Most recent peripheral received from each peer through delta documents.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode application documents and apply delta operations to them.
    document_registry: DocumentRegistry,

    /// Decoded application documents received from the mesh.
    app_documents: AppDocumentStore,
}
337
338#[cfg(feature = "std")]
339impl PeatMesh {
340 pub fn new(config: PeatMeshConfig) -> Self {
342 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343 let document_sync = DocumentSync::with_peripheral_type(
344 config.node_id,
345 &config.callsign,
346 config.peripheral_type,
347 );
348
349 let encryption_key = config
351 .encryption_secret
352 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354 let connection_graph = ConnectionStateGraph::with_config(
356 config.peer_config.rssi_degraded_threshold,
357 config.peer_config.lost_timeout_ms,
358 );
359
360 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363 let gossip_strategy: Box<dyn GossipStrategy> =
365 Box::new(RandomFanout::new(config.relay_fanout));
366
367 let delta_encoder = DeltaEncoder::new(config.node_id);
369
370 let document_registry = DocumentRegistry::new();
371
372 Self {
373 config,
374 peer_manager,
375 document_sync,
376 observers: ObserverManager::new(),
377 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
378 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
379 encryption_key,
380 peer_sessions: std::sync::Mutex::new(None),
381 connection_graph: std::sync::Mutex::new(connection_graph),
382 seen_cache: std::sync::Mutex::new(seen_cache),
383 gossip_strategy,
384 delta_encoder: std::sync::Mutex::new(delta_encoder),
385 identity: None,
386 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
387 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
388 document_registry,
389 app_documents: std::sync::RwLock::new(HashMap::new()),
390 }
391 }
392
393 pub fn with_identity(config: PeatMeshConfig, identity: DeviceIdentity) -> Self {
399 let mut config = config;
401 config.node_id = identity.node_id();
402
403 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
404 let document_sync = DocumentSync::with_peripheral_type(
405 config.node_id,
406 &config.callsign,
407 config.peripheral_type,
408 );
409
410 let encryption_key = config
411 .encryption_secret
412 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
413
414 let connection_graph = ConnectionStateGraph::with_config(
415 config.peer_config.rssi_degraded_threshold,
416 config.peer_config.lost_timeout_ms,
417 );
418
419 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
420 let gossip_strategy: Box<dyn GossipStrategy> =
421 Box::new(RandomFanout::new(config.relay_fanout));
422 let delta_encoder = DeltaEncoder::new(config.node_id);
423
424 let document_registry = DocumentRegistry::new();
425
426 Self {
427 config,
428 peer_manager,
429 document_sync,
430 observers: ObserverManager::new(),
431 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
432 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
433 encryption_key,
434 peer_sessions: std::sync::Mutex::new(None),
435 connection_graph: std::sync::Mutex::new(connection_graph),
436 seen_cache: std::sync::Mutex::new(seen_cache),
437 gossip_strategy,
438 delta_encoder: std::sync::Mutex::new(delta_encoder),
439 identity: Some(identity),
440 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
441 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
442 document_registry,
443 app_documents: std::sync::RwLock::new(HashMap::new()),
444 }
445 }
446
447 pub fn from_genesis(
455 genesis: &crate::security::MeshGenesis,
456 identity: DeviceIdentity,
457 callsign: &str,
458 ) -> Self {
459 let config = PeatMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
460 .with_encryption(genesis.encryption_secret());
461
462 Self::with_identity(config, identity)
463 }
464
465 #[cfg(feature = "std")]
491 pub fn from_persisted(
492 state: crate::security::PersistedState,
493 callsign: &str,
494 ) -> Result<Self, crate::security::PersistenceError> {
495 let identity = state.restore_identity()?;
497
498 let genesis = state.restore_genesis();
500
501 let mesh = if let Some(ref gen) = genesis {
503 Self::from_genesis(gen, identity, callsign)
504 } else {
505 let config = PeatMeshConfig::new(identity.node_id(), callsign, "RESTORED");
506 Self::with_identity(config, identity)
507 };
508
509 let restored_registry = state.restore_registry();
511 if let Ok(mut registry) = mesh.identity_registry.lock() {
512 *registry = restored_registry;
513 }
514
515 log::info!(
516 "PeatMesh restored from persisted state: node_id={:08X}, known_peers={}",
517 mesh.config.node_id.as_u32(),
518 mesh.known_identity_count()
519 );
520
521 Ok(mesh)
522 }
523
524 #[cfg(feature = "std")]
537 pub fn to_persisted_state(
538 &self,
539 genesis: Option<&crate::security::MeshGenesis>,
540 ) -> Option<crate::security::PersistedState> {
541 let identity = self.identity.as_ref()?;
542 let registry = self.identity_registry.lock().ok()?;
543
544 Some(crate::security::PersistedState::with_registry(
545 identity, genesis, ®istry,
546 ))
547 }
548
549 pub fn is_encryption_enabled(&self) -> bool {
553 self.encryption_key.is_some()
554 }
555
556 pub fn is_strict_encryption_enabled(&self) -> bool {
560 self.config.strict_encryption && self.encryption_key.is_some()
561 }
562
563 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
568 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
569 &self.config.mesh_id,
570 secret,
571 ));
572 }
573
574 pub fn disable_encryption(&mut self) {
576 self.encryption_key = None;
577 }
578
579 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
584 match &self.encryption_key {
585 Some(key) => {
586 match key.encrypt_to_bytes(plaintext) {
588 Ok(ciphertext) => {
589 let mut buf = Vec::with_capacity(2 + ciphertext.len());
590 buf.push(ENCRYPTED_MARKER);
591 buf.push(0x00); buf.extend_from_slice(&ciphertext);
593 buf
594 }
595 Err(e) => {
596 log::error!("Encryption failed: {}", e);
597 plaintext.to_vec()
599 }
600 }
601 }
602 None => plaintext.to_vec(),
603 }
604 }
605
606 fn decrypt_document<'a>(
614 &self,
615 data: &'a [u8],
616 source_hint: Option<&str>,
617 ) -> Option<std::borrow::Cow<'a, [u8]>> {
618 log::debug!(
619 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
620 data.len(),
621 data.first().copied().unwrap_or(0),
622 source_hint
623 );
624
625 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
627 let _reserved = data[1];
629 let encrypted_payload = &data[2..];
630
631 log::debug!(
632 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
633 encrypted_payload.len()
634 );
635
636 match &self.encryption_key {
637 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
638 Ok(plaintext) => {
639 log::debug!(
640 "decrypt_document: SUCCESS, plaintext len={}",
641 plaintext.len()
642 );
643 Some(std::borrow::Cow::Owned(plaintext))
644 }
645 Err(e) => {
646 log::warn!(
647 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
648 e,
649 encrypted_payload.len(),
650 source_hint
651 );
652 self.notify(PeatEvent::SecurityViolation {
653 kind: SecurityViolationKind::DecryptionFailed,
654 source: source_hint.map(String::from),
655 });
656 None
657 }
658 },
659 None => {
660 log::warn!(
661 "decrypt_document: encryption not enabled but received encrypted doc"
662 );
663 None
664 }
665 }
666 } else {
667 if self.config.strict_encryption && self.encryption_key.is_some() {
670 log::warn!(
671 "Rejected unencrypted document in strict encryption mode (source: {:?})",
672 source_hint
673 );
674 self.notify(PeatEvent::SecurityViolation {
675 kind: SecurityViolationKind::UnencryptedInStrictMode,
676 source: source_hint.map(String::from),
677 });
678 None
679 } else {
680 Some(std::borrow::Cow::Borrowed(data))
682 }
683 }
684 }
685
686 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
700 self.decrypt_document(data, None)
701 .map(|cow| cow.into_owned())
702 }
703
704 pub fn has_identity(&self) -> bool {
708 self.identity.is_some()
709 }
710
711 pub fn public_key(&self) -> Option<[u8; 32]> {
713 self.identity.as_ref().map(|id| id.public_key())
714 }
715
716 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
720 self.identity
721 .as_ref()
722 .map(|id| id.create_attestation(now_ms))
723 }
724
725 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
734 self.identity_registry
735 .lock()
736 .unwrap()
737 .verify_or_register(attestation)
738 }
739
740 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
742 self.identity_registry.lock().unwrap().is_known(node_id)
743 }
744
745 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
747 self.identity_registry
748 .lock()
749 .unwrap()
750 .get_public_key(node_id)
751 .copied()
752 }
753
754 pub fn known_identity_count(&self) -> usize {
756 self.identity_registry.lock().unwrap().len()
757 }
758
759 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
764 self.identity_registry
765 .lock()
766 .unwrap()
767 .pre_register(node_id, public_key, now_ms);
768 }
769
770 pub fn forget_peer_identity(&self, node_id: NodeId) {
774 self.identity_registry.lock().unwrap().remove(node_id);
775 }
776
777 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
781 self.identity.as_ref().map(|id| id.sign(data))
782 }
783
784 pub fn verify_peer_signature(
789 &self,
790 node_id: NodeId,
791 data: &[u8],
792 signature: &[u8; 64],
793 ) -> bool {
794 if let Some(public_key) = self.peer_public_key(node_id) {
795 crate::security::verify_signature(&public_key, data, signature)
796 } else {
797 false
798 }
799 }
800
801 pub fn is_relay_enabled(&self) -> bool {
805 self.config.enable_relay
806 }
807
808 pub fn enable_relay(&mut self) {
810 self.config.enable_relay = true;
811 }
812
813 pub fn disable_relay(&mut self) {
815 self.config.enable_relay = false;
816 }
817
818 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
822 self.seen_cache.lock().unwrap().has_seen(message_id)
823 }
824
825 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
829 self.seen_cache
830 .lock()
831 .unwrap()
832 .check_and_mark(message_id, origin, now_ms)
833 }
834
835 pub fn seen_cache_size(&self) -> usize {
837 self.seen_cache.lock().unwrap().len()
838 }
839
840 pub fn clear_seen_cache(&self) {
842 self.seen_cache.lock().unwrap().clear();
843 }
844
845 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
850 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
851 .with_max_hops(self.config.max_relay_hops);
852 envelope.encode()
853 }
854
855 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<PeatPeer> {
860 let connected = self.peer_manager.get_connected_peers();
861 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
862 connected
863 .into_iter()
864 .filter(|p| p.node_id != exclude)
865 .collect()
866 } else {
867 connected
868 };
869
870 self.gossip_strategy
871 .select_peers(&filtered)
872 .into_iter()
873 .cloned()
874 .collect()
875 }
876
877 pub fn process_relay_envelope(
887 &self,
888 data: &[u8],
889 source_peer: NodeId,
890 now_ms: u64,
891 ) -> Option<RelayDecision> {
892 let envelope = RelayEnvelope::decode(data)?;
894
895 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
898 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
899 source_peer,
900 envelope.origin_node,
901 envelope.hop_count,
902 now_ms,
903 );
904
905 if is_new {
906 log::debug!(
907 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
908 envelope.origin_node.as_u32(),
909 source_peer.as_u32(),
910 envelope.hop_count
911 );
912 }
913 }
914
915 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
917 let stats = self
919 .seen_cache
920 .lock()
921 .unwrap()
922 .get_stats(&envelope.message_id);
923 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
924
925 self.notify(PeatEvent::DuplicateMessageDropped {
926 origin_node: envelope.origin_node,
927 seen_count,
928 });
929
930 log::debug!(
931 "Dropped duplicate message {} from {:08X} (seen {} times)",
932 envelope.message_id,
933 envelope.origin_node.as_u32(),
934 seen_count
935 );
936 return None;
937 }
938
939 if !envelope.can_relay() {
941 self.notify(PeatEvent::MessageTtlExpired {
942 origin_node: envelope.origin_node,
943 hop_count: envelope.hop_count,
944 });
945
946 log::debug!(
947 "Message {} from {:08X} TTL expired at hop {}",
948 envelope.message_id,
949 envelope.origin_node.as_u32(),
950 envelope.hop_count
951 );
952
953 return Some(RelayDecision {
955 payload: envelope.payload,
956 origin_node: envelope.origin_node,
957 hop_count: envelope.hop_count,
958 should_relay: false,
959 relay_envelope: None,
960 });
961 }
962
963 let should_relay = self.config.enable_relay;
965 let relay_envelope = if should_relay {
966 envelope.relay() } else {
968 None
969 };
970
971 Some(RelayDecision {
972 payload: envelope.payload,
973 origin_node: envelope.origin_node,
974 hop_count: envelope.hop_count,
975 should_relay,
976 relay_envelope,
977 })
978 }
979
980 pub fn build_relay_document(&self) -> Vec<u8> {
985 let doc = self.build_document(); self.wrap_for_relay(doc)
987 }
988
989 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
996 let mut encoder = self.delta_encoder.lock().unwrap();
997 encoder.add_peer(peer_id);
998 log::debug!(
999 "Registered peer {:08X} for delta sync tracking",
1000 peer_id.as_u32()
1001 );
1002 }
1003
1004 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1008 let mut encoder = self.delta_encoder.lock().unwrap();
1009 encoder.remove_peer(peer_id);
1010 log::debug!(
1011 "Unregistered peer {:08X} from delta sync tracking",
1012 peer_id.as_u32()
1013 );
1014 }
1015
1016 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1021 let mut encoder = self.delta_encoder.lock().unwrap();
1022 encoder.reset_peer(peer_id);
1023 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1024 }
1025
1026 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1028 let mut encoder = self.delta_encoder.lock().unwrap();
1029 encoder.record_sent(peer_id, bytes);
1030 }
1031
1032 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1034 let mut encoder = self.delta_encoder.lock().unwrap();
1035 encoder.record_received(peer_id, bytes, timestamp);
1036 }
1037
1038 pub fn delta_stats(&self) -> DeltaStats {
1043 self.delta_encoder.lock().unwrap().stats()
1044 }
1045
1046 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1050 let encoder = self.delta_encoder.lock().unwrap();
1051 encoder
1052 .get_peer_state(peer_id)
1053 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1054 }
1055
1056 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1064 let mut all_operations: Vec<Operation> = Vec::new();
1066
1067 for (node_id_u32, count) in self.document_sync.counter_entries() {
1070 all_operations.push(Operation::IncrementCounter {
1071 counter_id: 0, node_id: NodeId::new(node_id_u32),
1073 amount: count,
1074 timestamp: count, });
1076 }
1077
1078 let peripheral = self.document_sync.peripheral_snapshot();
1081 let peripheral_timestamp = peripheral
1082 .last_event
1083 .as_ref()
1084 .map(|e| e.timestamp)
1085 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1087 peripheral,
1088 timestamp: peripheral_timestamp,
1089 });
1090
1091 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1093 let source_node = NodeId::new(emergency.source_node());
1094 let timestamp = emergency.timestamp();
1095
1096 all_operations.push(Operation::SetEmergency {
1098 source_node,
1099 timestamp,
1100 known_peers: emergency.all_nodes(),
1101 });
1102
1103 for acked_node in emergency.acked_nodes() {
1105 all_operations.push(Operation::AckEmergency {
1106 node_id: NodeId::new(acked_node),
1107 emergency_timestamp: timestamp,
1108 });
1109 }
1110 }
1111
1112 for app_op in self.app_document_delta_ops() {
1114 all_operations.push(Operation::App(app_op));
1115 }
1116
1117 let filtered_operations: Vec<Operation> = {
1119 let encoder = self.delta_encoder.lock().unwrap();
1120 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1121 all_operations
1122 .into_iter()
1123 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1124 .collect()
1125 } else {
1126 all_operations
1128 }
1129 };
1130
1131 if filtered_operations.is_empty() {
1133 return None;
1134 }
1135
1136 {
1138 let mut encoder = self.delta_encoder.lock().unwrap();
1139 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1140 for op in &filtered_operations {
1141 peer_state.mark_sent(&op.key(), op.timestamp());
1142 }
1143 }
1144 }
1145
1146 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1148 for op in filtered_operations {
1149 delta.add_operation(op);
1150 }
1151
1152 let encoded = delta.encode();
1154 let result = self.encrypt_document(&encoded);
1155
1156 {
1158 let mut encoder = self.delta_encoder.lock().unwrap();
1159 encoder.record_sent(peer_id, result.len());
1160 }
1161
1162 Some(result)
1163 }
1164
1165 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1170 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1171
1172 for (node_id_u32, count) in self.document_sync.counter_entries() {
1174 delta.add_operation(Operation::IncrementCounter {
1175 counter_id: 0,
1176 node_id: NodeId::new(node_id_u32),
1177 amount: count,
1178 timestamp: now_ms,
1179 });
1180 }
1181
1182 let peripheral = self.document_sync.peripheral_snapshot();
1184 let peripheral_timestamp = peripheral
1185 .last_event
1186 .as_ref()
1187 .map(|e| e.timestamp)
1188 .unwrap_or(now_ms);
1189 delta.add_operation(Operation::UpdatePeripheral {
1190 peripheral,
1191 timestamp: peripheral_timestamp,
1192 });
1193
1194 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1196 let source_node = NodeId::new(emergency.source_node());
1197 let timestamp = emergency.timestamp();
1198
1199 delta.add_operation(Operation::SetEmergency {
1200 source_node,
1201 timestamp,
1202 known_peers: emergency.all_nodes(),
1203 });
1204
1205 for acked_node in emergency.acked_nodes() {
1206 delta.add_operation(Operation::AckEmergency {
1207 node_id: NodeId::new(acked_node),
1208 emergency_timestamp: timestamp,
1209 });
1210 }
1211 }
1212
1213 for app_op in self.app_document_delta_ops() {
1215 delta.add_operation(Operation::App(app_op));
1216 }
1217
1218 let encoded = delta.encode();
1219 self.encrypt_document(&encoded)
1220 }
1221
1222 fn process_delta_document_internal(
1226 &self,
1227 source_node: NodeId,
1228 data: &[u8],
1229 now_ms: u64,
1230 relay_data: Option<Vec<u8>>,
1231 origin_node: Option<NodeId>,
1232 hop_count: u8,
1233 ) -> Option<DataReceivedResult> {
1234 let delta = DeltaDocument::decode(data)?;
1236
1237 if delta.origin_node == self.config.node_id {
1239 return None;
1240 }
1241
1242 let mut counter_changed = false;
1244 let mut emergency_changed = false;
1245 let mut is_emergency = false;
1246 let mut is_ack = false;
1247 let mut event_timestamp = 0u64;
1248 let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1249
1250 log::info!(
1251 "Delta document from {:08X}: {} operations, data_len={}",
1252 delta.origin_node.as_u32(),
1253 delta.operations.len(),
1254 data.len()
1255 );
1256 for op in &delta.operations {
1257 log::info!(" Operation: {}", op.key());
1258 match op {
1259 Operation::IncrementCounter {
1260 node_id, amount, ..
1261 } => {
1262 let current = self.document_sync.counter_entries();
1264 let current_value = current
1265 .iter()
1266 .find(|(id, _)| *id == node_id.as_u32())
1267 .map(|(_, v)| *v)
1268 .unwrap_or(0);
1269
1270 if *amount > current_value {
1271 counter_changed = true;
1274 }
1275 }
1276 Operation::UpdatePeripheral {
1277 peripheral,
1278 timestamp,
1279 } => {
1280 if let Ok(mut peripherals) = self.peer_peripherals.write() {
1282 peripherals.insert(delta.origin_node, peripheral.clone());
1283 }
1284 peer_peripheral = Some(peripheral.clone());
1286 if *timestamp > event_timestamp {
1288 event_timestamp = *timestamp;
1289 }
1290 }
1291 Operation::SetEmergency { timestamp, .. } => {
1292 is_emergency = true;
1293 emergency_changed = true;
1294 event_timestamp = *timestamp;
1295 }
1296 Operation::AckEmergency {
1297 emergency_timestamp,
1298 ..
1299 } => {
1300 is_ack = true;
1301 emergency_changed = true;
1302 if *emergency_timestamp > event_timestamp {
1303 event_timestamp = *emergency_timestamp;
1304 }
1305 }
1306 Operation::ClearEmergency {
1307 emergency_timestamp,
1308 } => {
1309 emergency_changed = true;
1310 if *emergency_timestamp > event_timestamp {
1311 event_timestamp = *emergency_timestamp;
1312 }
1313 }
1314 Operation::App(app_op) => {
1315 let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1321
1322 log::info!(
1323 "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1324 app_op.type_id,
1325 app_op.op_code,
1326 app_op.source_node,
1327 doc_timestamp,
1328 app_op.payload.len()
1329 );
1330
1331 let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1333 let changed = {
1334 let mut docs = self.app_documents.write().unwrap();
1335
1336 if let Some(existing) = docs.get_mut(&doc_key) {
1337 self.document_registry.apply_delta_op(
1339 app_op.type_id,
1340 existing.as_mut(),
1341 app_op,
1342 )
1343 } else {
1344 if let Some(decoded) = self
1346 .document_registry
1347 .decode(app_op.type_id, &app_op.payload)
1348 {
1349 docs.insert(doc_key, decoded);
1350 true
1351 } else {
1352 log::debug!(
1355 "Received delta for unknown doc {:?}, waiting for full state",
1356 doc_key
1357 );
1358 false
1359 }
1360 }
1361 };
1362
1363 self.observers.notify(PeatEvent::app_document_received(
1365 app_op.type_id,
1366 NodeId::new(app_op.source_node),
1367 doc_timestamp,
1368 changed,
1369 ));
1370 }
1371 }
1372 }
1373
1374 self.peer_manager.record_sync(source_node, now_ms);
1376
1377 {
1379 let mut encoder = self.delta_encoder.lock().unwrap();
1380 encoder.record_received(&source_node, data.len(), now_ms);
1381 }
1382
1383 if is_emergency {
1385 self.notify(PeatEvent::EmergencyReceived {
1386 from_node: delta.origin_node,
1387 });
1388 } else if is_ack {
1389 self.notify(PeatEvent::AckReceived {
1390 from_node: delta.origin_node,
1391 });
1392 }
1393
1394 if counter_changed {
1395 let total_count = self.document_sync.total_count();
1396 self.notify(PeatEvent::DocumentSynced {
1397 from_node: delta.origin_node,
1398 total_count,
1399 });
1400 }
1401
1402 if relay_data.is_some() {
1404 let relay_targets = self.get_relay_targets(Some(source_node));
1405 self.notify(PeatEvent::MessageRelayed {
1406 origin_node: origin_node.unwrap_or(delta.origin_node),
1407 relay_count: relay_targets.len(),
1408 hop_count,
1409 });
1410 }
1411
1412 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1413 DataReceivedResult::peripheral_fields(&peer_peripheral);
1414
1415 Some(DataReceivedResult {
1416 source_node: delta.origin_node,
1417 is_emergency,
1418 is_ack,
1419 counter_changed,
1420 emergency_changed,
1421 total_count: self.document_sync.total_count(),
1422 event_timestamp,
1423 relay_data,
1424 origin_node,
1425 hop_count,
1426 callsign,
1427 battery_percent,
1428 heart_rate,
1429 event_type,
1430 latitude,
1431 longitude,
1432 altitude,
1433 })
1434 }
1435
1436 pub fn enable_peer_e2ee(&self) {
1444 let mut sessions = self.peer_sessions.lock().unwrap();
1445 if sessions.is_none() {
1446 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1447 log::info!(
1448 "Per-peer E2EE enabled for node {:08X}",
1449 self.config.node_id.as_u32()
1450 );
1451 }
1452 }
1453
1454 pub fn disable_peer_e2ee(&self) {
1458 let mut sessions = self.peer_sessions.lock().unwrap();
1459 *sessions = None;
1460 log::info!("Per-peer E2EE disabled");
1461 }
1462
1463 pub fn is_peer_e2ee_enabled(&self) -> bool {
1465 self.peer_sessions.lock().unwrap().is_some()
1466 }
1467
1468 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1472 self.peer_sessions
1473 .lock()
1474 .unwrap()
1475 .as_ref()
1476 .map(|s| s.our_public_key())
1477 }
1478
1479 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1485 let mut sessions = self.peer_sessions.lock().unwrap();
1486 let session_mgr = sessions.as_mut()?;
1487
1488 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1489 let mut buf = Vec::with_capacity(2 + 37);
1490 buf.push(KEY_EXCHANGE_MARKER);
1491 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1493
1494 log::info!(
1495 "Initiated E2EE session with peer {:08X}",
1496 peer_node_id.as_u32()
1497 );
1498 Some(buf)
1499 }
1500
1501 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1503 self.peer_sessions
1504 .lock()
1505 .unwrap()
1506 .as_ref()
1507 .is_some_and(|s| s.has_session(peer_node_id))
1508 }
1509
1510 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1512 self.peer_sessions
1513 .lock()
1514 .unwrap()
1515 .as_ref()
1516 .and_then(|s| s.session_state(peer_node_id))
1517 }
1518
1519 pub fn send_peer_e2ee(
1524 &self,
1525 peer_node_id: NodeId,
1526 plaintext: &[u8],
1527 now_ms: u64,
1528 ) -> Option<Vec<u8>> {
1529 let mut sessions = self.peer_sessions.lock().unwrap();
1530 let session_mgr = sessions.as_mut()?;
1531
1532 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1533 Ok(encrypted) => {
1534 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1535 buf.push(PEER_E2EE_MARKER);
1536 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1538 Some(buf)
1539 }
1540 Err(e) => {
1541 log::warn!(
1542 "Failed to encrypt for peer {:08X}: {:?}",
1543 peer_node_id.as_u32(),
1544 e
1545 );
1546 None
1547 }
1548 }
1549 }
1550
1551 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1553 let mut sessions = self.peer_sessions.lock().unwrap();
1554 if let Some(session_mgr) = sessions.as_mut() {
1555 session_mgr.close_session(peer_node_id);
1556 self.notify(PeatEvent::PeerE2eeClosed { peer_node_id });
1557 log::info!(
1558 "Closed E2EE session with peer {:08X}",
1559 peer_node_id.as_u32()
1560 );
1561 }
1562 }
1563
1564 pub fn peer_e2ee_session_count(&self) -> usize {
1566 self.peer_sessions
1567 .lock()
1568 .unwrap()
1569 .as_ref()
1570 .map(|s| s.session_count())
1571 .unwrap_or(0)
1572 }
1573
1574 pub fn peer_e2ee_established_count(&self) -> usize {
1576 self.peer_sessions
1577 .lock()
1578 .unwrap()
1579 .as_ref()
1580 .map(|s| s.established_count())
1581 .unwrap_or(0)
1582 }
1583
1584 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1589 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1590 return None;
1591 }
1592
1593 let payload = &data[2..];
1594 let msg = KeyExchangeMessage::decode(payload)?;
1595
1596 let mut sessions = self.peer_sessions.lock().unwrap();
1597 let session_mgr = sessions.as_mut()?;
1598
1599 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1600
1601 if established {
1602 self.notify(PeatEvent::PeerE2eeEstablished {
1603 peer_node_id: msg.sender_node_id,
1604 });
1605 log::info!(
1606 "E2EE session established with peer {:08X}",
1607 msg.sender_node_id.as_u32()
1608 );
1609 }
1610
1611 let mut buf = Vec::with_capacity(2 + 37);
1613 buf.push(KEY_EXCHANGE_MARKER);
1614 buf.push(0x00);
1615 buf.extend_from_slice(&response.encode());
1616 Some(buf)
1617 }
1618
1619 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1624 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1625 return None;
1626 }
1627
1628 let payload = &data[2..];
1629 let msg = PeerEncryptedMessage::decode(payload)?;
1630
1631 let mut sessions = self.peer_sessions.lock().unwrap();
1632 let session_mgr = sessions.as_mut()?;
1633
1634 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1635 Ok(plaintext) => {
1636 self.notify(PeatEvent::PeerE2eeMessageReceived {
1638 from_node: msg.sender_node_id,
1639 data: plaintext.clone(),
1640 });
1641 Some(plaintext)
1642 }
1643 Err(e) => {
1644 log::warn!(
1645 "Failed to decrypt E2EE message from {:08X}: {:?}",
1646 msg.sender_node_id.as_u32(),
1647 e
1648 );
1649 None
1650 }
1651 }
1652 }
1653
    /// Returns this node's ID as stored in the mesh configuration.
    pub fn node_id(&self) -> NodeId {
        self.config.node_id
    }
1660
    /// Returns this node's callsign as stored in the mesh configuration.
    pub fn callsign(&self) -> &str {
        &self.config.callsign
    }
1665
    /// Returns the mesh ID as stored in the mesh configuration.
    pub fn mesh_id(&self) -> &str {
        &self.config.mesh_id
    }
1670
1671 pub fn device_name(&self) -> String {
1673 format!(
1674 "PEAT_{}-{:08X}",
1675 self.config.mesh_id,
1676 self.config.node_id.as_u32()
1677 )
1678 }
1679
1680 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1685 self.peer_peripherals.read().ok().and_then(|peripherals| {
1686 peripherals
1687 .get(&node_id)
1688 .map(|p| p.callsign_str().to_string())
1689 })
1690 }
1691
1692 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1697 self.peer_peripherals
1698 .read()
1699 .ok()
1700 .and_then(|peripherals| peripherals.get(&node_id).cloned())
1701 }
1702
    /// Returns a shared reference to this mesh's [`DocumentRegistry`].
    pub fn document_registry(&self) -> &DocumentRegistry {
        &self.document_registry
    }
1720
1721 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1728 let type_id = T::TYPE_ID;
1729 let (source_node, timestamp) = doc.identity();
1730 let key = (type_id, source_node, timestamp);
1731
1732 let mut docs = self.app_documents.write().unwrap();
1733
1734 if let Some(existing) = docs.get_mut(&key) {
1735 self.document_registry
1737 .merge(type_id, existing.as_mut(), &doc)
1738 } else {
1739 docs.insert(key, Box::new(doc));
1741 true
1742 }
1743 }
1744
1745 pub fn store_app_document_boxed(
1752 &self,
1753 type_id: u8,
1754 source_node: u32,
1755 timestamp: u64,
1756 doc: Box<dyn core::any::Any + Send + Sync>,
1757 ) -> bool {
1758 let key = (type_id, source_node, timestamp);
1759
1760 let mut docs = self.app_documents.write().unwrap();
1761
1762 if let Some(existing) = docs.get_mut(&key) {
1763 self.document_registry
1765 .merge(type_id, existing.as_mut(), doc.as_ref())
1766 } else {
1767 docs.insert(key, doc);
1769 true
1770 }
1771 }
1772
1773 pub fn get_app_document<T: crate::registry::DocumentType>(
1777 &self,
1778 source_node: u32,
1779 timestamp: u64,
1780 ) -> Option<T> {
1781 let key = (T::TYPE_ID, source_node, timestamp);
1782
1783 let docs = self.app_documents.read().unwrap();
1784 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1785 }
1786
1787 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1791 let docs = self.app_documents.read().unwrap();
1792 docs.iter()
1793 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1794 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1795 .collect()
1796 }
1797
1798 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1802 let docs = self.app_documents.read().unwrap();
1803 let mut ops = Vec::new();
1804
1805 for ((type_id, _source, _ts), doc) in docs.iter() {
1806 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1807 ops.push(op);
1808 }
1809 }
1810
1811 ops
1812 }
1813
1814 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1818 let docs = self.app_documents.read().unwrap();
1819 docs.keys()
1820 .filter(|(tid, _, _)| *tid == type_id)
1821 .map(|(_, source, ts)| (*source, *ts))
1822 .collect()
1823 }
1824
    /// Returns the number of stored application documents across all types.
    ///
    /// # Panics
    ///
    /// Panics if the `app_documents` lock is poisoned.
    pub fn app_document_count(&self) -> usize {
        self.app_documents.read().unwrap().len()
    }
1829
    /// Registers `observer` with this mesh's observer collection.
    pub fn add_observer(&self, observer: Arc<dyn PeatObserver>) {
        self.observers.add(observer);
    }
1836
    /// Removes `observer` from this mesh's observer collection.
    pub fn remove_observer(&self, observer: &Arc<dyn PeatObserver>) {
        self.observers.remove(observer);
    }
1841
1842 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1849 let data = self.document_sync.send_emergency(timestamp);
1850 self.notify(PeatEvent::MeshStateChanged {
1851 peer_count: self.peer_manager.peer_count(),
1852 connected_count: self.peer_manager.connected_count(),
1853 });
1854 self.encrypt_document(&data)
1855 }
1856
1857 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1862 let data = self.document_sync.send_ack(timestamp);
1863 self.notify(PeatEvent::MeshStateChanged {
1864 peer_count: self.peer_manager.peer_count(),
1865 connected_count: self.peer_manager.connected_count(),
1866 });
1867 self.encrypt_document(&data)
1868 }
1869
    /// Returns `payload` after passing it through `encrypt_document`.
    ///
    /// The payload is not merged into local document state by this method.
    pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
        self.encrypt_document(payload)
    }
1879
    /// Clears the current event in the document sync layer.
    pub fn clear_event(&self) {
        self.document_sync.clear_event();
    }
1884
    /// Reports whether the document sync layer considers an emergency active.
    pub fn is_emergency_active(&self) -> bool {
        self.document_sync.is_emergency_active()
    }
1889
    /// Reports whether the document sync layer considers an ACK active.
    pub fn is_ack_active(&self) -> bool {
        self.document_sync.is_ack_active()
    }
1894
    /// Returns the current event type from the document sync layer, if any.
    pub fn current_event(&self) -> Option<EventType> {
        self.document_sync.current_event()
    }
1899
1900 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1909 let data = self.document_sync.start_emergency(timestamp, known_peers);
1910 self.notify(PeatEvent::MeshStateChanged {
1911 peer_count: self.peer_manager.peer_count(),
1912 connected_count: self.peer_manager.connected_count(),
1913 });
1914 self.encrypt_document(&data)
1915 }
1916
1917 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1921 let peers: Vec<u32> = self
1922 .peer_manager
1923 .get_peers()
1924 .iter()
1925 .map(|p| p.node_id.as_u32())
1926 .collect();
1927 self.start_emergency(timestamp, &peers)
1928 }
1929
1930 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1935 let result = self.document_sync.ack_emergency(timestamp);
1936 if result.is_some() {
1937 self.notify(PeatEvent::MeshStateChanged {
1938 peer_count: self.peer_manager.peer_count(),
1939 connected_count: self.peer_manager.connected_count(),
1940 });
1941 }
1942 result.map(|data| self.encrypt_document(&data))
1943 }
1944
    /// Clears emergency state in the document sync layer.
    pub fn clear_emergency(&self) {
        self.document_sync.clear_emergency();
    }
1949
    /// Reports whether the document sync layer has an active emergency.
    pub fn has_active_emergency(&self) -> bool {
        self.document_sync.has_active_emergency()
    }
1954
    /// Returns the emergency status tuple from the document sync layer, or
    /// `None` when it reports none.
    ///
    // NOTE(review): the meaning of the `(u32, u64, usize, usize)` fields is
    // defined by `DocumentSync::get_emergency_status` and is not visible
    // here — presumably (source node, timestamp, acked count, pending
    // count); confirm before relying on it.
    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
        self.document_sync.get_emergency_status()
    }
1961
    /// Reports whether the document sync layer records `peer_id` as having
    /// acknowledged.
    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
        self.document_sync.has_peer_acked(peer_id)
    }
1966
    /// Reports whether the document sync layer considers all peers to have
    /// acknowledged.
    pub fn all_peers_acked(&self) -> bool {
        self.document_sync.all_peers_acked()
    }
1971
1972 #[cfg(feature = "legacy-chat")]
1982 pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1983 if self.document_sync.add_chat_message(sender, text, timestamp) {
1984 Some(self.encrypt_document(&self.build_document()))
1985 } else {
1986 None
1987 }
1988 }
1989
1990 #[cfg(feature = "legacy-chat")]
1998 pub fn send_chat_reply(
1999 &self,
2000 sender: &str,
2001 text: &str,
2002 reply_to_node: u32,
2003 reply_to_timestamp: u64,
2004 timestamp: u64,
2005 ) -> Option<Vec<u8>> {
2006 if self.document_sync.add_chat_reply(
2007 sender,
2008 text,
2009 reply_to_node,
2010 reply_to_timestamp,
2011 timestamp,
2012 ) {
2013 Some(self.encrypt_document(&self.build_document()))
2014 } else {
2015 None
2016 }
2017 }
2018
    /// Returns the chat message count reported by the document sync layer.
    #[cfg(feature = "legacy-chat")]
    pub fn chat_count(&self) -> usize {
        self.document_sync.chat_count()
    }
2024
    /// Returns the chat messages the document sync layer reports for
    /// `since_timestamp`.
    ///
    // NOTE(review): the tuple layout is defined by
    // `DocumentSync::chat_messages_since` and is not visible here —
    // presumably (origin node, timestamp, sender, text, reply-to node,
    // reply-to timestamp); confirm. Whether the bound is inclusive is also
    // decided there.
    #[cfg(feature = "legacy-chat")]
    pub fn chat_messages_since(
        &self,
        since_timestamp: u64,
    ) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.chat_messages_since(since_timestamp)
    }
2035
    /// Returns every chat message held by the document sync layer.
    ///
    // NOTE(review): the tuple layout is defined by
    // `DocumentSync::all_chat_messages` and is not visible here — presumably
    // (origin node, timestamp, sender, text, reply-to node, reply-to
    // timestamp); confirm.
    #[cfg(feature = "legacy-chat")]
    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.all_chat_messages()
    }
2043
2044 pub fn on_ble_discovered(
2050 &self,
2051 identifier: &str,
2052 name: Option<&str>,
2053 rssi: i8,
2054 mesh_id: Option<&str>,
2055 now_ms: u64,
2056 ) -> Option<PeatPeer> {
2057 let (node_id, is_new) = self
2058 .peer_manager
2059 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2060
2061 let peer = self.peer_manager.get_peer(node_id)?;
2062
2063 {
2065 let mut graph = self.connection_graph.lock().unwrap();
2066 graph.on_discovered(
2067 node_id,
2068 identifier.to_string(),
2069 name.map(|s| s.to_string()),
2070 mesh_id.map(|s| s.to_string()),
2071 rssi,
2072 now_ms,
2073 );
2074 }
2075
2076 if is_new {
2077 self.notify(PeatEvent::PeerDiscovered { peer: peer.clone() });
2078 self.notify_mesh_state_changed();
2079 }
2080
2081 Some(peer)
2082 }
2083
2084 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2088 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2089 Some(id) => id,
2090 None => {
2091 log::warn!(
2092 "on_ble_connected: identifier {:?} not in peer map — \
2093 use on_incoming_connection() for peripheral connections",
2094 identifier
2095 );
2096 return None;
2097 }
2098 };
2099
2100 {
2102 let mut graph = self.connection_graph.lock().unwrap();
2103 graph.on_connected(node_id, now_ms);
2104 }
2105
2106 self.register_peer_for_delta(&node_id);
2108
2109 self.notify(PeatEvent::PeerConnected { node_id });
2110 self.notify_mesh_state_changed();
2111 Some(node_id)
2112 }
2113
2114 pub fn on_ble_disconnected(
2116 &self,
2117 identifier: &str,
2118 reason: DisconnectReason,
2119 ) -> Option<NodeId> {
2120 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2121
2122 {
2124 let mut graph = self.connection_graph.lock().unwrap();
2125 let platform_reason = match observer_reason {
2126 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2127 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2128 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2129 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2130 DisconnectReason::ConnectionFailed => {
2131 crate::platform::DisconnectReason::ConnectionFailed
2132 }
2133 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2134 };
2135 let now_ms = std::time::SystemTime::now()
2136 .duration_since(std::time::UNIX_EPOCH)
2137 .map(|d| d.as_millis() as u64)
2138 .unwrap_or(0);
2139 graph.on_disconnected(node_id, platform_reason, now_ms);
2140
2141 graph.remove_via_peer(node_id);
2144 }
2145
2146 self.unregister_peer_for_delta(&node_id);
2148
2149 self.notify(PeatEvent::PeerDisconnected {
2150 node_id,
2151 reason: observer_reason,
2152 });
2153 self.notify_mesh_state_changed();
2154 Some(node_id)
2155 }
2156
2157 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2161 if self
2162 .peer_manager
2163 .on_disconnected_by_node_id(node_id, reason)
2164 {
2165 {
2167 let mut graph = self.connection_graph.lock().unwrap();
2168 let platform_reason = match reason {
2169 DisconnectReason::LocalRequest => {
2170 crate::platform::DisconnectReason::LocalRequest
2171 }
2172 DisconnectReason::RemoteRequest => {
2173 crate::platform::DisconnectReason::RemoteRequest
2174 }
2175 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2176 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2177 DisconnectReason::ConnectionFailed => {
2178 crate::platform::DisconnectReason::ConnectionFailed
2179 }
2180 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2181 };
2182 let now_ms = std::time::SystemTime::now()
2183 .duration_since(std::time::UNIX_EPOCH)
2184 .map(|d| d.as_millis() as u64)
2185 .unwrap_or(0);
2186 graph.on_disconnected(node_id, platform_reason, now_ms);
2187
2188 graph.remove_via_peer(node_id);
2190 }
2191
2192 self.unregister_peer_for_delta(&node_id);
2194
2195 self.notify(PeatEvent::PeerDisconnected { node_id, reason });
2196 self.notify_mesh_state_changed();
2197 }
2198 }
2199
2200 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2204 let is_new = self
2205 .peer_manager
2206 .on_incoming_connection(identifier, node_id, now_ms);
2207
2208 {
2210 let mut graph = self.connection_graph.lock().unwrap();
2211 if is_new {
2212 graph.on_discovered(
2213 node_id,
2214 identifier.to_string(),
2215 None,
2216 Some(self.config.mesh_id.clone()),
2217 -50, now_ms,
2219 );
2220 }
2221 graph.on_connected(node_id, now_ms);
2222 }
2223
2224 self.register_peer_for_delta(&node_id);
2226
2227 if is_new {
2228 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2229 self.notify(PeatEvent::PeerDiscovered { peer });
2230 }
2231 }
2232
2233 self.notify(PeatEvent::PeerConnected { node_id });
2234 self.notify_mesh_state_changed();
2235
2236 is_new
2237 }
2238
2239 pub fn on_ble_data_received(
2246 &self,
2247 identifier: &str,
2248 data: &[u8],
2249 now_ms: u64,
2250 ) -> Option<DataReceivedResult> {
2251 let node_id = self.peer_manager.get_node_id(identifier)?;
2253
2254 if data.len() >= 2 {
2256 match data[0] {
2257 KEY_EXCHANGE_MARKER => {
2258 let _response = self.handle_key_exchange(data, now_ms);
2260 return None;
2262 }
2263 PEER_E2EE_MARKER => {
2264 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2266 return None;
2268 }
2269 RELAY_ENVELOPE_MARKER => {
2270 return self
2272 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2273 }
2274 _ => {}
2275 }
2276 }
2277
2278 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2280 }
2281
2282 #[allow(clippy::too_many_arguments)]
2284 fn process_document_data_with_identifier(
2285 &self,
2286 source_node: NodeId,
2287 identifier: &str,
2288 data: &[u8],
2289 now_ms: u64,
2290 relay_data: Option<Vec<u8>>,
2291 origin_node: Option<NodeId>,
2292 hop_count: u8,
2293 ) -> Option<DataReceivedResult> {
2294 let decrypted = self.decrypt_document(data, Some(identifier))?;
2296
2297 if DeltaDocument::is_delta_document(&decrypted) {
2299 return self.process_delta_document_internal(
2300 source_node,
2301 &decrypted,
2302 now_ms,
2303 relay_data,
2304 origin_node,
2305 hop_count,
2306 );
2307 }
2308
2309 let result = self.document_sync.merge_document(&decrypted)?;
2311
2312 if let Some(ref peripheral) = result.peer_peripheral {
2314 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2315 peripherals.insert(result.source_node, peripheral.clone());
2316 }
2317 }
2318
2319 self.peer_manager.record_sync(source_node, now_ms);
2321
2322 if result.is_emergency() {
2324 self.notify(PeatEvent::EmergencyReceived {
2325 from_node: result.source_node,
2326 });
2327 } else if result.is_ack() {
2328 self.notify(PeatEvent::AckReceived {
2329 from_node: result.source_node,
2330 });
2331 }
2332
2333 if result.counter_changed {
2334 self.notify(PeatEvent::DocumentSynced {
2335 from_node: result.source_node,
2336 total_count: result.total_count,
2337 });
2338 }
2339
2340 if relay_data.is_some() {
2342 let relay_targets = self.get_relay_targets(Some(source_node));
2343 self.notify(PeatEvent::MessageRelayed {
2344 origin_node: origin_node.unwrap_or(result.source_node),
2345 relay_count: relay_targets.len(),
2346 hop_count,
2347 });
2348 }
2349
2350 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2351 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2352
2353 Some(DataReceivedResult {
2354 source_node: result.source_node,
2355 is_emergency: result.is_emergency(),
2356 is_ack: result.is_ack(),
2357 counter_changed: result.counter_changed,
2358 emergency_changed: result.emergency_changed,
2359 total_count: result.total_count,
2360 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2361 relay_data,
2362 origin_node,
2363 hop_count,
2364 callsign,
2365 battery_percent,
2366 heart_rate,
2367 event_type,
2368 latitude,
2369 longitude,
2370 altitude,
2371 })
2372 }
2373
2374 fn handle_relay_envelope_with_identifier(
2376 &self,
2377 source_node: NodeId,
2378 identifier: &str,
2379 data: &[u8],
2380 now_ms: u64,
2381 ) -> Option<DataReceivedResult> {
2382 let envelope = RelayEnvelope::decode(data)?;
2384
2385 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2387 let stats = self
2388 .seen_cache
2389 .lock()
2390 .unwrap()
2391 .get_stats(&envelope.message_id);
2392 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2393
2394 self.notify(PeatEvent::DuplicateMessageDropped {
2395 origin_node: envelope.origin_node,
2396 seen_count,
2397 });
2398 return None;
2399 }
2400
2401 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2403 envelope.relay().map(|e| e.encode())
2404 } else {
2405 if !envelope.can_relay() {
2406 self.notify(PeatEvent::MessageTtlExpired {
2407 origin_node: envelope.origin_node,
2408 hop_count: envelope.hop_count,
2409 });
2410 }
2411 None
2412 };
2413
2414 self.process_document_data_with_identifier(
2416 source_node,
2417 identifier,
2418 &envelope.payload,
2419 now_ms,
2420 relay_data,
2421 Some(envelope.origin_node),
2422 envelope.hop_count,
2423 )
2424 }
2425
2426 pub fn on_ble_data_received_from_node(
2433 &self,
2434 node_id: NodeId,
2435 data: &[u8],
2436 now_ms: u64,
2437 ) -> Option<DataReceivedResult> {
2438 if data.len() >= 2 {
2440 match data[0] {
2441 KEY_EXCHANGE_MARKER => {
2442 let _response = self.handle_key_exchange(data, now_ms);
2443 return None;
2444 }
2445 PEER_E2EE_MARKER => {
2446 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2447 return None;
2448 }
2449 RELAY_ENVELOPE_MARKER => {
2450 return self.handle_relay_envelope(node_id, data, now_ms);
2452 }
2453 _ => {}
2454 }
2455 }
2456
2457 self.process_document_data(node_id, data, now_ms, None, None, 0)
2459 }
2460
    /// Handles data from a device whose node ID is not yet known, learning
    /// the node ID from the document itself.
    ///
    /// The data is decrypted with `identifier` as the source hint. Bytes
    /// `4..8` of the decrypted document are read as a little-endian `u32`
    /// source node ID, and `identifier` is registered against that node with
    /// the peer manager. The document is then handled as one of:
    ///
    /// * a delta document — handed to `process_delta_document_internal`;
    /// * an app-layer message (first byte `0xAF`) — returned untouched in
    ///   `relay_data` with `event_timestamp` set to `now_ms`, without being
    ///   merged;
    /// * a legacy document — merged through the document sync layer, with
    ///   the usual emergency / ACK / document-synced notifications.
    ///
    /// Returns `None` when decryption fails, the decrypted document is
    /// shorter than 8 bytes, or the legacy merge fails.
    ///
    // NOTE(review): unlike `process_document_data`, the legacy path here does
    // not store `result.peer_peripheral` in `peer_peripherals`, so
    // `get_peer_callsign` will not see callsigns learned through this entry
    // point — confirm whether that is intentional.
    pub fn on_ble_data_received_anonymous(
        &self,
        identifier: &str,
        data: &[u8],
        now_ms: u64,
    ) -> Option<DataReceivedResult> {
        log::debug!(
            "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
            identifier,
            data.len(),
            data.first().copied().unwrap_or(0)
        );

        let decrypted = match self.decrypt_document(data, Some(identifier)) {
            Some(d) => d,
            None => {
                log::warn!(
                    "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
                    data.len(),
                    identifier
                );
                return None;
            }
        };

        // The source node ID is read from bytes 4..8, so at least 8 bytes
        // are required.
        if decrypted.len() < 8 {
            log::warn!("Decrypted document too short to extract source_node");
            return None;
        }

        let source_node_u32 =
            u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
        let source_node = NodeId::new(source_node_u32);

        log::info!(
            "Anonymous document from {}: source_node={:08X}, len={}",
            identifier,
            source_node_u32,
            decrypted.len()
        );

        // Remember which platform identifier maps to this node ID.
        self.peer_manager
            .register_identifier(identifier, source_node);

        let is_delta = DeltaDocument::is_delta_document(&decrypted);
        log::info!(
            "Document format: delta={}, first_byte=0x{:02X}, len={}",
            is_delta,
            decrypted.first().copied().unwrap_or(0),
            decrypted.len()
        );

        if is_delta {
            return self.process_delta_document_internal(
                source_node,
                &decrypted,
                now_ms,
                None,
                None,
                0,
            );
        }

        // App-layer messages bypass the document merge entirely; the raw
        // decrypted bytes are handed back to the caller in `relay_data`.
        const APP_LAYER_MARKER: u8 = 0xAF;
        if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
            log::debug!(
                "App-layer message (0xAF) from {:08X}, {} bytes - passing to relay",
                source_node.as_u32(),
                decrypted.len()
            );
            return Some(DataReceivedResult {
                source_node,
                is_emergency: false,
                is_ack: false,
                counter_changed: false,
                emergency_changed: false,
                total_count: 0,
                event_timestamp: now_ms,
                relay_data: Some(decrypted.to_vec()),
                origin_node: None,
                hop_count: 0,
                callsign: None,
                battery_percent: None,
                heart_rate: None,
                event_type: None,
                latitude: None,
                longitude: None,
                altitude: None,
            });
        }

        log::info!(
            "Processing legacy document from {:08X}",
            source_node.as_u32()
        );
        let result = self.document_sync.merge_document(&decrypted)?;

        log::info!(
            "Merge result: peer_peripheral={}, counter_changed={}",
            result.peer_peripheral.is_some(),
            result.counter_changed
        );
        if let Some(ref p) = result.peer_peripheral {
            log::info!("Peripheral callsign: '{}'", p.callsign_str());
        }

        self.peer_manager.record_sync(source_node, now_ms);

        // Emergency takes precedence over ACK when choosing the notification.
        if result.is_emergency() {
            self.notify(PeatEvent::EmergencyReceived {
                from_node: result.source_node,
            });
        } else if result.is_ack() {
            self.notify(PeatEvent::AckReceived {
                from_node: result.source_node,
            });
        }

        if result.counter_changed {
            self.notify(PeatEvent::DocumentSynced {
                from_node: result.source_node,
                total_count: result.total_count,
            });
        }

        let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
            DataReceivedResult::peripheral_fields(&result.peer_peripheral);

        // Direct (non-relayed) receipt: no relay data, no origin, zero hops.
        Some(DataReceivedResult {
            source_node: result.source_node,
            is_emergency: result.is_emergency(),
            is_ack: result.is_ack(),
            counter_changed: result.counter_changed,
            emergency_changed: result.emergency_changed,
            total_count: result.total_count,
            event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
            relay_data: None,
            origin_node: None,
            hop_count: 0,
            callsign,
            battery_percent,
            heart_rate,
            event_type,
            latitude,
            longitude,
            altitude,
        })
    }
2631
2632 fn process_document_data(
2634 &self,
2635 source_node: NodeId,
2636 data: &[u8],
2637 now_ms: u64,
2638 relay_data: Option<Vec<u8>>,
2639 origin_node: Option<NodeId>,
2640 hop_count: u8,
2641 ) -> Option<DataReceivedResult> {
2642 let source_hint = format!("node:{:08X}", source_node.as_u32());
2644 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2645
2646 if DeltaDocument::is_delta_document(&decrypted) {
2648 return self.process_delta_document_internal(
2649 source_node,
2650 &decrypted,
2651 now_ms,
2652 relay_data,
2653 origin_node,
2654 hop_count,
2655 );
2656 }
2657
2658 let result = self.document_sync.merge_document(&decrypted)?;
2660
2661 if let Some(ref peripheral) = result.peer_peripheral {
2663 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2664 peripherals.insert(result.source_node, peripheral.clone());
2665 }
2666 }
2667
2668 self.peer_manager.record_sync(source_node, now_ms);
2670
2671 if result.is_emergency() {
2673 self.notify(PeatEvent::EmergencyReceived {
2674 from_node: result.source_node,
2675 });
2676 } else if result.is_ack() {
2677 self.notify(PeatEvent::AckReceived {
2678 from_node: result.source_node,
2679 });
2680 }
2681
2682 if result.counter_changed {
2683 self.notify(PeatEvent::DocumentSynced {
2684 from_node: result.source_node,
2685 total_count: result.total_count,
2686 });
2687 }
2688
2689 if relay_data.is_some() {
2691 let relay_targets = self.get_relay_targets(Some(source_node));
2692 self.notify(PeatEvent::MessageRelayed {
2693 origin_node: origin_node.unwrap_or(result.source_node),
2694 relay_count: relay_targets.len(),
2695 hop_count,
2696 });
2697 }
2698
2699 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2700 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2701
2702 Some(DataReceivedResult {
2703 source_node: result.source_node,
2704 is_emergency: result.is_emergency(),
2705 is_ack: result.is_ack(),
2706 counter_changed: result.counter_changed,
2707 emergency_changed: result.emergency_changed,
2708 total_count: result.total_count,
2709 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2710 relay_data,
2711 origin_node,
2712 hop_count,
2713 callsign,
2714 battery_percent,
2715 heart_rate,
2716 event_type,
2717 latitude,
2718 longitude,
2719 altitude,
2720 })
2721 }
2722
2723 fn handle_relay_envelope(
2725 &self,
2726 source_node: NodeId,
2727 data: &[u8],
2728 now_ms: u64,
2729 ) -> Option<DataReceivedResult> {
2730 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2732
2733 let relay_data = if decision.should_relay {
2735 decision.relay_data()
2736 } else {
2737 None
2738 };
2739
2740 self.process_document_data(
2742 source_node,
2743 &decision.payload,
2744 now_ms,
2745 relay_data,
2746 Some(decision.origin_node),
2747 decision.hop_count,
2748 )
2749 }
2750
2751 pub fn on_ble_data(
2760 &self,
2761 identifier: &str,
2762 data: &[u8],
2763 now_ms: u64,
2764 ) -> Option<DataReceivedResult> {
2765 if data.len() >= 2 {
2767 match data[0] {
2768 KEY_EXCHANGE_MARKER => {
2769 let _response = self.handle_key_exchange(data, now_ms);
2770 return None;
2771 }
2772 PEER_E2EE_MARKER => {
2773 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2774 return None;
2775 }
2776 RELAY_ENVELOPE_MARKER => {
2777 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2779 }
2780 _ => {}
2781 }
2782 }
2783
2784 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2786 }
2787
2788 fn process_incoming_document(
2790 &self,
2791 identifier: &str,
2792 data: &[u8],
2793 now_ms: u64,
2794 relay_data: Option<Vec<u8>>,
2795 origin_node: Option<NodeId>,
2796 hop_count: u8,
2797 ) -> Option<DataReceivedResult> {
2798 let decrypted = self.decrypt_document(data, Some(identifier))?;
2800
2801 let result = self.document_sync.merge_document(&decrypted)?;
2803
2804 self.peer_manager.record_sync(result.source_node, now_ms);
2806
2807 if origin_node.is_none() {
2812 let is_new =
2814 self.peer_manager
2815 .on_incoming_connection(identifier, result.source_node, now_ms);
2816
2817 {
2819 let mut graph = self.connection_graph.lock().unwrap();
2820 if is_new {
2821 graph.on_discovered(
2822 result.source_node,
2823 identifier.to_string(),
2824 None,
2825 Some(self.config.mesh_id.clone()),
2826 -50, now_ms,
2828 );
2829 }
2830 graph.on_connected(result.source_node, now_ms);
2831 }
2832 }
2833
2834 if result.is_emergency() {
2836 self.notify(PeatEvent::EmergencyReceived {
2837 from_node: result.source_node,
2838 });
2839 } else if result.is_ack() {
2840 self.notify(PeatEvent::AckReceived {
2841 from_node: result.source_node,
2842 });
2843 }
2844
2845 if result.counter_changed {
2846 self.notify(PeatEvent::DocumentSynced {
2847 from_node: result.source_node,
2848 total_count: result.total_count,
2849 });
2850 }
2851
2852 if relay_data.is_some() {
2854 let relay_targets = self.get_relay_targets(Some(result.source_node));
2855 self.notify(PeatEvent::MessageRelayed {
2856 origin_node: origin_node.unwrap_or(result.source_node),
2857 relay_count: relay_targets.len(),
2858 hop_count,
2859 });
2860 }
2861
2862 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2863 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2864
2865 Some(DataReceivedResult {
2866 source_node: result.source_node,
2867 is_emergency: result.is_emergency(),
2868 is_ack: result.is_ack(),
2869 counter_changed: result.counter_changed,
2870 emergency_changed: result.emergency_changed,
2871 total_count: result.total_count,
2872 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2873 relay_data,
2874 origin_node,
2875 hop_count,
2876 callsign,
2877 battery_percent,
2878 heart_rate,
2879 event_type,
2880 latitude,
2881 longitude,
2882 altitude,
2883 })
2884 }
2885
2886 fn handle_relay_envelope_with_incoming(
2888 &self,
2889 identifier: &str,
2890 data: &[u8],
2891 now_ms: u64,
2892 ) -> Option<DataReceivedResult> {
2893 let envelope = RelayEnvelope::decode(data)?;
2895
2896 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2899 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2900 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2901 source_peer,
2902 envelope.origin_node,
2903 envelope.hop_count,
2904 now_ms,
2905 );
2906
2907 if is_new {
2908 log::debug!(
2909 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2910 envelope.origin_node.as_u32(),
2911 source_peer.as_u32(),
2912 envelope.hop_count
2913 );
2914 }
2915 }
2916 }
2917
2918 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2920 let stats = self
2922 .seen_cache
2923 .lock()
2924 .unwrap()
2925 .get_stats(&envelope.message_id);
2926 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2927
2928 self.notify(PeatEvent::DuplicateMessageDropped {
2929 origin_node: envelope.origin_node,
2930 seen_count,
2931 });
2932 return None;
2933 }
2934
2935 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2937 let relay_env = envelope.relay();
2938 (true, relay_env.map(|e| e.encode()))
2939 } else {
2940 if !envelope.can_relay() {
2941 self.notify(PeatEvent::MessageTtlExpired {
2942 origin_node: envelope.origin_node,
2943 hop_count: envelope.hop_count,
2944 });
2945 }
2946 (false, None)
2947 };
2948
2949 self.process_incoming_document(
2951 identifier,
2952 &envelope.payload,
2953 now_ms,
2954 if should_relay { relay_data } else { None },
2955 Some(envelope.origin_node),
2956 envelope.hop_count,
2957 )
2958 }
2959
2960 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
2970 use std::sync::atomic::Ordering;
2971
2972 let now_ms_32 = now_ms as u32;
2974
2975 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
2977 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
2978 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
2979 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
2980 let removed = self.peer_manager.cleanup_stale(now_ms);
2981 for node_id in &removed {
2982 self.notify(PeatEvent::PeerLost { node_id: *node_id });
2983 }
2984 if !removed.is_empty() {
2985 self.notify_mesh_state_changed();
2986 }
2987
2988 {
2990 let mut graph = self.connection_graph.lock().unwrap();
2991 let newly_lost = graph.tick(now_ms);
2992 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
2994 drop(graph);
2995
2996 for node_id in newly_lost {
2999 if !removed.contains(&node_id) {
3001 self.notify(PeatEvent::PeerLost { node_id });
3002 }
3003 }
3004 }
3005 }
3006
3007 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3009 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3010 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3011 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3012 if self.peer_manager.connected_count() > 0 {
3014 let doc = self.document_sync.build_document();
3015 return Some(self.encrypt_document(&doc));
3016 }
3017 }
3018
3019 None
3020 }
3021
3022 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3031 use std::sync::atomic::Ordering;
3032 let now_ms_32 = now_ms as u32;
3033
3034 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3036 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3037 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3038 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3039 let removed = self.peer_manager.cleanup_stale(now_ms);
3040 for node_id in &removed {
3041 self.notify(PeatEvent::PeerLost { node_id: *node_id });
3042 }
3043 if !removed.is_empty() {
3044 self.notify_mesh_state_changed();
3045 }
3046
3047 {
3049 let mut graph = self.connection_graph.lock().unwrap();
3050 let newly_lost = graph.tick(now_ms);
3051 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3052 drop(graph);
3053
3054 for node_id in newly_lost {
3055 if !removed.contains(&node_id) {
3056 self.notify(PeatEvent::PeerLost { node_id });
3057 }
3058 }
3059 }
3060 }
3061
3062 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3064 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3065 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3066 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3067
3068 let doc = self.document_sync.build_document();
3070 let encrypted = self.encrypt_document(&doc);
3071 let mut results = Vec::new();
3072 for peer in self.get_connected_peers() {
3073 results.push((peer.node_id, encrypted.clone()));
3074 }
3075 return results;
3076 }
3077
3078 Vec::new()
3079 }
3080
3081 pub fn get_peers(&self) -> Vec<PeatPeer> {
3085 self.peer_manager.get_peers()
3086 }
3087
3088 pub fn get_connected_peers(&self) -> Vec<PeatPeer> {
3090 self.peer_manager.get_connected_peers()
3091 }
3092
3093 pub fn get_peer(&self, node_id: NodeId) -> Option<PeatPeer> {
3095 self.peer_manager.get_peer(node_id)
3096 }
3097
3098 pub fn peer_count(&self) -> usize {
3100 self.peer_manager.peer_count()
3101 }
3102
3103 pub fn connected_count(&self) -> usize {
3105 self.peer_manager.connected_count()
3106 }
3107
3108 pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
3110 self.peer_manager.matches_mesh(device_mesh_id)
3111 }
3112
3113 pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
3137 self.connection_graph.lock().unwrap().get_all_owned()
3138 }
3139
3140 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3142 self.connection_graph
3143 .lock()
3144 .unwrap()
3145 .get_peer(node_id)
3146 .cloned()
3147 }
3148
3149 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3151 self.connection_graph
3152 .lock()
3153 .unwrap()
3154 .get_connected()
3155 .into_iter()
3156 .cloned()
3157 .collect()
3158 }
3159
3160 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3162 self.connection_graph
3163 .lock()
3164 .unwrap()
3165 .get_degraded()
3166 .into_iter()
3167 .cloned()
3168 .collect()
3169 }
3170
3171 pub fn get_recently_disconnected(
3175 &self,
3176 within_ms: u64,
3177 now_ms: u64,
3178 ) -> Vec<PeerConnectionState> {
3179 self.connection_graph
3180 .lock()
3181 .unwrap()
3182 .get_recently_disconnected(within_ms, now_ms)
3183 .into_iter()
3184 .cloned()
3185 .collect()
3186 }
3187
3188 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3190 self.connection_graph
3191 .lock()
3192 .unwrap()
3193 .get_lost()
3194 .into_iter()
3195 .cloned()
3196 .collect()
3197 }
3198
3199 pub fn get_connection_state_counts(&self) -> StateCountSummary {
3201 self.connection_graph.lock().unwrap().state_counts()
3202 }
3203
3204 pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
3212 self.connection_graph
3213 .lock()
3214 .unwrap()
3215 .get_indirect_peers_owned()
3216 }
3217
3218 pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
3225 self.connection_graph.lock().unwrap().peer_degree(node_id)
3226 }
3227
3228 pub fn get_full_state_counts(&self) -> FullStateCountSummary {
3233 self.connection_graph.lock().unwrap().full_state_counts()
3234 }
3235
3236 pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
3241 self.connection_graph.lock().unwrap().get_paths_to(node_id)
3242 }
3243
3244 pub fn is_peer_known(&self, node_id: NodeId) -> bool {
3246 self.connection_graph.lock().unwrap().is_known(node_id)
3247 }
3248
3249 pub fn indirect_peer_count(&self) -> usize {
3251 self.connection_graph.lock().unwrap().indirect_peer_count()
3252 }
3253
3254 pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
3259 self.connection_graph
3260 .lock()
3261 .unwrap()
3262 .cleanup_indirect(now_ms)
3263 }
3264
3265 pub fn total_count(&self) -> u64 {
3267 self.document_sync.total_count()
3268 }
3269
3270 pub fn document_version(&self) -> u32 {
3272 self.document_sync.version()
3273 }
3274
3275 pub fn version(&self) -> u32 {
3277 self.document_sync.version()
3278 }
3279
3280 pub fn update_health(&self, battery_percent: u8) {
3282 self.document_sync.update_health(battery_percent);
3283 }
3284
3285 pub fn update_activity(&self, activity: u8) {
3287 self.document_sync.update_activity(activity);
3288 }
3289
3290 pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
3292 self.document_sync
3293 .update_health_full(battery_percent, activity);
3294 }
3295
3296 pub fn update_heart_rate(&self, heart_rate: u8) {
3298 self.document_sync.update_heart_rate(heart_rate);
3299 }
3300
3301 pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
3303 self.document_sync
3304 .update_location(latitude, longitude, altitude);
3305 }
3306
3307 pub fn clear_location(&self) {
3309 self.document_sync.clear_location();
3310 }
3311
3312 pub fn update_callsign(&self, callsign: &str) {
3314 self.document_sync.update_callsign(callsign);
3315 }
3316
3317 pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
3319 self.document_sync
3320 .set_peripheral_event(event_type, timestamp);
3321 }
3322
3323 pub fn clear_peripheral_event(&self) {
3325 self.document_sync.clear_peripheral_event();
3326 }
3327
3328 #[allow(clippy::too_many_arguments)]
3333 pub fn update_peripheral_state(
3334 &self,
3335 callsign: &str,
3336 battery_percent: u8,
3337 heart_rate: Option<u8>,
3338 latitude: Option<f32>,
3339 longitude: Option<f32>,
3340 altitude: Option<f32>,
3341 event_type: Option<EventType>,
3342 timestamp: u64,
3343 ) {
3344 self.document_sync.update_peripheral_state(
3345 callsign,
3346 battery_percent,
3347 heart_rate,
3348 latitude,
3349 longitude,
3350 altitude,
3351 event_type,
3352 timestamp,
3353 );
3354 }
3355
3356 pub fn build_document(&self) -> Vec<u8> {
3360 let doc = self.document_sync.build_document();
3361 self.encrypt_document(&doc)
3362 }
3363
3364 pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<PeatPeer> {
3366 self.peer_manager.peers_needing_sync(now_ms)
3367 }
3368
3369 fn notify(&self, event: PeatEvent) {
3372 self.observers.notify(event);
3373 }
3374
3375 fn notify_mesh_state_changed(&self) {
3376 self.notify(PeatEvent::MeshStateChanged {
3377 peer_count: self.peer_manager.peer_count(),
3378 connected_count: self.peer_manager.connected_count(),
3379 });
3380 }
3381
3382 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3402 let mut id_bytes = [0u8; 16];
3405 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3406 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3407 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3408
3409 let seen = self.seen_cache.lock().unwrap();
3411 !seen.has_seen(&message_id)
3412 }
3413
3414 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3419 let now = std::time::SystemTime::now()
3420 .duration_since(std::time::UNIX_EPOCH)
3421 .map(|d| d.as_millis() as u64)
3422 .unwrap_or(0);
3423
3424 let mut id_bytes = [0u8; 16];
3426 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3427 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3428 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3429 let origin = NodeId::new(source_node);
3430
3431 let mut seen = self.seen_cache.lock().unwrap();
3432 seen.mark_seen(message_id, origin, now);
3433 }
3434
3435 pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
3440 self.peer_manager.get_connected_identifiers()
3441 }
3442}
3443
3444#[derive(Debug, Clone)]
3446pub struct DataReceivedResult {
3447 pub source_node: NodeId,
3449
3450 pub is_emergency: bool,
3452
3453 pub is_ack: bool,
3455
3456 pub counter_changed: bool,
3458
3459 pub emergency_changed: bool,
3461
3462 pub total_count: u64,
3464
3465 pub event_timestamp: u64,
3467
3468 pub relay_data: Option<Vec<u8>>,
3473
3474 pub origin_node: Option<NodeId>,
3476
3477 pub hop_count: u8,
3479
3480 pub callsign: Option<String>,
3483
3484 pub battery_percent: Option<u8>,
3486
3487 pub heart_rate: Option<u8>,
3489
3490 pub event_type: Option<u8>,
3492
3493 pub latitude: Option<f32>,
3495
3496 pub longitude: Option<f32>,
3498
3499 pub altitude: Option<f32>,
3501}
3502
3503impl DataReceivedResult {
3504 #[allow(clippy::type_complexity)]
3506 fn peripheral_fields(
3507 peripheral: &Option<crate::sync::crdt::Peripheral>,
3508 ) -> (
3509 Option<String>,
3510 Option<u8>,
3511 Option<u8>,
3512 Option<u8>,
3513 Option<f32>,
3514 Option<f32>,
3515 Option<f32>,
3516 ) {
3517 match peripheral {
3518 Some(p) => {
3519 let callsign = {
3520 let s = p.callsign_str();
3521 if s.is_empty() {
3522 None
3523 } else {
3524 Some(s.to_string())
3525 }
3526 };
3527 let battery = if p.health.battery_percent > 0 {
3528 Some(p.health.battery_percent)
3529 } else {
3530 None
3531 };
3532 let heart_rate = p.health.heart_rate;
3533 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3534 let (lat, lon, alt) = match &p.location {
3535 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3536 None => (None, None, None),
3537 };
3538 (callsign, battery, heart_rate, event_type, lat, lon, alt)
3539 }
3540 None => (None, None, None, None, None, None, None),
3541 }
3542 }
3543}
3544
3545#[derive(Debug, Clone)]
3547pub struct RelayDecision {
3548 pub payload: Vec<u8>,
3550
3551 pub origin_node: NodeId,
3553
3554 pub hop_count: u8,
3556
3557 pub should_relay: bool,
3559
3560 pub relay_envelope: Option<RelayEnvelope>,
3564}
3565
3566impl RelayDecision {
3567 pub fn relay_data(&self) -> Option<Vec<u8>> {
3571 self.relay_envelope.as_ref().map(|e| e.encode())
3572 }
3573}
3574
3575#[cfg(all(test, feature = "std"))]
3576mod tests {
3577 use super::*;
3578 use crate::observer::CollectingObserver;
3579
3580 const TEST_TIMESTAMP: u64 = 1705276800000;
3582
3583 fn create_mesh(node_id: u32, callsign: &str) -> PeatMesh {
3584 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3585 PeatMesh::new(config)
3586 }
3587
3588 #[test]
3589 fn test_mesh_creation() {
3590 let mesh = create_mesh(0x12345678, "ALPHA-1");
3591
3592 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
3593 assert_eq!(mesh.callsign(), "ALPHA-1");
3594 assert_eq!(mesh.mesh_id(), "TEST");
3595 assert_eq!(mesh.device_name(), "PEAT_TEST-12345678");
3596 }
3597
3598 #[test]
3599 fn test_peer_discovery() {
3600 let mesh = create_mesh(0x11111111, "ALPHA-1");
3601 let observer = Arc::new(CollectingObserver::new());
3602 mesh.add_observer(observer.clone());
3603
3604 let peer = mesh.on_ble_discovered(
3606 "device-uuid",
3607 Some("PEAT_TEST-22222222"),
3608 -65,
3609 Some("TEST"),
3610 1000,
3611 );
3612
3613 assert!(peer.is_some());
3614 let peer = peer.unwrap();
3615 assert_eq!(peer.node_id.as_u32(), 0x22222222);
3616
3617 let events = observer.events();
3619 assert!(events
3620 .iter()
3621 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3622 assert!(events
3623 .iter()
3624 .any(|e| matches!(e, PeatEvent::MeshStateChanged { .. })));
3625 }
3626
3627 #[test]
3628 fn test_connection_lifecycle() {
3629 let mesh = create_mesh(0x11111111, "ALPHA-1");
3630 let observer = Arc::new(CollectingObserver::new());
3631 mesh.add_observer(observer.clone());
3632
3633 mesh.on_ble_discovered(
3635 "device-uuid",
3636 Some("PEAT_TEST-22222222"),
3637 -65,
3638 Some("TEST"),
3639 1000,
3640 );
3641
3642 let node_id = mesh.on_ble_connected("device-uuid", 2000);
3643 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3644 assert_eq!(mesh.connected_count(), 1);
3645
3646 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
3648 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
3649 assert_eq!(mesh.connected_count(), 0);
3650
3651 let events = observer.events();
3653 assert!(events
3654 .iter()
3655 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3656 assert!(events
3657 .iter()
3658 .any(|e| matches!(e, PeatEvent::PeerDisconnected { .. })));
3659 }
3660
3661 #[test]
3662 fn test_emergency_flow() {
3663 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3664 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3665
3666 let observer2 = Arc::new(CollectingObserver::new());
3667 mesh2.add_observer(observer2.clone());
3668
3669 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3671 assert!(mesh1.is_emergency_active());
3672
3673 let result =
3675 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3676
3677 assert!(result.is_some());
3678 let result = result.unwrap();
3679 assert!(result.is_emergency);
3680 assert_eq!(result.source_node.as_u32(), 0x11111111);
3681
3682 let events = observer2.events();
3684 assert!(events
3685 .iter()
3686 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
3687 }
3688
3689 #[test]
3690 fn test_ack_flow() {
3691 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3692 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3693
3694 let observer2 = Arc::new(CollectingObserver::new());
3695 mesh2.add_observer(observer2.clone());
3696
3697 let doc = mesh1.send_ack(TEST_TIMESTAMP);
3699 assert!(mesh1.is_ack_active());
3700
3701 let result =
3703 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3704
3705 assert!(result.is_some());
3706 let result = result.unwrap();
3707 assert!(result.is_ack);
3708
3709 let events = observer2.events();
3711 assert!(events
3712 .iter()
3713 .any(|e| matches!(e, PeatEvent::AckReceived { .. })));
3714 }
3715
3716 #[test]
3717 fn test_tick_cleanup() {
3718 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3719 .with_peer_timeout(10_000);
3720 let mesh = PeatMesh::new(config);
3721
3722 let observer = Arc::new(CollectingObserver::new());
3723 mesh.add_observer(observer.clone());
3724
3725 mesh.on_ble_discovered(
3727 "device-uuid",
3728 Some("PEAT_TEST-22222222"),
3729 -65,
3730 Some("TEST"),
3731 1000,
3732 );
3733 assert_eq!(mesh.peer_count(), 1);
3734
3735 mesh.tick(5000);
3737 assert_eq!(mesh.peer_count(), 1);
3738
3739 mesh.tick(20000);
3741 assert_eq!(mesh.peer_count(), 0);
3742
3743 let events = observer.events();
3745 assert!(events
3746 .iter()
3747 .any(|e| matches!(e, PeatEvent::PeerLost { .. })));
3748 }
3749
3750 #[test]
3751 fn test_tick_sync_broadcast() {
3752 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3753 .with_sync_interval(5000);
3754 let mesh = PeatMesh::new(config);
3755
3756 mesh.on_ble_discovered(
3758 "device-uuid",
3759 Some("PEAT_TEST-22222222"),
3760 -65,
3761 Some("TEST"),
3762 1000,
3763 );
3764 mesh.on_ble_connected("device-uuid", 1000);
3765
3766 let _result = mesh.tick(0);
3768 let result = mesh.tick(3000);
3772 assert!(result.is_none());
3773
3774 let result = mesh.tick(6000);
3776 assert!(result.is_some());
3777
3778 let result = mesh.tick(6100);
3780 assert!(result.is_none());
3781
3782 let result = mesh.tick(12000);
3784 assert!(result.is_some());
3785 }
3786
3787 #[test]
3788 fn test_incoming_connection() {
3789 let mesh = create_mesh(0x11111111, "ALPHA-1");
3790 let observer = Arc::new(CollectingObserver::new());
3791 mesh.add_observer(observer.clone());
3792
3793 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
3795
3796 assert!(is_new);
3797 assert_eq!(mesh.peer_count(), 1);
3798 assert_eq!(mesh.connected_count(), 1);
3799
3800 let events = observer.events();
3802 assert!(events
3803 .iter()
3804 .any(|e| matches!(e, PeatEvent::PeerDiscovered { .. })));
3805 assert!(events
3806 .iter()
3807 .any(|e| matches!(e, PeatEvent::PeerConnected { .. })));
3808 }
3809
3810 #[test]
3811 fn test_mesh_filtering() {
3812 let mesh = create_mesh(0x11111111, "ALPHA-1");
3813
3814 let peer = mesh.on_ble_discovered(
3816 "device-uuid-1",
3817 Some("PEAT_OTHER-22222222"),
3818 -65,
3819 Some("OTHER"),
3820 1000,
3821 );
3822 assert!(peer.is_none());
3823 assert_eq!(mesh.peer_count(), 0);
3824
3825 let peer = mesh.on_ble_discovered(
3827 "device-uuid-2",
3828 Some("PEAT_TEST-33333333"),
3829 -65,
3830 Some("TEST"),
3831 1000,
3832 );
3833 assert!(peer.is_some());
3834 assert_eq!(mesh.peer_count(), 1);
3835 }
3836
3837 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
3840 let config =
3841 PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
3842 PeatMesh::new(config)
3843 }
3844
3845 #[test]
3846 fn test_encryption_enabled() {
3847 let secret = [0x42u8; 32];
3848 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3849
3850 assert!(mesh.is_encryption_enabled());
3851 }
3852
3853 #[test]
3854 fn test_encryption_disabled_by_default() {
3855 let mesh = create_mesh(0x11111111, "ALPHA-1");
3856
3857 assert!(!mesh.is_encryption_enabled());
3858 }
3859
3860 #[test]
3861 fn test_encrypted_document_exchange() {
3862 let secret = [0x42u8; 32];
3863 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3864 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3865
3866 let doc = mesh1.build_document();
3868
3869 assert!(doc.len() >= 2);
3871 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3872
3873 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3875
3876 assert!(result.is_some());
3877 let result = result.unwrap();
3878 assert_eq!(result.source_node.as_u32(), 0x11111111);
3879 }
3880
3881 #[test]
3882 fn test_encrypted_emergency_exchange() {
3883 let secret = [0x42u8; 32];
3884 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3885 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3886
3887 let observer = Arc::new(CollectingObserver::new());
3888 mesh2.add_observer(observer.clone());
3889
3890 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
3892
3893 let result =
3895 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
3896
3897 assert!(result.is_some());
3898 let result = result.unwrap();
3899 assert!(result.is_emergency);
3900
3901 let events = observer.events();
3903 assert!(events
3904 .iter()
3905 .any(|e| matches!(e, PeatEvent::EmergencyReceived { .. })));
3906 }
3907
3908 #[test]
3909 fn test_wrong_key_fails_decrypt() {
3910 let secret1 = [0x42u8; 32];
3911 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3913 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3914
3915 let doc = mesh1.build_document();
3917
3918 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3920
3921 assert!(result.is_none());
3922 }
3923
3924 #[test]
3925 fn test_unencrypted_mesh_can_read_unencrypted() {
3926 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3927 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3928
3929 let doc = mesh1.build_document();
3931
3932 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3934
3935 assert!(result.is_some());
3936 }
3937
3938 #[test]
3939 fn test_encrypted_mesh_can_receive_unencrypted() {
3940 let secret = [0x42u8; 32];
3942 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
3947
3948 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3950
3951 assert!(result.is_some());
3952 }
3953
3954 #[test]
3955 fn test_unencrypted_mesh_cannot_receive_encrypted() {
3956 let secret = [0x42u8; 32];
3957 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
3962
3963 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3965
3966 assert!(result.is_none());
3967 }
3968
3969 #[test]
3970 fn test_enable_disable_encryption() {
3971 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
3972
3973 assert!(!mesh.is_encryption_enabled());
3974
3975 let secret = [0x42u8; 32];
3977 mesh.enable_encryption(&secret);
3978 assert!(mesh.is_encryption_enabled());
3979
3980 let doc = mesh.build_document();
3982 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3983
3984 mesh.disable_encryption();
3986 assert!(!mesh.is_encryption_enabled());
3987
3988 let doc = mesh.build_document();
3990 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
3991 }
3992
3993 #[test]
3994 fn test_encryption_overhead() {
3995 let secret = [0x42u8; 32];
3996 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3997 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
3998
3999 let doc_encrypted = mesh_encrypted.build_document();
4000 let doc_unencrypted = mesh_unencrypted.build_document();
4001
4002 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4008 assert_eq!(overhead, 30); }
4010
4011 #[test]
4014 fn test_peer_e2ee_enable_disable() {
4015 let mesh = create_mesh(0x11111111, "ALPHA-1");
4016
4017 assert!(!mesh.is_peer_e2ee_enabled());
4018 assert!(mesh.peer_e2ee_public_key().is_none());
4019
4020 mesh.enable_peer_e2ee();
4021 assert!(mesh.is_peer_e2ee_enabled());
4022 assert!(mesh.peer_e2ee_public_key().is_some());
4023
4024 mesh.disable_peer_e2ee();
4025 assert!(!mesh.is_peer_e2ee_enabled());
4026 }
4027
4028 #[test]
4029 fn test_peer_e2ee_initiate_session() {
4030 let mesh = create_mesh(0x11111111, "ALPHA-1");
4031 mesh.enable_peer_e2ee();
4032
4033 let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4034 assert!(key_exchange.is_some());
4035
4036 let key_exchange = key_exchange.unwrap();
4037 assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
4039
4040 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4042 assert_eq!(mesh.peer_e2ee_established_count(), 0);
4043 }
4044
4045 #[test]
4046 fn test_peer_e2ee_full_handshake() {
4047 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4048 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4049
4050 mesh1.enable_peer_e2ee();
4051 mesh2.enable_peer_e2ee();
4052
4053 let observer1 = Arc::new(CollectingObserver::new());
4054 let observer2 = Arc::new(CollectingObserver::new());
4055 mesh1.add_observer(observer1.clone());
4056 mesh2.add_observer(observer2.clone());
4057
4058 let key_exchange1 = mesh1
4060 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4061 .unwrap();
4062
4063 let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
4065 assert!(response.is_some());
4066
4067 assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
4069
4070 let key_exchange2 = response.unwrap();
4072 let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
4073
4074 assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
4076
4077 let events1 = observer1.events();
4079 assert!(events1
4080 .iter()
4081 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4082
4083 let events2 = observer2.events();
4084 assert!(events2
4085 .iter()
4086 .any(|e| matches!(e, PeatEvent::PeerE2eeEstablished { .. })));
4087 }
4088
4089 #[test]
4090 fn test_peer_e2ee_encrypt_decrypt() {
4091 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4092 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4093
4094 mesh1.enable_peer_e2ee();
4095 mesh2.enable_peer_e2ee();
4096
4097 let key_exchange1 = mesh1
4099 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4100 .unwrap();
4101 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4102 mesh1.handle_key_exchange(&key_exchange2, 1000);
4103
4104 let plaintext = b"Secret message from mesh1";
4106 let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
4107 assert!(encrypted.is_some());
4108
4109 let encrypted = encrypted.unwrap();
4110 assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
4112
4113 let observer2 = Arc::new(CollectingObserver::new());
4115 mesh2.add_observer(observer2.clone());
4116
4117 let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
4118 assert!(decrypted.is_some());
4119 assert_eq!(decrypted.unwrap(), plaintext);
4120
4121 let events = observer2.events();
4123 assert!(events.iter().any(|e| matches!(
4124 e,
4125 PeatEvent::PeerE2eeMessageReceived { from_node, data }
4126 if from_node.as_u32() == 0x11111111 && data == plaintext
4127 )));
4128 }
4129
4130 #[test]
4131 fn test_peer_e2ee_bidirectional() {
4132 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4133 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4134
4135 mesh1.enable_peer_e2ee();
4136 mesh2.enable_peer_e2ee();
4137
4138 let key_exchange1 = mesh1
4140 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4141 .unwrap();
4142 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4143 mesh1.handle_key_exchange(&key_exchange2, 1000);
4144
4145 let msg1 = mesh1
4147 .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
4148 .unwrap();
4149 let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
4150 assert_eq!(dec1, b"Hello from mesh1");
4151
4152 let msg2 = mesh2
4154 .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
4155 .unwrap();
4156 let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
4157 assert_eq!(dec2, b"Hello from mesh2");
4158 }
4159
4160 #[test]
4161 fn test_peer_e2ee_close_session() {
4162 let mesh = create_mesh(0x11111111, "ALPHA-1");
4163 mesh.enable_peer_e2ee();
4164
4165 let observer = Arc::new(CollectingObserver::new());
4166 mesh.add_observer(observer.clone());
4167
4168 mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4170 assert_eq!(mesh.peer_e2ee_session_count(), 1);
4171
4172 mesh.close_peer_e2ee(NodeId::new(0x22222222));
4174
4175 let events = observer.events();
4177 assert!(events
4178 .iter()
4179 .any(|e| matches!(e, PeatEvent::PeerE2eeClosed { .. })));
4180 }
4181
4182 #[test]
4183 fn test_peer_e2ee_without_enabling() {
4184 let mesh = create_mesh(0x11111111, "ALPHA-1");
4185
4186 let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
4188 assert!(result.is_none());
4189
4190 let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
4191 assert!(result.is_none());
4192
4193 assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
4194 }
4195
4196 #[test]
4197 fn test_peer_e2ee_overhead() {
4198 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4199 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
4200
4201 mesh1.enable_peer_e2ee();
4202 mesh2.enable_peer_e2ee();
4203
4204 let key_exchange1 = mesh1
4206 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
4207 .unwrap();
4208 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
4209 mesh1.handle_key_exchange(&key_exchange2, 1000);
4210
4211 let plaintext = b"Test message";
4213 let encrypted = mesh1
4214 .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
4215 .unwrap();
4216
4217 let overhead = encrypted.len() - plaintext.len();
4226 assert_eq!(overhead, 46);
4227 }
4228
4229 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> PeatMesh {
4232 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4233 .with_encryption(secret)
4234 .with_strict_encryption();
4235 PeatMesh::new(config)
4236 }
4237
4238 #[test]
4239 fn test_strict_encryption_enabled() {
4240 let secret = [0x42u8; 32];
4241 let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4242
4243 assert!(mesh.is_encryption_enabled());
4244 assert!(mesh.is_strict_encryption_enabled());
4245 }
4246
4247 #[test]
4248 fn test_strict_encryption_disabled_by_default() {
4249 let secret = [0x42u8; 32];
4250 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4251
4252 assert!(mesh.is_encryption_enabled());
4253 assert!(!mesh.is_strict_encryption_enabled());
4254 }
4255
4256 #[test]
4257 fn test_strict_encryption_requires_encryption_enabled() {
4258 let config = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4260 .with_strict_encryption(); let mesh = PeatMesh::new(config);
4262
4263 assert!(!mesh.is_encryption_enabled());
4264 assert!(!mesh.is_strict_encryption_enabled());
4265 }
4266
4267 #[test]
4268 fn test_strict_mode_accepts_encrypted_documents() {
4269 let secret = [0x42u8; 32];
4270 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4271 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4272
4273 let doc = mesh1.build_document();
4275 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
4276
4277 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4279 assert!(result.is_some());
4280 }
4281
4282 #[test]
4283 fn test_strict_mode_rejects_unencrypted_documents() {
4284 let secret = [0x42u8; 32];
4285 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); let observer = Arc::new(CollectingObserver::new());
4289 mesh2.add_observer(observer.clone());
4290
4291 let doc = mesh1.build_document();
4293 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4294
4295 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4297 assert!(result.is_none());
4298
4299 let events = observer.events();
4301 assert!(events.iter().any(|e| matches!(
4302 e,
4303 PeatEvent::SecurityViolation {
4304 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4305 ..
4306 }
4307 )));
4308 }
4309
4310 #[test]
4311 fn test_non_strict_mode_accepts_unencrypted_documents() {
4312 let secret = [0x42u8; 32];
4313 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4318
4319 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4321 assert!(result.is_some());
4322 }
4323
4324 #[test]
4325 fn test_strict_mode_security_violation_event_includes_source() {
4326 let secret = [0x42u8; 32];
4327 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
4328 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
4329
4330 let observer = Arc::new(CollectingObserver::new());
4331 mesh2.add_observer(observer.clone());
4332
4333 let doc = mesh1.build_document();
4334
4335 mesh2.on_ble_discovered(
4337 "test-device-uuid",
4338 Some("PEAT_TEST-11111111"),
4339 -65,
4340 Some("TEST"),
4341 500,
4342 );
4343 mesh2.on_ble_connected("test-device-uuid", 600);
4344
4345 let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
4346
4347 let events = observer.events();
4349 let violation = events.iter().find(|e| {
4350 matches!(
4351 e,
4352 PeatEvent::SecurityViolation {
4353 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4354 ..
4355 }
4356 )
4357 });
4358 assert!(violation.is_some());
4359
4360 if let Some(PeatEvent::SecurityViolation { source, .. }) = violation {
4361 assert!(source.is_some());
4362 assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
4363 }
4364 }
4365
/// Checks that a document built by a mesh using one secret is rejected by a
/// mesh using a different secret, and that a `DecryptionFailed` security
/// violation is observed on the receiver.
#[test]
fn test_decryption_failure_emits_security_violation() {
    use crate::observer::SecurityViolationKind;

    let sender_key = [0x42u8; 32];
    let receiver_key = [0x43u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_key);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_key);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let document = sender.build_document();

    // The receiver must hand back nothing for the mismatched-key document.
    let outcome = receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &document, 1000);
    assert!(outcome.is_none());

    let recorded = collector.events();
    let saw_decryption_failure = recorded.iter().any(|event| {
        matches!(
            event,
            PeatEvent::SecurityViolation {
                kind: SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(saw_decryption_failure);
}
4393
/// Checks that `with_encryption` and `with_strict_encryption` still take effect
/// on the resulting mesh when followed by further builder calls.
#[test]
fn test_strict_mode_builder_chain() {
    let key = [0x42u8; 32];

    // Same builder calls, in the same order, split into named stages.
    let base = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let secured = base.with_encryption(key).with_strict_encryption();
    let tuned = secured.with_sync_interval(10_000).with_peer_timeout(60_000);

    let mesh = PeatMesh::new(tuned);

    let encryption_on = mesh.is_encryption_enabled();
    assert!(encryption_on);
    let strict_on = mesh.is_strict_encryption_enabled();
    assert!(strict_on);
}
4408
4409 fn create_relay_mesh(node_id: u32, callsign: &str) -> PeatMesh {
4412 let config = PeatMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4413 PeatMesh::new(config)
4414 }
4415
/// Checks that a mesh built by `create_mesh` reports relay as not enabled.
#[test]
fn test_relay_disabled_by_default() {
    let plain_mesh = create_mesh(0x11111111, "ALPHA-1");
    let relay_on = plain_mesh.is_relay_enabled();
    assert!(!relay_on);
}
4421
/// Checks that a mesh built by `create_relay_mesh` reports relay as enabled.
#[test]
fn test_relay_enabled() {
    let relay_mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let relay_on = relay_mesh.is_relay_enabled();
    assert!(relay_on);
}
4427
/// Checks that the relay builder methods store their arguments in the
/// corresponding `PeatMeshConfig` fields.
#[test]
fn test_relay_config_builder() {
    let base = PeatMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST");
    let config = base
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    // Pull out just the relay-related fields; all four are `Copy`.
    let PeatMeshConfig {
        enable_relay,
        max_relay_hops,
        relay_fanout,
        seen_cache_ttl_ms,
        ..
    } = config;

    assert!(enable_relay);
    assert_eq!(max_relay_hops, 5);
    assert_eq!(relay_fanout, 3);
    assert_eq!(seen_cache_ttl_ms, 60_000);
}
4441
/// Checks that `mark_message_seen` returns `true` the first time a message id
/// is recorded, `false` the second time, and that the cache then holds one entry.
#[test]
fn test_seen_message_deduplication() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);
    let msg_id = MessageId::from_content(origin, 1000, 0xDEADBEEF);

    let first_sighting = mesh.mark_message_seen(msg_id, origin, 1000);
    assert!(first_sighting);

    // Same id again, at a later timestamp.
    let repeat_sighting = mesh.mark_message_seen(msg_id, origin, 2000);
    assert!(!repeat_sighting);

    let cached = mesh.seen_cache_size();
    assert_eq!(cached, 1);
}
4456
/// Checks that `wrap_for_relay` output starts with the relay envelope marker
/// and decodes to an envelope carrying the original payload, this node as
/// origin, and a hop count of zero.
#[test]
fn test_wrap_for_relay() {
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let payload = vec![1, 2, 3, 4, 5];
    let wire_bytes = mesh.wrap_for_relay(payload.clone());

    assert_eq!(wire_bytes[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&wire_bytes).unwrap();
    assert_eq!(decoded.payload, payload);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4473
/// Checks the decision returned for a previously unseen broadcast envelope
/// (origin 0x22222222, max hops 7): the payload and origin are surfaced, the
/// hop count is zero, relaying is requested, and the envelope prepared for
/// relaying has hop count one.
#[test]
fn test_process_relay_envelope_new_message() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let payload = vec![1, 2, 3, 4, 5];
    let encoded = RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
        .with_max_hops(7)
        .encode();

    let received_from = NodeId::new(0x33333333);
    let decision = mesh.process_relay_envelope(&encoded, received_from, 1000);

    assert!(decision.is_some());
    let decision = decision.unwrap();
    assert_eq!(decision.payload, payload);
    assert_eq!(decision.origin_node.as_u32(), 0x22222222);
    assert_eq!(decision.hop_count, 0);
    assert!(decision.should_relay);
    assert!(decision.relay_envelope.is_some());

    // The envelope handed back for relaying has had its hop count advanced.
    let forwarded_hops = decision.relay_envelope.map(|forwarded| forwarded.hop_count);
    assert_eq!(forwarded_hops, Some(1));
}
4502
/// Checks that processing the same encoded envelope a second time yields no
/// decision and that a `DuplicateMessageDropped` event is observed.
#[test]
fn test_process_relay_envelope_duplicate() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let encoded = RelayEnvelope::broadcast(NodeId::new(0x22222222), vec![1, 2, 3, 4, 5]).encode();

    // First delivery is accepted.
    let first = mesh.process_relay_envelope(&encoded, NodeId::new(0x33333333), 1000);
    assert!(first.is_some());

    // Second delivery of the identical bytes is dropped.
    let second = mesh.process_relay_envelope(&encoded, NodeId::new(0x33333333), 2000);
    assert!(second.is_none());

    let recorded = collector.events();
    let saw_duplicate_drop = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::DuplicateMessageDropped { .. }));
    assert!(saw_duplicate_drop);
}
4527
/// Checks that an envelope created with max hops 3 and already relayed three
/// times is still delivered (payload surfaced) but is not relayed further,
/// and that a `MessageTtlExpired` event is observed.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let payload = vec![1, 2, 3, 4, 5];
    let fresh = RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone()).with_max_hops(3);

    // Apply `relay()` three times in a row, unwrapping each step.
    let exhausted = (0..3).fold(fresh, |envelope, _| envelope.relay().unwrap());
    let encoded = exhausted.encode();

    let decision = mesh.process_relay_envelope(&encoded, NodeId::new(0x33333333), 1000);

    assert!(decision.is_some());
    let decision = decision.unwrap();
    assert_eq!(decision.payload, payload);
    assert!(!decision.should_relay);
    assert!(decision.relay_envelope.is_none());

    let recorded = collector.events();
    let saw_ttl_expiry = recorded
        .iter()
        .any(|event| matches!(event, PeatEvent::MessageTtlExpired { .. }));
    assert!(saw_ttl_expiry);
}
4562
/// Checks that `build_relay_document` output starts with the relay envelope
/// marker, decodes to an envelope originating from this node, and that the
/// envelope's payload decodes as a `PeatDocument`.
#[test]
fn test_build_relay_document() {
    use crate::document::PeatDocument;
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let wire_bytes = mesh.build_relay_document();

    assert_eq!(wire_bytes[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&wire_bytes).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    let inner_document = PeatDocument::decode(&decoded.payload);
    assert!(inner_document.is_some());
}
4580
/// Checks that, after three peers are discovered and connected, the relay
/// targets requested with node 0x33333333 as the excluded source contain no
/// peer with that node id.
#[test]
fn test_relay_targets_excludes_source() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    // (device identifier, advertised name, RSSI) for each peer, in connection order.
    let peers = [
        ("peer-1", "PEAT_TEST-22222222", -60),
        ("peer-2", "PEAT_TEST-33333333", -65),
        ("peer-3", "PEAT_TEST-44444444", -70),
    ];
    for &(device, advertised_name, rssi) in peers.iter() {
        mesh.on_ble_discovered(device, Some(advertised_name), rssi, Some("TEST"), 1000);
        mesh.on_ble_connected(device, 1000);
    }

    let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));

    let includes_source = targets
        .iter()
        .any(|peer| peer.node_id.as_u32() == 0x33333333);
    assert!(!includes_source);
}
4619
/// Checks that `clear_seen_cache` empties a seen-message cache that held two
/// distinct message ids.
#[test]
fn test_clear_seen_cache() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);

    let first_id = MessageId::from_content(origin, 1000, 0x11111111);
    let second_id = MessageId::from_content(origin, 2000, 0x22222222);

    // Return values are deliberately ignored; only the resulting cache size matters.
    mesh.mark_message_seen(first_id, origin, 1000);
    mesh.mark_message_seen(second_id, origin, 2000);

    let before_clear = mesh.seen_cache_size();
    assert_eq!(before_clear, 2);

    mesh.clear_seen_cache();
    let after_clear = mesh.seen_cache_size();
    assert_eq!(after_clear, 0);
}
4643}